Re: KafkaProducer "send" blocks on first attempt with Kafka server offline
Oleg, I believe 0.9 producer gave you the control "max.block.ms" now On Wed, Mar 30, 2016 at 5:31 AM, Oleg Zhurakousky < ozhurakou...@hortonworks.com> wrote: > I'll buy both 'back pressure' and 'block' argument, but what does it have > to do with the Future? Isn't that the main point of the Future - a > reference to an invocation that may or may not occur some time in the > future? Isn't that the purpose of the Future.get(..) to give user a choice > and chance to wait for completion, and that is where blocking and back > pressure is handled. > > The scary part is that not only send() can block indefinitely, if used > within users code it creates a deadlock in users code unless user does > something to avoid that. And what some of us do is invoke send() as > Callable returning yet another Future. Doesn't that seem awkward? Please > let me know if there is another way to deal with infinite block. > > Furthermore, the problem is similar to the one in consumer API where > Iterator was mapped to network service and hasNext() is also a blocking > call, but at least unlike send() there is a workaround property for > hasNext(). Could at least the same workaround be introduced to send()? > > Cheers > Oleg > > > On Mar 29, 2016, at 23:12, Dana Powers <dana.pow...@gmail.com> wrote: > > > > Somewhat of an aside, but you might note that the kafka producer is > > intended to block during send() as backpressure on memory allocation. > > This is admittedly different than blocking on metadata, but it is worth > > considering that the premise that send() should *never* block because > > it returns a Future seems fundamentally at odds with the current design. > > > > In any event, there is a configuration that you can tweak to set the max > > time the producer will spend blocking in send(): max.block.ms > > > > -Dana > > > > > >> On Tue, Mar 29, 2016 at 7:26 PM, Steven Wu <stevenz...@gmail.com> > wrote: > >> > >> I also agree that returning a Future should never block. I have brought > >> this up when 0.8.2 was first released for new Java producer. > >> > >> As Oleg said, KafkaProducer can also block if metadata is not fetched. > This > >> is probably more often than offline broker, because metadata is loaded > >> lazily when there is a first send attempt for the topic. In another > word, > >> first send to a topic will always block until metadata is fetched or > timed > >> out. > >> > >> We had to handle this in our wrapper code. Basically, we check if > metadata > >> not available, we put msg into a queue and drain the queue from a diff > >> thread. > >> > >> Thanks, > >> Steven > >> > >> > >> On Tue, Mar 29, 2016 at 4:59 AM, Oleg Zhurakousky < > >> ozhurakou...@hortonworks.com> wrote: > >> > >>> I agree and considering that send(..) method returns Future one would > >>> argue it must never block, otherwise what’s the point of returning > Future > >>> if you remove user’s ability to control how long are they willing to > wait > >>> and what to do when certain types of exception arise. Nevertheless it > >> does > >>> and it happens in the very first line of code: > >>> // first make sure the metadata for the topic is available > >>> waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs); > >>> So I am curious as well as to what is the motivation for it as we’ve > seen > >>> the same symptoms multiple times. 
> >>> Cheers > >>> Oleg > >>> On Mar 29, 2016, at 4:31 AM, Paolo Patierno <ppatie...@live.com > >>> ppatie...@live.com>> wrote: > >>> > >>> Hello, > >>> as documentation says, the KafkaProducer.send() method is asynchronous > >> and > >>> it just returns immediately.I found out that it's not so true when the > >>> Kafka server it's trying to connect isn't online.Of course it happens > >> only > >>> on the first send() method invocation. It means that if the Kafka > server > >>> isn't reachable when my application starts for the first time, the > send() > >>> method isn't so asynchronous but it blocks.I know that it's trying to > >>> connect to the Kafka server but why it doesn't save the message into > the > >>> buffer and returns immediately ?Is it a behavior or a bug ? > >>> Thanks,Paolo > >>> > >>> Paolo PatiernoSenior Software Engineer > >>> > >>> > >>> Windows Embedded & IoTMicrosoft Azure Advisor > >>> Twitter : @ppatierno > >>> Linkedin : paolopatierno > >>> Blog : DevExperienceBlog : Embedded101 > >> >
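As an illustration of the fix discussed in this thread, here is a minimal Java sketch assuming a 0.9+ client, where max.block.ms (mentioned above) bounds both the metadata wait and the buffer-full wait. The broker addresses and topic name are placeholders.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BoundedBlockingExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder hosts
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("max.block.ms", "1000"); // bound metadata/buffer waits to 1s instead of blocking indefinitely

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
                if (exception != null) {
                    // delivery failed asynchronously; log, drop, or re-queue as appropriate
                }
            });
        } catch (Exception e) {
            // if the broker is unreachable, send() can still fail synchronously once max.block.ms expires
        } finally {
            producer.close();
        }
    }
}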
Re: KafkaProducer "send" blocks on first attempt with Kafka server offline
I also agree that a method returning a Future should never block. I brought this up when 0.8.2 was first released with the new Java producer. As Oleg said, KafkaProducer can also block if metadata has not been fetched. This probably happens more often than an offline broker, because metadata is loaded lazily on the first send attempt for a topic. In other words, the first send to a topic will always block until metadata is fetched or the fetch times out. We had to handle this in our wrapper code: basically, if metadata is not available, we put the message into a queue and drain the queue from a different thread. Thanks, Steven On Tue, Mar 29, 2016 at 4:59 AM, Oleg Zhurakousky < ozhurakou...@hortonworks.com> wrote: > I agree and considering that send(..) method returns Future one would > argue it must never block, otherwise what's the point of returning Future > if you remove user's ability to control how long are they willing to wait > and what to do when certain types of exception arise. Nevertheless it does > and it happens in the very first line of code: > // first make sure the metadata for the topic is available > waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs); > So I am curious as well as to what is the motivation for it as we've seen > the same symptoms multiple times. > Cheers > Oleg > On Mar 29, 2016, at 4:31 AM, Paolo Patierno> wrote: > > Hello, > as documentation says, the KafkaProducer.send() method is asynchronous and > it just returns immediately. I found out that it's not so true when the > Kafka server it's trying to connect to isn't online. Of course it happens only > on the first send() method invocation. It means that if the Kafka server > isn't reachable when my application starts for the first time, the send() > method isn't so asynchronous but it blocks. I know that it's trying to > connect to the Kafka server but why doesn't it save the message into the > buffer and return immediately? Is it a behavior or a bug? > Thanks, Paolo > > Paolo Patierno, Senior Software Engineer > > > Windows Embedded & IoT, Microsoft Azure Advisor > Twitter : @ppatierno > Linkedin : paolopatierno > Blog : DevExperience Blog : Embedded101 > > >
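A rough Java sketch of the wrapper pattern Steven describes; the class, field, and queue names are made up, and the drain thread takes the one-time blocking metadata fetch off the caller's thread.

import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NonBlockingSender {
    private final KafkaProducer<String, String> producer;
    private final BlockingQueue<ProducerRecord<String, String>> pending = new LinkedBlockingQueue<>();
    private final Set<String> knownTopics = ConcurrentHashMap.newKeySet();

    public NonBlockingSender(Properties producerProps) {
        this.producer = new KafkaProducer<>(producerProps);
        Executors.newSingleThreadExecutor().submit(this::drainLoop); // drain thread, never shut down in this sketch
    }

    // Never blocks the caller: first sends to a topic are parked until metadata is known.
    public void send(ProducerRecord<String, String> record) {
        if (knownTopics.contains(record.topic()) && pending.isEmpty()) {
            producer.send(record); // metadata already cached, send() will not block on it
        } else {
            pending.offer(record);
        }
    }

    private void drainLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            ProducerRecord<String, String> record = null;
            try {
                record = pending.take();
                producer.partitionsFor(record.topic()); // may block/throw here, but off the caller's thread
                knownTopics.add(record.topic());
                producer.send(record);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // metadata still unavailable: put the record back and back off briefly
                if (record != null) {
                    pending.offer(record);
                }
                try { Thread.sleep(100); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
            }
        }
    }
}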
Re: some producers stuck when one broker is bad
I was doing a rolling bounce of all brokers. Immediately after the bad broker was bounced, those stuck producers recovered On Fri, Sep 11, 2015 at 9:05 AM, Mayuresh Gharat <gharatmayures...@gmail.com > wrote: > So how did you detect that the broker is bad? If bouncing brokers solved > the problem and you did not find any unusual things in the logs on brokers > , it is likely that the process was up but was isolated from producer > request and since the producer did not have timeout the producer buffer > filled up. > > Thanks, > > Mayuresh > > > On Thu, Sep 10, 2015 at 11:20 PM, Steven Wu <stevenz...@gmail.com> wrote: > > > frankly I don't know exactly what went BAD for that broker. process is > > still UP. > > > > On Wed, Sep 9, 2015 at 10:10 AM, Mayuresh Gharat < > > gharatmayures...@gmail.com > > > wrote: > > > > > 1) any suggestion on how to identify the bad broker(s)? > > > ---> At Linkedin we have alerts that are setup using our internal > scripts > > > for detecting if a broker has gone bad. We also check the under > > replicated > > > partitions and that can tell us which broker has gone bad. By broker > > going > > > bad, it can mean different things. Like the broker is alive but not > > > responding and is completely isolated or the broker has gone down, etc. > > > Can you tell us what you meant by your BROKER went BAD? > > > > > > 2) why bouncing of the bad broker got the producers recovered > > automatically > > > > This is because as you bounced, the leaders for other partitions > > > changed and producer sent out a TopicMetadataRequest which tells the > > > producer who are the new leaders for the partitions and the producer > > > started sending messages to those brokers. > > > > > > KAFKA-2120 will handle all of this for you automatically. > > > > > > Thanks, > > > > > > Mayuresh > > > > > > On Tue, Sep 8, 2015 at 8:26 PM, Steven Wu <stevenz...@gmail.com> > wrote: > > > > > > > We have observed that some producer instances stopped sending traffic > > to > > > > brokers, because the memory buffer is full. those producers got stuck > > in > > > > this state permanently. Because we couldn't find out which broker is > > bad > > > > here. So I did a rolling restart the all brokers. after the bad > broker > > > got > > > > bounce, those stuck producers out of the woods automatically. > > > > > > > > I don't know the exact problem with that bad broker. it seems to me > > that > > > > some ZK states are inconsistent. > > > > > > > > I know timeout fix from KAFKA-2120 can probably avoid the permanent > > > stuck. > > > > Here are some additional questions. > > > > 1) any suggestion on how to identify the bad broker(s)? > > > > 2) why bouncing of the bad broker got the producers recovered > > > automatically > > > > (without restarting producers) > > > > > > > > producer: 0.8.2.1 > > > > broker: 0.8.2.1 > > > > > > > > Thanks, > > > > Steven > > > > > > > > > > > > > > > > -- > > > -Regards, > > > Mayuresh R. Gharat > > > (862) 250-7125 > > > > > > > > > -- > -Regards, > Mayuresh R. Gharat > (862) 250-7125 >
Re: some producers stuck when one broker is bad
frankly I don't know exactly what went BAD for that broker. process is still UP. On Wed, Sep 9, 2015 at 10:10 AM, Mayuresh Gharat <gharatmayures...@gmail.com > wrote: > 1) any suggestion on how to identify the bad broker(s)? > ---> At Linkedin we have alerts that are setup using our internal scripts > for detecting if a broker has gone bad. We also check the under replicated > partitions and that can tell us which broker has gone bad. By broker going > bad, it can mean different things. Like the broker is alive but not > responding and is completely isolated or the broker has gone down, etc. > Can you tell us what you meant by your BROKER went BAD? > > 2) why bouncing of the bad broker got the producers recovered automatically > > This is because as you bounced, the leaders for other partitions > changed and producer sent out a TopicMetadataRequest which tells the > producer who are the new leaders for the partitions and the producer > started sending messages to those brokers. > > KAFKA-2120 will handle all of this for you automatically. > > Thanks, > > Mayuresh > > On Tue, Sep 8, 2015 at 8:26 PM, Steven Wu <stevenz...@gmail.com> wrote: > > > We have observed that some producer instances stopped sending traffic to > > brokers, because the memory buffer is full. those producers got stuck in > > this state permanently. Because we couldn't find out which broker is bad > > here. So I did a rolling restart the all brokers. after the bad broker > got > > bounce, those stuck producers out of the woods automatically. > > > > I don't know the exact problem with that bad broker. it seems to me that > > some ZK states are inconsistent. > > > > I know timeout fix from KAFKA-2120 can probably avoid the permanent > stuck. > > Here are some additional questions. > > 1) any suggestion on how to identify the bad broker(s)? > > 2) why bouncing of the bad broker got the producers recovered > automatically > > (without restarting producers) > > > > producer: 0.8.2.1 > > broker: 0.8.2.1 > > > > Thanks, > > Steven > > > > > > -- > -Regards, > Mayuresh R. Gharat > (862) 250-7125 >
some producers stuck when one broker is bad
We have observed that some producer instances stopped sending traffic to brokers because the memory buffer was full, and those producers got stuck in this state permanently. Because we couldn't tell which broker was bad, I did a rolling restart of all brokers; after the bad broker got bounced, the stuck producers came out of the woods automatically.

I don't know the exact problem with that bad broker. It seems to me that some ZK state was inconsistent.

I know the timeout fix from KAFKA-2120 can probably avoid the permanent stuck state. Here are some additional questions:
1) any suggestion on how to identify the bad broker(s)?
2) why did bouncing the bad broker get the producers to recover automatically (without restarting the producers)?

producer: 0.8.2.1
broker: 0.8.2.1

Thanks, Steven
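Before the KAFKA-2120 request timeout became available, the main client-side mitigation on the 0.8.2 producer was to stop blocking when the buffer fills up behind a stuck broker. A hedged sketch, with placeholder hosts and topic:

import java.util.Properties;
import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FailFastProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // 0.8.2-era knob: don't block the caller when the accumulator fills up
        // because one broker has stopped acking requests.
        props.put("block.on.buffer.full", "false");
        props.put("buffer.memory", "33554432");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        try {
            producer.send(new ProducerRecord<byte[], byte[]>("my-topic", "payload".getBytes()));
        } catch (BufferExhaustedException e) {
            // buffer full (likely a stuck broker): drop or spool the message and raise an alert
        }
        producer.close();
    }
}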
producer metadata behavior when topic not created
Hi, I am talking about the 0.8.2 Java producer. In our deployment, we disable auto topic creation because we want to control the precise number of partitions created for each topic and the placement of partitions (e.g. zone-aware). I did some experimentation and checked the code: a metadata request to the broker for a non-existent topic gets a successful response. Should the broker return a failure or partial failure if the queried topic doesn't exist? Can we add a broker-side metric for queries against non-existent topics? The net behavior is that there are more metadata queries from the producer, throttled by the backoff config (default is 100ms). Can we add a metric for metadata request and response rate? That rate should normally be very low in steady state, since the default refresh interval is 5 minutes. Basically, I am trying to detect this scenario (a non-existent topic) and be able to alert on some metrics. Any other suggestions? Thanks, Steven
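One way to probe for this scenario from the client side, assuming that partitionsFor() times out when the topic has no metadata (auto topic creation disabled); the class and method names are illustrative:

import java.util.List;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;

public class TopicProbe {
    // Returns false if no partition metadata could be obtained within the metadata timeout.
    public static boolean topicLikelyExists(KafkaProducer<?, ?> producer, String topic) {
        try {
            List<PartitionInfo> partitions = producer.partitionsFor(topic);
            return partitions != null && !partitions.isEmpty();
        } catch (Exception e) {
            // with auto topic creation disabled, a missing topic typically surfaces as a
            // timeout after metadata.fetch.timeout.ms; count these and alert on the counter
            return false;
        }
    }
}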
Re: HDD or SSD or EBS for kafka brokers in Amazon EC2
1369725251. It's a fix in the underline virtualization platform. Here is the quote from the ticket. The issue is a bug in a performance improvement (10% improved PPS when using Xen PV netback/netfront networking) in the latest build of the virtualization platform, which has only been released to D2 instances. The issue is triggered by a race condition deadlock in kernel code that your workload appears to trigger 5-10% of the time. On Tue, Jun 2, 2015 at 4:26 PM, Henry Cai h...@pinterest.com.invalid wrote: Steven, Do you have the AWS case # (or the Ubuntu bug/case #) when you hit that kernel panic issue? Our company will still be running on AMI image 12.04 for a while, I will see whether the fix was also ported onto Ubuntu 12.04 On Tue, Jun 2, 2015 at 2:53 PM, Steven Wu stevenz...@gmail.com wrote: now I remember we had same kernel panic issue in the first week of D2 rolling-out. then AWS fixed it and we haven't seen any issue since. try Ubuntu 14.04 and see if it resolves your remaining kernel/instability issue. On Tue, Jun 2, 2015 at 2:30 PM, Wes Chow w...@chartbeat.com wrote: Daniel Nelson daniel.nel...@vungle.com June 2, 2015 at 4:39 PM On Jun 2, 2015, at 1:22 PM, Steven Wu stevenz...@gmail.com stevenz...@gmail.com wrote: can you elaborate what kind of instability you have encountered? We have seen the nodes become completely non-responsive. Usually they get rebooted automatically after 10-20 minutes, but occasionally they get stuck for days in a state where they cannot be rebooted via the Amazon APIs. Same here. It was worse right after d2 launch. We had 6 out of 9 servers die within 10 hours after spinning them up. Amazon rolled out a fix, but we're still seeing similar issues, though not nearly as bad. The first fix was for something network related, and apparently sending lots of data through the instances caused a kernel panic on the host. We have no information yet about the current issue. Wes Steven Wu stevenz...@gmail.com June 2, 2015 at 4:22 PM Wes/Daniel, can you elaborate what kind of instability you have encountered? we are on Ubuntu 14.04.2 and haven't encountered any issues so far. in the announcement, they did mention using Ubuntu 14.04 for better disk throughput. not sure whether 14.04 also addresses any instability issue you encountered or not. Thanks, Steven In order to ensure the best disk throughput performance from your D2 instances on Linux, we recommend that you use the most recent version of the Amazon Linux AMI, or another Linux AMI with a kernel version of 3.8 or later. The D2 instances provide the best disk performance when you use a Linux kernel that supports Persistent Grants – an extension to the Xen block ring protocol that significantly improves disk throughput and scalability. The following Linux AMIs support this feature: - Amazon Linux AMI 2015.03 (HVM) - Ubuntu Server 14.04 LTS (HVM) - Red Hat Enterprise Linux 7.1 (HVM) - SUSE Linux Enterprise Server 12 (HVM) Daniel Nelson daniel.nel...@vungle.com June 2, 2015 at 2:42 PM Do you have any workarounds for the d2 issues? We’ve been using them for our Kafkas too, and ran into the instability. We’re on Ubuntu 12.04 and plan to try on 14.04 with the latest HWE to see if that helps any. Thanks! Wes Chow w...@chartbeat.com June 2, 2015 at 1:39 PM We have run d2 instances with Kafka. They're currently unstable -- Amazon confirmed a host issue with d2 instances that gets tickled by a Kafka workload yesterday. 
Otherwise, it seems the d2 instance type is ideal as it gets an enormous amount of disk throughput and you'll likely be network bottlenecked. Wes Steven Wu stevenz...@gmail.com June 2, 2015 at 1:07 PM EBS (network attached storage) has got a lot better over the last a few years. we don't quite trust it for kafka workload. At Netflix, we were going with the new d2 instance type (HDD). our perf/load testing shows it satisfy our workload. SSD is better in latency curve but pretty comparable in terms of throughput. we can use the extra space from HDD for longer retention period. On Tue, Jun 2, 2015 at 9:37 AM, Henry Cai h...@pinterest.com.invalid h...@pinterest.com.invalid
Re: HDD or SSD or EBS for kafka brokers in Amazon EC2
Wes/Daniel, can you elaborate what kind of instability you have encountered? we are on Ubuntu 14.04.2 and haven't encountered any issues so far. in the announcement, they did mention using Ubuntu 14.04 for better disk throughput. not sure whether 14.04 also addresses any instability issue you encountered or not. Thanks, Steven In order to ensure the best disk throughput performance from your D2 instances on Linux, we recommend that you use the most recent version of the Amazon Linux AMI, or another Linux AMI with a kernel version of 3.8 or later. The D2 instances provide the best disk performance when you use a Linux kernel that supports Persistent Grants – an extension to the Xen block ring protocol that significantly improves disk throughput and scalability. The following Linux AMIs support this feature: - Amazon Linux AMI 2015.03 (HVM) - Ubuntu Server 14.04 LTS (HVM) - Red Hat Enterprise Linux 7.1 (HVM) - SUSE Linux Enterprise Server 12 (HVM) On Tue, Jun 2, 2015 at 12:31 PM, Wes Chow w...@chartbeat.com wrote: Our workaround is to switch to i2's. Amazon didn't mention anything, though we're getting on a call with them soon so I'll be sure to ask. Fwiw, we're also on 12.04. Wes Daniel Nelson daniel.nel...@vungle.com June 2, 2015 at 2:42 PM Do you have any workarounds for the d2 issues? We’ve been using them for our Kafkas too, and ran into the instability. We’re on Ubuntu 12.04 and plan to try on 14.04 with the latest HWE to see if that helps any. Thanks! Wes Chow w...@chartbeat.com June 2, 2015 at 1:39 PM We have run d2 instances with Kafka. They're currently unstable -- Amazon confirmed a host issue with d2 instances that gets tickled by a Kafka workload yesterday. Otherwise, it seems the d2 instance type is ideal as it gets an enormous amount of disk throughput and you'll likely be network bottlenecked. Wes Henry Cai h...@pinterest.com.INVALID June 2, 2015 at 12:37 PM We have been hosting kafka brokers in Amazon EC2 and we are using EBS disk. But periodically we were hit by long I/O wait time on EBS in some Availability Zones. We are thinking to change the instance types to a local HDD or local SSD. HDD is cheaper and bigger and seems quite fit for the Kafka use case which is mostly sequential read/write, but some early experiments show the HDD cannot catch up with the message producing speed since there are many topic/partitions on the broker which actually makes the disk I/O more randomly accessed. How are people's experience of choosing disk types on Amazon?
Re: HDD or SSD or EBS for kafka brokers in Amazon EC2
now I remember we had same kernel panic issue in the first week of D2 rolling-out. then AWS fixed it and we haven't seen any issue since. try Ubuntu 14.04 and see if it resolves your remaining kernel/instability issue. On Tue, Jun 2, 2015 at 2:30 PM, Wes Chow w...@chartbeat.com wrote: Daniel Nelson daniel.nel...@vungle.com June 2, 2015 at 4:39 PM On Jun 2, 2015, at 1:22 PM, Steven Wu stevenz...@gmail.com stevenz...@gmail.com wrote: can you elaborate what kind of instability you have encountered? We have seen the nodes become completely non-responsive. Usually they get rebooted automatically after 10-20 minutes, but occasionally they get stuck for days in a state where they cannot be rebooted via the Amazon APIs. Same here. It was worse right after d2 launch. We had 6 out of 9 servers die within 10 hours after spinning them up. Amazon rolled out a fix, but we're still seeing similar issues, though not nearly as bad. The first fix was for something network related, and apparently sending lots of data through the instances caused a kernel panic on the host. We have no information yet about the current issue. Wes Steven Wu stevenz...@gmail.com June 2, 2015 at 4:22 PM Wes/Daniel, can you elaborate what kind of instability you have encountered? we are on Ubuntu 14.04.2 and haven't encountered any issues so far. in the announcement, they did mention using Ubuntu 14.04 for better disk throughput. not sure whether 14.04 also addresses any instability issue you encountered or not. Thanks, Steven In order to ensure the best disk throughput performance from your D2 instances on Linux, we recommend that you use the most recent version of the Amazon Linux AMI, or another Linux AMI with a kernel version of 3.8 or later. The D2 instances provide the best disk performance when you use a Linux kernel that supports Persistent Grants – an extension to the Xen block ring protocol that significantly improves disk throughput and scalability. The following Linux AMIs support this feature: - Amazon Linux AMI 2015.03 (HVM) - Ubuntu Server 14.04 LTS (HVM) - Red Hat Enterprise Linux 7.1 (HVM) - SUSE Linux Enterprise Server 12 (HVM) Daniel Nelson daniel.nel...@vungle.com June 2, 2015 at 2:42 PM Do you have any workarounds for the d2 issues? We’ve been using them for our Kafkas too, and ran into the instability. We’re on Ubuntu 12.04 and plan to try on 14.04 with the latest HWE to see if that helps any. Thanks! Wes Chow w...@chartbeat.com June 2, 2015 at 1:39 PM We have run d2 instances with Kafka. They're currently unstable -- Amazon confirmed a host issue with d2 instances that gets tickled by a Kafka workload yesterday. Otherwise, it seems the d2 instance type is ideal as it gets an enormous amount of disk throughput and you'll likely be network bottlenecked. Wes Steven Wu stevenz...@gmail.com June 2, 2015 at 1:07 PM EBS (network attached storage) has got a lot better over the last a few years. we don't quite trust it for kafka workload. At Netflix, we were going with the new d2 instance type (HDD). our perf/load testing shows it satisfy our workload. SSD is better in latency curve but pretty comparable in terms of throughput. we can use the extra space from HDD for longer retention period. On Tue, Jun 2, 2015 at 9:37 AM, Henry Cai h...@pinterest.com.invalid h...@pinterest.com.invalid
Re: HDD or SSD or EBS for kafka brokers in Amazon EC2
EBS (network-attached storage) has gotten a lot better over the last few years, but we don't quite trust it for the Kafka workload. At Netflix, we are going with the new d2 instance type (HDD); our perf/load testing shows it satisfies our workload. SSD has a better latency curve but is pretty comparable in terms of throughput, and we can use the extra space on HDD for a longer retention period. On Tue, Jun 2, 2015 at 9:37 AM, Henry Cai h...@pinterest.com.invalid wrote: We have been hosting kafka brokers in Amazon EC2 and we are using EBS disk. But periodically we were hit by long I/O wait time on EBS in some Availability Zones. We are thinking to change the instance types to a local HDD or local SSD. HDD is cheaper and bigger and seems quite fit for the Kafka use case which is mostly sequential read/write, but some early experiments show the HDD cannot catch up with the message producing speed since there are many topic/partitions on the broker which actually makes the disk I/O more randomly accessed. How are people's experience of choosing disk types on Amazon?
Re: Async Producer Callback
in your callback impl object, you can save a reference to the actual message. On Wed, Mar 18, 2015 at 10:45 PM, sunil kalva kalva.ka...@gmail.com wrote: Hi How do i access the actual message which is failed to send to cluster using Callback interface and onCompletion method. Basically if the sender is failed to send, i want to add it to a temp queue and retry them later. --Sunil
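A small sketch of that suggestion: the callback object keeps a reference to the ProducerRecord it was created for, so a failed message can be re-queued. The retry queue is a placeholder for whatever the application uses.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class RetryingCallback implements Callback {
    private final ProducerRecord<String, String> record;
    private final Queue<ProducerRecord<String, String>> retryQueue;

    public RetryingCallback(ProducerRecord<String, String> record,
                            Queue<ProducerRecord<String, String>> retryQueue) {
        this.record = record;
        this.retryQueue = retryQueue;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            retryQueue.offer(record); // we still have the original message here
        }
    }
}

// usage: producer.send(record, new RetryingCallback(record, retryQueue));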
Re: non-blocking sends when cluster is down
The metadata fetch only happens/blocks the first time you call send for a topic. After the metadata is retrieved, it is cached in memory, so send will not block on it again. So yes, there is a possibility it can block; and of course, if the cluster is down and metadata was never fetched, then every send can block. Metadata is also refreshed periodically after the first fetch, controlled by metadata.max.age.ms (5 minutes by default). On Thu, Feb 26, 2015 at 4:47 AM, Gary Ogden gog...@gmail.com wrote: I was actually referring to the metadata fetch. Sorry I should have been more descriptive. I know we can decrease the metadata.fetch.timeout.ms setting to be a lot lower, but it's still blocking if it can't get the metadata. And I believe that the metadata fetch happens every time we call send()? On 25 February 2015 at 19:03, Guozhang Wang wangg...@gmail.com wrote: Hi Gary, The Java producer will block on send() when the buffer is full and block.on.buffer.full = true ( http://kafka.apache.org/documentation.html#newproducerconfigs). If you set the config to false the send() call will throw a BufferExhaustedException which, in your case, can be caught and ignored, allowing the message to drop on the floor. Guozhang On Wed, Feb 25, 2015 at 5:08 AM, Gary Ogden gog...@gmail.com wrote: Say the entire kafka cluster is down and there's no brokers to connect to. Is it possible to use the java producer send method and not block until there's a timeout? Is it as simple as registering a callback method? We need the ability for our application to not have any kind of delay when sending messages and the cluster is down. It's ok if the messages are lost when the cluster is down. Thanks! -- -- Guozhang
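For Gary's case, where dropping messages is acceptable while the cluster is down, a sketch for the 0.8.2 client: keep the one-time metadata wait short and treat a synchronous send() failure as a drop. The hosts, topic, and the 50 ms value are placeholders.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DropOnFailureSender {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("metadata.fetch.timeout.ms", "50"); // bound the first-send metadata wait

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            producer.send(new ProducerRecord<>("events", "hello"));
        } catch (Exception e) {
            // cluster down and no cached metadata: drop the message (acceptable per the thread)
        }
        producer.close();
    }
}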
Re: New Producer - Is the configurable partitioner gone?
The low connection partitioner might work for this by attempting to reuse recently used nodes whenever possible. That is useful in environments with lots and lots of producers where you don't care about semantic partitioning. In one of the perf test, we found that above sticky partitioner improved batching and reduced cpu util at broker side by 60%. We plan to make it our default partitioner. On Sun, Feb 22, 2015 at 10:28 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Daniel, Yeah I think that would be doable. If you want to pursue it you would need to do a quick KIP just to get everyone on the same page since this would be a public interface we would have to support over a long time: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals When we have the details worked out, then it should be a fairly straight-forward patch to make that pluggable. A few comments: - I think we should just make the DefaultPartitioner the default value for that configuration, rather than having it be a fall back. - You need to pass in the binary key and value in addition to the java objects. Otherwise any partitioning based on the binary value will require reserializing these. - If we add this option we should really ship at least one other useful partitioning strategy. The low connection partitioner might work for this by attempting to reuse recently used nodes whenever possible. That is useful in environments with lots and lots of producers where you don't care about semantic partitioning. It would be good to think through if there are any other useful partitioning strategies to make sure they would also be doable with the interface we would end up with. - Currently Cluster is not a public class so we'll have to think about whether we want to make that public. -Jay On Sun, Feb 22, 2015 at 4:44 AM, Daniel Wegener daniel.wege...@holisticon.de wrote: Jay Kreps jay.kreps@... writes: Hey Daniel, partitionsFor() will block the very first time it sees a new topic that it doesn't have metadata for yet. If you want to ensure you don't block even that one time, call it prior to your regular usage so it initializes then. The rationale for adding a partition in ProducerRecord was that there are actually cases where you want to associate a key with the record but not use it for partitioning. The rationale for not retaining the pluggable partitioner was that you could always just use the partition (many people dislike the plugin apis and asked for it). Personally, like you, I preferred the plugin apis. We aren't cut off from exposing the existing partitioner usage as a public api that you can override if there is anyone who wants that. I think one nice thing about it would be the ability to ship with an alternative partitioning strategy that you could enable purely in config. For example the old producer had a partitioning strategy that attempted to minimize the number of TCP connections for cases where there was no key. 98% of people loathed that, but for 2% of people it was useful and this would be a way to still include that behavior for people migrating to the new producer. -Jay On Sat, Feb 21, 2015 at 3:34 PM, Daniel Wegener daniel.wegener@... wrote: Gwen Shapira gshapira at ... writes: Hi Daniel, I think you can still use the same logic you had in the custom partitioner in the old producer. You just move it to the client that creates the records. 
The reason you don't cache the result of partitionsFor is that the producer should handle the caching for you, so its not necessarily a long or blocking call. I see it as a pretty small change to the API. But I'm not sure what drove the change either. Gwen On Fri, Feb 20, 2015 at 9:19 AM, Daniel Wegener Daniel.Wegener at ... wrote: Hello Kafka-users! I am facing a migration from a kind of ( a bit self plumbed) kafka 0.8.1 producer to the new kafka-clients API. I just recognized, that the new KafkaProducer initializes its own Partitioner that cannot be changed (final field, no ctor-param, no Class.forName(config.getPartitionerClassNameFoo()).newInstance()). Is this an intentional change? If i understand the new API correctly, one should either define a key for a message and let the default Partitioner care that it will be distributed over all available partitions or to set an explicit partition number per message that will be written to. The old API api allowed to create ProducerRecors with a key and/or a key used only for partitioning (but one that is not sent down the wire) and then to provide a custom Partitioner that later could distribute this partitioning key
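A sketch of that suggestion for the new producer: compute the partition in the sending code and pass it explicitly via the ProducerRecord constructor that takes a partition. The hashing scheme here is just an example.

import java.util.List;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;

public class ClientSidePartitioning {
    // The key is still attached to the record, but placement is decided by the caller.
    public static void send(KafkaProducer<String, String> producer,
                            String topic, String partitionKey, String value) {
        List<PartitionInfo> partitions = producer.partitionsFor(topic);
        int partition = (partitionKey.hashCode() & 0x7fffffff) % partitions.size(); // mask keeps it non-negative
        producer.send(new ProducerRecord<>(topic, partition, partitionKey, value));
    }
}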
Re: New Producer - Is the configurable partitioner gone?
Yes, this is with the new Java client. Since it uses non-blocking NIO, the sender thread was probably able to scan the buffer very frequently, so the random partitioner didn't get much chance to accumulate records per batch or request.

Setup:
- 3 broker instances (m1.xlarge)
- 6 producer instances (m1.xlarge)
- topic partitions: 36
- message size: 1 KB
- no compression
- traffic volume: 30 MB / 30K msgs total, 10 MB / 10K msgs per broker

Summary:
partitioner                   batched records per request   broker cpu util
random without lingering      1.25                          75%
sticky without lingering      2.0                           50%
sticky with 100ms lingering   15                            33%

There are two ways to improve batching:
1. Use the sticky partitioner that we implemented. The Kafka default is a random partitioner, where a random partition is selected for each message; with the sticky partitioner, we stick all messages (for one topic) on the same partition for a while (e.g. 1 second) before moving on to the next partition.
2. Set the linger.ms property on the Kafka producer. It lets a message linger for some period in the hope of a batching opportunity.

We can deploy one or both methods, but the main point is that improved batching helps the broker a lot. "linger.ms" carries the risk of filling up the buffer; it works very well with the sticky partitioner because a full batch accumulates very quickly.

On Sun, Feb 22, 2015 at 5:21 PM, Jay Kreps jay.kr...@gmail.com wrote: Interesting, and this was with the new Java client? This sounds like as much an opportunity for improvement in the code as anything. Would you be willing to share the details? -jay On Sunday, February 22, 2015, Steven Wu stevenz...@gmail.com wrote: The low connection partitioner might work for this by attempting to reuse recently used nodes whenever possible. That is useful in environments with lots and lots of producers where you don't care about semantic partitioning. In one of the perf test, we found that above sticky partitioner improved batching and reduced cpu util at broker side by 60%. We plan to make it our default partitioner. On Sun, Feb 22, 2015 at 10:28 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Daniel, Yeah I think that would be doable. If you want to pursue it you would need to do a quick KIP just to get everyone on the same page since this would be a public interface we would have to support over a long time: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals When we have the details worked out, then it should be a fairly straight-forward patch to make that pluggable. A few comments: - I think we should just make the DefaultPartitioner the default value for that configuration, rather than having it be a fall back. - You need to pass in the binary key and value in addition to the java objects. Otherwise any partitioning based on the binary value will require reserializing these. - If we add this option we should really ship at least one other useful partitioning strategy. The low connection partitioner might work for this by attempting to reuse recently used nodes whenever possible. That is useful in environments with lots and lots of producers where you don't care about semantic partitioning. It would be good to think through if there are any other useful partitioning strategies to make sure they would also be doable with the interface we would end up with. - Currently Cluster is not a public class so we'll have to think about whether we want to make that public.
-Jay On Sun, Feb 22, 2015 at 4:44 AM, Daniel Wegener daniel.wege...@holisticon.de javascript:; wrote: Jay Kreps jay.kreps@... writes: Hey Daniel, partitionsFor() will block the very first time it sees a new topic that it doesn't have metadata for yet. If you want to ensure you don't block even that one time, call it prior to your regular usage so it initializes then. The rationale for adding a partition in ProducerRecord was that there are actually cases where you want to associate a key with the record but not use it for partitioning. The rationale for not retaining the pluggable partitioner was that you could always just use the partition (many people dislike the plugin apis and asked for it). Personally, like you, I preferred the plugin apis. We aren't cut off from exposing the existing partitioner usage as a public api that you can override if there is anyone who wants that. I think one nice thing about it would be the ability to ship with an alternative partitioning strategy that you could enable purely in config. For example the old producer had a partitioning strategy that attempted to minimize the number of TCP connections for cases where there was no key. 98% of people
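A sketch of the "sticky" idea described in this thread, not the actual Netflix partitioner: keep keyless messages for a topic on one randomly chosen partition for a short interval, then rotate, so the accumulator can build larger batches per partition.

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.common.PartitionInfo;

public class StickyPartitionChooser {
    private final long stickIntervalMs;
    private int currentPartition = -1;
    private long lastSwitchMs = 0;

    public StickyPartitionChooser(long stickIntervalMs) {
        this.stickIntervalMs = stickIntervalMs; // e.g. 1000 ms
    }

    public synchronized int choose(List<PartitionInfo> partitions) {
        long now = System.currentTimeMillis();
        if (currentPartition < 0 || now - lastSwitchMs > stickIntervalMs) {
            // time to move on: pick a new partition and remember when we switched
            currentPartition = ThreadLocalRandom.current().nextInt(partitions.size());
            lastSwitchMs = now;
        }
        return currentPartition;
    }
}

The chosen partition would then be passed explicitly in the ProducerRecord, as in the earlier client-side partitioning sketch.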
Re: big cpu jump on producer in face of broker outage
will try 0.8.2.1 on producer and report back result. On Thu, Feb 19, 2015 at 11:52 AM, Jun Rao j...@confluent.io wrote: This is probably due to KAFKA-1642, which is fixed in 0.8.2.0. Could you try that version or 0.8.2.1 which is being voted now. Thanks, Jun On Thu, Feb 19, 2015 at 10:42 AM, Steven Wu stevenz...@gmail.com wrote: forgot to mention in case it matters producer: 0.8.2-beta broker: 0.8.1.1 On Thu, Feb 19, 2015 at 10:34 AM, Steven Wu stevenz...@gmail.com wrote: I think this is an issue caused by KAFKA-1788. I was trying to test producer resiliency to broker outage. In this experiment, I shutdown all brokers and see how producer behavior. Here are the observations 1) kafka producer can recover from kafka outage. i.e. send resumed after brokers came back 2) producer instance saw big cpu jump during outage. 28% - 52% in one test. Note that I didn't observe cpu issue when new producer instance started with brokers outage. In this case, there are no messages accumulated in the buffer, because KafkaProducer constructor failed with DNS lookup for route53 name. when brokers came up, my wrapper re-created KafkaProducer object and recover from outage with sending messages. Here is the cpu graph for a running producer instance where broker outage happened in the middle of test run. it shows cpu problem. https://docs.google.com/drawings/d/1FdEg9-Rf_jbDZX0cC3iZ834c4m-5rqgK-41lSS6VudQ/edit?usp=sharing Here is the cpu graph for a new producer instance where broker outage happened before instance startup. cpu is good here. https://docs.google.com/drawings/d/1NmOdwp79DKHE7kJeskBm411ln6QczAMfmcWeijvZQRQ/edit?usp=sharing Note that producer is a 4-core m1.xlarge instance. x-axis is time, y-axis is cpu util. Thanks, Steven
big cpu jump on producer in face of broker outage
I think this is an issue caused by KAFKA-1788. I was trying to test producer resiliency to a broker outage. In this experiment, I shut down all brokers and watched how the producer behaved. Here are the observations:
1) the Kafka producer can recover from a Kafka outage, i.e. sends resumed after the brokers came back
2) the producer instance saw a big CPU jump during the outage: from 28% to 52% in one test

Note that I didn't observe the CPU issue when a new producer instance started while the brokers were down. In that case, no messages accumulated in the buffer because the KafkaProducer constructor failed on the DNS lookup for the route53 name; when the brokers came up, my wrapper re-created the KafkaProducer object and recovered from the outage by resuming sends.

Here is the CPU graph for a running producer instance where the broker outage happened in the middle of the test run; it shows the CPU problem. https://docs.google.com/drawings/d/1FdEg9-Rf_jbDZX0cC3iZ834c4m-5rqgK-41lSS6VudQ/edit?usp=sharing

Here is the CPU graph for a new producer instance where the broker outage happened before instance startup; CPU is fine here. https://docs.google.com/drawings/d/1NmOdwp79DKHE7kJeskBm411ln6QczAMfmcWeijvZQRQ/edit?usp=sharing

Note that the producer is a 4-core m1.xlarge instance. The x-axis is time, the y-axis is CPU util.

Thanks, Steven
Re: big cpu jump on producer in face of broker outage
forgot to mention in case it matters producer: 0.8.2-beta broker: 0.8.1.1 On Thu, Feb 19, 2015 at 10:34 AM, Steven Wu stevenz...@gmail.com wrote: I think this is an issue caused by KAFKA-1788. I was trying to test producer resiliency to broker outage. In this experiment, I shutdown all brokers and see how producer behavior. Here are the observations 1) kafka producer can recover from kafka outage. i.e. send resumed after brokers came back 2) producer instance saw big cpu jump during outage. 28% - 52% in one test. Note that I didn't observe cpu issue when new producer instance started with brokers outage. In this case, there are no messages accumulated in the buffer, because KafkaProducer constructor failed with DNS lookup for route53 name. when brokers came up, my wrapper re-created KafkaProducer object and recover from outage with sending messages. Here is the cpu graph for a running producer instance where broker outage happened in the middle of test run. it shows cpu problem. https://docs.google.com/drawings/d/1FdEg9-Rf_jbDZX0cC3iZ834c4m-5rqgK-41lSS6VudQ/edit?usp=sharing Here is the cpu graph for a new producer instance where broker outage happened before instance startup. cpu is good here. https://docs.google.com/drawings/d/1NmOdwp79DKHE7kJeskBm411ln6QczAMfmcWeijvZQRQ/edit?usp=sharing Note that producer is a 4-core m1.xlarge instance. x-axis is time, y-axis is cpu util. Thanks, Steven
Re: could new java producer miss callbacks after successful send?
couldn't reproduce/confirm the issue with my test. send 6 million msgs from 6 instances. got 6 million callbacks. this could be some metric issues. On Mon, Feb 9, 2015 at 8:23 PM, Steven Wu stevenz...@gmail.com wrote: I don't have strong evidence that this is a bug yet. let me write some test program and see if I can confirm/reproduce the issue. On Mon, Feb 9, 2015 at 7:59 PM, Jay Kreps jay.kr...@gmail.com wrote: Hmm, that does sound like a bug, we haven't seen that. How easy is it to reproduce this? -Jay On Mon, Feb 9, 2015 at 5:19 PM, Steven Wu stevenz...@gmail.com wrote: We observed some small discrepancy in messages sent per second reported at different points. 1) and 4) matches very close. 2) and 3) matches very close but are about *5-6% lower* compared to 1) and 4). 1) send attempt from producer 2) send success from producer 3) record-send-rate reported by kafka producer 4) MessagesInPerSecond reported by kafka broker note that send success for 2) is incremented when onCompletion is called without error/exception). there is also a send error count when onCompletion is called with error. it is always zero. that's why I am wondering whether there are some callback misses? some info about the setup * producer: 0.8.2-beta * broker: 0.8.1.1 * acks=1 Thanks, Steven
could new java producer miss callbacks after successful send?
We observed a small discrepancy in the messages-sent-per-second rate reported at different points. 1) and 4) match very closely; 2) and 3) match very closely but are about *5-6% lower* compared to 1) and 4).
1) send attempts from the producer
2) send successes from the producer
3) record-send-rate reported by the Kafka producer
4) MessagesInPerSecond reported by the Kafka broker

Note that the send-success count for 2) is incremented when onCompletion is called without an error/exception. There is also a send-error count, incremented when onCompletion is called with an error; it is always zero. That's why I am wondering whether some callbacks are being missed.

Some info about the setup:
* producer: 0.8.2-beta
* broker: 0.8.1.1
* acks=1

Thanks, Steven
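A sketch of the accounting being described, with hypothetical counter names: every send attempt bumps one counter and every callback bumps exactly one of two others, so after a flush the attempt count should equal successes plus errors.

import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SendAccounting {
    static final AtomicLong attempts = new AtomicLong();
    static final AtomicLong successes = new AtomicLong();
    static final AtomicLong errors = new AtomicLong();

    static void send(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
        attempts.incrementAndGet();
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                successes.incrementAndGet(); // one callback per record on success
            } else {
                errors.incrementAndGet();    // or exactly one on failure
            }
        });
    }
}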
Re: could new java producer miss callbacks after successful send?
I don't have strong evidence that this is a bug yet. let me write some test program and see if I can confirm/reproduce the issue. On Mon, Feb 9, 2015 at 7:59 PM, Jay Kreps jay.kr...@gmail.com wrote: Hmm, that does sound like a bug, we haven't seen that. How easy is it to reproduce this? -Jay On Mon, Feb 9, 2015 at 5:19 PM, Steven Wu stevenz...@gmail.com wrote: We observed some small discrepancy in messages sent per second reported at different points. 1) and 4) matches very close. 2) and 3) matches very close but are about *5-6% lower* compared to 1) and 4). 1) send attempt from producer 2) send success from producer 3) record-send-rate reported by kafka producer 4) MessagesInPerSecond reported by kafka broker note that send success for 2) is incremented when onCompletion is called without error/exception). there is also a send error count when onCompletion is called with error. it is always zero. that's why I am wondering whether there are some callback misses? some info about the setup * producer: 0.8.2-beta * broker: 0.8.1.1 * acks=1 Thanks, Steven
Re: error handling with high-level consumer
Jun, we are already passing the retention period. so can't go back and do a DumpLogSegment. plus there are other factors make this exercise difficult: 1) this topic has very high traffic volume 2) we don't know the msg offset that is corrupted anyhow, it doesn't happen often. but can you advise proper action/handling in this case? any other exceptions from iterator.next() that we should handle? Thanks, Steven On Wed, Feb 4, 2015 at 9:33 PM, Jun Rao j...@confluent.io wrote: 1) Does the corruption happen to console consumer as well? If so, could you run DumpLogSegment tool to see if the data is corrupted on disk? Thanks, Jun On Wed, Feb 4, 2015 at 9:55 AM, Steven Wu stevenz...@gmail.com wrote: Hi, We have observed these two exceptions with consumer *iterator.next()* recently. want to ask how should we handle them properly. *1) CRC corruption* Message is corrupt (stored crc = 433657556, computed crc = 3265543163) I assume in this case we should just catch it and move on to the next msg? any other iterator/consumer exception we should catch and handle? *2) Unrecoverable consumer erorr with Iterator is in failed state* yesterday, one of our kafka consumers got stuck with very large maxalg and was throwing the following exception. 2015-02-04 08:35:19,841 ERROR KafkaConsumer-0 KafkaConsumer - Exception on consuming kafka with topic: topic_name java.lang.IllegalStateException: Iterator is in failed state at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54) at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:38) at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46) at com.netflix.suro.input.kafka.KafkaConsumer$1.run(KafkaConsumer.java:103) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) we had a surge of traffic of topic_name, so I guess the traffic storm caused the problem. I tried to restart a few consumer instances but after rebalancing, another instance got assigned the problematic partitions and got stuck again with the above errors. We decided to drop messages, stop all consumer instances, reset all offset by deleting zk entries and restarted them, the problem went away. Producer version is kafka_2.8.2-0.8.1.1 with snappy-java-1.0.5 Consumer version is kafka_2.9.2-0.8.2-beta with snappy-java-1.1.1.6 We googled this issue but this was already fixed long time ago on 0.7.x. any idea? is mismatched snappy version the culpit? is it a bug in 0.8.2-beta? Thanks, Steven
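A defensive sketch around the high-level consumer iterator; exact exception types vary by version, so this catches broadly: skip a corrupt message where possible, and treat "Iterator is in failed state" as a signal to rebuild the ConsumerConnector. The process() method is a placeholder.

import kafka.consumer.ConsumerIterator;
import kafka.message.MessageAndMetadata;

public class DefensiveConsumerLoop {
    public static void consume(ConsumerIterator<byte[], byte[]> it) {
        while (true) {
            try {
                if (!it.hasNext()) {
                    break;
                }
                MessageAndMetadata<byte[], byte[]> msg = it.next();
                process(msg.message());
            } catch (IllegalStateException e) {
                // "Iterator is in failed state": the stream is no longer usable; shut down and
                // recreate the ConsumerConnector (possibly after resetting offsets), per the thread above
                throw e;
            } catch (RuntimeException e) {
                // e.g. a CRC/corruption error on a single message: count it and continue; note the
                // iterator may still transition to the failed state afterwards
            }
        }
    }

    private static void process(byte[] payload) {
        // application-specific handling (placeholder)
    }
}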
high cpu and network traffic when cluster has no topic
Hi, we have observed a high CPU and high network traffic problem when
1) the cluster (0.8.1.1) has no topics, and
2) a KafkaProducer (0.8.2-beta) object is created without sending any traffic.

We have observed this problem twice. In both cases, the problem went away immediately after one/any topic was created. Is this a known issue? I just want to check with the community before I spend much time reproducing it.

I couldn't reproduce the issue with a similar setup using unit test code in my IDE: start two brokers with no topics locally on my laptop, create a KafkaProducer object without sending any messages. But I only tested with 0.8.2-beta for both broker and producer.

Thanks, Steven
Re: high cpu and network traffic when cluster has no topic
sure. will try my unit test again with 0.8.2.0 release tomorrow and report back my findings. On Tue, Feb 3, 2015 at 8:42 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Steven, That sounds like a bug. I think we fixed a few producer high cpu issues since the beta, I wonder if you could repeat the same test with the 0.8.2. final release? -Jay On Tue, Feb 3, 2015 at 8:37 PM, Steven Wu stevenz...@gmail.com wrote: actually, my local test can reproduce the issue although not immediately. seems to happen after a few mins. I enabled TRACE level logging. here seems to be the tight loop. you can see that there are two metadata requests in one milli-seconds. kafka-producer-network-thread | foo 20:34:32,626 TRACE NetworkClient:301 - Ignoring empty metadata response with correlation id 360185. kafka-producer-network-thread | foo 20:34:32,626 DEBUG NetworkClient:369 - Trying to send metadata request to node -2 kafka-producer-network-thread | foo 20:34:32,626 DEBUG NetworkClient:374 - Sending metadata request ClientRequest(expectResponse=true, payload=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=360186,client_id=foo}, body={topics=[]})) to node -2 kafka-producer-network-thread | foo 20:34:32,626 TRACE NetworkClient:301 - Ignoring empty metadata response with correlation id 360186. kafka-producer-network-thread | foo 20:34:32,626 DEBUG NetworkClient:369 - Trying to send metadata request to node -2 On Tue, Feb 3, 2015 at 8:10 PM, Steven Wu stevenz...@gmail.com wrote: Hi, We have observed high cpu and high network traffic problem when 1) cluster (0.8.1.1) has no topic 2) KafkaProducer (0.8.2-beta) object is created without sending any traffic We have observed such problem twice. In both cases, problem went away immediately after one/any topic is created. Is this a known issue? Just want to check with the community first before I spend much time to reproduce it. I couldn't reproduce the issue with similar setup with unit test code in IDE. start two brokers with no topic locally on my laptop. create a KafkaProducer object without sending any msgs. but I only tested with 0.8.2-beta for both broker and producer. Thanks, Steven
Re: high cpu and network traffic when cluster has no topic
actually, my local test can reproduce the issue although not immediately. seems to happen after a few mins. I enabled TRACE level logging. here seems to be the tight loop. you can see that there are two metadata requests in one milli-seconds. kafka-producer-network-thread | foo 20:34:32,626 TRACE NetworkClient:301 - Ignoring empty metadata response with correlation id 360185. kafka-producer-network-thread | foo 20:34:32,626 DEBUG NetworkClient:369 - Trying to send metadata request to node -2 kafka-producer-network-thread | foo 20:34:32,626 DEBUG NetworkClient:374 - Sending metadata request ClientRequest(expectResponse=true, payload=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=360186,client_id=foo}, body={topics=[]})) to node -2 kafka-producer-network-thread | foo 20:34:32,626 TRACE NetworkClient:301 - Ignoring empty metadata response with correlation id 360186. kafka-producer-network-thread | foo 20:34:32,626 DEBUG NetworkClient:369 - Trying to send metadata request to node -2 On Tue, Feb 3, 2015 at 8:10 PM, Steven Wu stevenz...@gmail.com wrote: Hi, We have observed high cpu and high network traffic problem when 1) cluster (0.8.1.1) has no topic 2) KafkaProducer (0.8.2-beta) object is created without sending any traffic We have observed such problem twice. In both cases, problem went away immediately after one/any topic is created. Is this a known issue? Just want to check with the community first before I spend much time to reproduce it. I couldn't reproduce the issue with similar setup with unit test code in IDE. start two brokers with no topic locally on my laptop. create a KafkaProducer object without sending any msgs. but I only tested with 0.8.2-beta for both broker and producer. Thanks, Steven
Re: [VOTE] 0.8.2.0 Candidate 3
In Netflix, we have been using route53 DNS name as bootstrap servers in AWS env. Basically, when a kafka broker start, we add it to route53 DNS name for the cluster. this is like the VIP that Jay suggested. But we are also moving toward to use Eureka service registry for bootstrapping. We are worried that if DNS name happens to resolve to a bad broker. it might impact the bootstrap process/resiliency. We want to get a list of brokers from Eureka to pass in as bootstrap.servers. On Sun, Feb 1, 2015 at 5:30 AM, Jay Kreps jay.kr...@gmail.com wrote: You may already know this but the producer doesn't require a complete list of brokers in its config, it just requires the connection info for one active broker which it uses to discover the rest of the brokers. We allow you to specify multiple urls here for failover in cases where you aren't using a vip. So if you can put three brokers into the VIP for metadata bootstrapping you can still scale up and down the rest of the cluster. -Jay On Sun, Feb 1, 2015 at 12:17 AM, Alex The Rocker alex.m3...@gmail.com wrote: Jun: You raise a very good question: let me explain why we use Broker.getConnectionString(), so may be we'll get a supported way to answer our need. We use Broker.getConnectionString() because we deploy Kafka services in Amazon EC2 with the following architecture: * Three VMs dedicated to Zookeeper processes * At least two VMs with Kafka broker, but depending on load it can be scaled to more broker VMs. Brokers self-register their address in Zookeeper by serializing Broker objects in Zk. The VMs with Zookeeper have Elastic IPs = stable public IPs, These public IPs are fed to the various Application services which rely on Kafka to stream their logs monitoring data to our central Hadoop system. Using zkclient and the above mentionned public zookeeper IPs, we get the list of brokers registrered to a given Kafka service: this is where we unserializer Broker objects and then use getConnectionString() to discover the brokers' addresses. Then, brokers addresses are used to initialize the Kafka producer(s). The whole trick is that we cannot use Elastic IP (=stable IPs) for Kafka VMs, because of their 'elastic nature : we want to be able to scale up / down the number of VMs with Kafka brokers. Now, we understand that using non public Kafka API is bad : we've been broken when moving to 0.8.1.1, then again when moving to 0.8.2.0... So it's time to raise the right question: what would be the supported way to configure our producers given our dynamic-IP-for-brokers context? Thanks, Alex. 2015-02-01 8:55 GMT+01:00 VERMEERBERGEN Alexandre alexandre.vermeerber...@3ds.com: -Original Message- From: Jun Rao [mailto:j...@confluent.io] Sent: Sunday, February 01, 2015 3:03 To: users@kafka.apache.org; kafka-clie...@googlegroups.com Cc: d...@kafka.apache.org Subject: Re: [VOTE] 0.8.2.0 Candidate 3 Hi, Alex, Thanks for testing RC3. Broker.connectionString() is actually not part of the public api for the producer. Is there a particular reason that you need to use this api? Thanks, Jun On Sat, Jan 31, 2015 at 1:53 PM, Alex The Rocker alex.m3...@gmail.com wrote: Hello, I have read Broker.scala source code, and I found the answer: - With Kafka 0.8.1.1 we used Broker.getConnectionString() in our Java code. - With Kafka 0.8.2.0, this method has been replaced by a 0-arity method without the get prefix, so we have to change our Java code to call Broker.connectionString() So despite binary compatibility is broken, we have a by-pass. 
I hope this will help other people relying on this API... and I'm going to continue tests with 0.8.2 rc3.. Alex 2015-01-31 21:23 GMT+01:00 Alex The Rocker alex.m3...@gmail.com: Hello, I ran my own tests made with kafka_2.10-0.8.1.1.tgz binaries with our application: 1st test: == replace all kafka .jar files in our application on consumming side (without recompiling anything) = tests passed, OK 2nd test: === replace all kafka .jar files in our application on producubg side (without recompiling anything) = KO, we get this error: 2015-01-31 20:54:00,094 [Timer-2] ERROR c.d.i.t.StdOutErrRedirect - Exception in thread Timer-2 2015-01-31 20:54:00,111 [Timer-2] ERROR c.d.i.t.StdOutErrRedirect - java.lang.NoSuchMethodError: kafka.cluster.Broker.getConnectionString()Ljava/lang/String; Which means that binary compatibility with 0.8.1.1 version has been broken. We use getConnectionString() to get Broker's zookeepers adresses, see this answer from Neha: http://mail-archives.apache.org/mod_mbox/kafka-users/201404.mbox/%3CCA
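A sketch of the discovery-based bootstrapping described in this thread; BrokerDiscovery stands in for whatever registry (Eureka, route53 DNS, etc.) returns currently registered brokers and is not a real API. Only a few live brokers are needed in bootstrap.servers, since the producer discovers the rest of the cluster from them.

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class DiscoveryBootstrap {
    public interface BrokerDiscovery {                        // hypothetical interface
        List<String> currentBrokers();                        // e.g. ["10.0.0.1:7101", "10.0.0.2:7101"]
    }

    public static KafkaProducer<byte[], byte[]> create(BrokerDiscovery discovery, Properties baseProps) {
        Properties props = new Properties();
        props.putAll(baseProps);
        // resolve the broker list at construction time instead of hard-coding elastic IPs
        props.put("bootstrap.servers", String.join(",", discovery.currentBrokers()));
        return new KafkaProducer<>(props);
    }
}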
Re: Using Only one partition to store object change log
Do you need total ordering among all events, or just ordering by some partition key (e.g. events regarding one particular database key or user id)? If it's the latter, you can create multiple partitions and partition your events by that key across the different Kafka partitions. On Fri, Jan 30, 2015 at 12:57 AM, noodles rungumpth...@gmail.com wrote: HI Group: I'm currently working to push object changes into external services. Now we are trying to append the change record into a Kafka. **My problem** Only one partition can be created for one kind of data, so that the sequence of change can be guaranteed. If I do like that, I guess I will lost the performance and the load balance feature. Do I need to worry about this problem? -- *noodles!*
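A minimal sketch of the keyed approach: all change events for the same object id hash to the same partition, preserving their relative order without funnelling everything through a single partition. The topic name and argument names are placeholders.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PerKeyOrderingExample {
    // Same key -> same partition -> per-object ordering, while different objects
    // spread across partitions for throughput and load balance.
    public static void publishChange(KafkaProducer<String, String> producer,
                                     String objectId, String changeJson) {
        producer.send(new ProducerRecord<>("object-changes", objectId, changeJson));
    }
}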
Re: [DISCUSSION] Boot dependency in the new producer
Jay, I don't think this line will bootstrap full metadata (for all topics). it will just construct the cluster object with bootstrap host. you need to do metadata.add(topic) to set interest of a topic's partition metadata. Guozhang, I personally think this is ok. it just do a few DNS lookup or TCP connection before first send. On Mon, Jan 26, 2015 at 2:07 PM, Jay Kreps jay.kr...@gmail.com wrote: Oh, yes, I guess I thought you meant that construction of the client would block on the metadata request. I don't personally think that is a problem because if it fails it will retry in the background, right? But actually I think this is probably violating another desirable criteria we had talked about which was keeping the producer from bootstrapping the full metadata for all partitions. If it is doing that during construction time presumably the resulting metadata request is for all partitions, no? That isn't a huge problem, but I think isn't what was intended. -Jay On Mon, Jan 26, 2015 at 1:34 PM, Guozhang Wang wangg...@gmail.com wrote: It will set the needUpdate flag to true and hence the background Sender will try to talk to the bootstrap servers. Guozhang On Mon, Jan 26, 2015 at 1:12 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Guozhang, That line shouldn't cause any connections to Kafka to be established, does it? All that is doing is creating the Cluster pojo using the supplied addresses. The use of InetSocketAddress may cause some dns stuff to happen, though... -Jay On Mon, Jan 26, 2015 at 10:50 AM, Guozhang Wang wangg...@gmail.com wrote: Hi all, I am not sure if we have discussed about this before, but recently I realized that we have introduced boot dependency of the kafka-server specified by the bootstrap.servers config in the new producer. More specifically, although in the old producer we also have a similar config for specifying the broker list, the producer will not try to connect to those brokers until the first message send call is triggered; whereas in the new producer, it will try to talk to them in construction time via: update(Cluster.bootstrap(addresses), time.milliseconds()); I personally am neutral to this change, as in most cases the corresponding kafka server should be up and running before the producer clients are deployed, but there are still some corner cases when it is not true, for example some standalone deployment tests of the app embedded with some clients, etc. So I would like to bring this up to people's attention if we have not discussed about it before: do we think this is OK to introduce this boot dependency in the new producer? -- Guozhang -- -- Guozhang
Re: Kafka 0.8.2 new producer blocking on metadata
preinitialize.metadata=true/false can help to certain extent. if the kafka cluster is down, then metadata won't be available for a long time (not just the first msg). so to be safe, we have to set metadata.fetch.timeout.ms=1 to fail fast as Paul mentioned. I can also echo Jay's comment that on-demand fetch of metadata might be more efficient, since cluster may have many topics that a particular producer may not care. so I plan to do sth similar to what Paul described. - metadata.fetch.timeout.ms=1 - enqueue msg to a pending queue when topic metadata not available. - have a background thread check when metadata become available and drain the pending queue - optionally, prime topic metadata asynchronously during init (if configured) Just wondering whether above should be the default behavior of best-effort non-blocking delivery in kafka clients. then we don't have to reinvent the wheels. Thanks, Steven On Mon, Dec 29, 2014 at 11:48 AM, Jay Kreps jay.kr...@gmail.com wrote: I don't think a separate queue will be a very simple solution to implement. Could you describe your use case a little bit more. It does seem to me that as long as the metadata fetch happens only once and the blocking has a tight time bound this should be okay in any use case I can imagine. And, of course, by default the client blocks anyway whenever you exhaust the memory buffer space. But it sounds like you feel it isn't. Maybe you could describe the scenario a bit? I think one thing we could do is what was discussed in another thread, namely add an option like preinitialize.metadata=true/false which would default to false. When true this would cause the producer to just initialize metadata for all topics when it is created. Note that this then brings back the opposite problem--doing remote communication during initialization which tends to bite a lot of people. But since this would be an option that would default to false perhaps it would be less likely to come as a surprise. -Jay On Mon, Dec 29, 2014 at 8:38 AM, Steven Wu stevenz...@gmail.com wrote: +1. it should be truly async in all cases. I understand some challenges that Jay listed in the other thread. But we need a solution nonetheless. e.g. can we maintain a separate list/queue/buffer for pending messages without metadata. On Tue, Dec 23, 2014 at 12:57 PM, John Boardman boardmanjo...@gmail.com wrote: I was just fighting this same situation. I never expected the new producer send() method to block as it returns a Future and accepts a Callback. However, when I tried my unit test, just replacing the old producer with the new, I immediately started getting timeouts waiting for metadata. I struggled with this until I went into the source code and found the wait() that waits for the metadata. At that point I realized that this new async producer would have to be executed on its own thread, unlike the old producer, which complicates my code unnecessarily. I totally agree with Paul that the contract of send() is being completely violated with internal code that can block. I did try fetching the metadata first, but that only worked for a few calls before the producer decided it was time to update the metadata again. Again, I agree with Paul that this API should be fixed so that it is truly asynchronous in all cases. Otherwise, it cannot be used on the main thread of an application as it will block and fail.
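A hedged sketch of the wrapper pattern Steven outlines above (fail fast on metadata, park records in a local queue, drain from a background thread). The class and method names are made up for illustration; the config keys are the 0.8.2-era ones discussed in this thread, and the queue/back-off sizes are placeholders:

import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class NonBlockingProducer {
    private final KafkaProducer<byte[], byte[]> producer;
    private final BlockingQueue<ProducerRecord<byte[], byte[]>> pending =
            new LinkedBlockingQueue<ProducerRecord<byte[], byte[]>>(100000);

    public NonBlockingProducer(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("metadata.fetch.timeout.ms", "1"); // fail fast instead of blocking on metadata
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        this.producer = new KafkaProducer<byte[], byte[]>(props);

        // Background thread retries parked records; once the topic's metadata is
        // available the retried send() calls succeed and the queue drains.
        ExecutorService drainer = Executors.newSingleThreadExecutor();
        drainer.submit(new Runnable() {
            public void run() {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        trySend(pending.take());
                        Thread.sleep(10); // crude back-off; tune for real traffic
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    /** Never blocks the caller: any failure just re-queues the record. */
    public void trySend(final ProducerRecord<byte[], byte[]> record) {
        try {
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        pending.offer(record); // re-queue on async failure (drops if queue is full)
                    }
                }
            });
        } catch (Exception e) {
            pending.offer(record); // e.g. the first send for a topic before metadata arrives
        }
    }
}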
Re: Kafka 0.8.2 new producer blocking on metadata
+1. it should be truly async in all cases. I understand some challenges that Jay listed in the other thread. But we need a solution nonetheless. e.g. can we maintain a separate list/queue/buffer for pending messages without metadata. On Tue, Dec 23, 2014 at 12:57 PM, John Boardman boardmanjo...@gmail.com wrote: I was just fighting this same situation. I never expected the new producer send() method to block as it returns a Future and accepts a Callback. However, when I tried my unit test, just replacing the old producer with the new, I immediately started getting timeouts waiting for metadata. I struggled with this until I went into the source code and found the wait() that waits for the metadata. At that point I realized that this new async producer would have to be executed on its own thread, unlike the old producer, which complicates my code unnecessarily. I totally agree with Paul that the contract of send() is being completely violated with internal code that can block. I did try fetching the metadata first, but that only worked for a few calls before the producer decided it was time to update the metadata again. Again, I agree with Paul that this API should be fixed so that it is truly asynchronous in all cases. Otherwise, it cannot be used on the main thread of an application as it will block and fail.
Re: Is Kafka documentation regarding null key misleading?
Guozhang, can you point me to the code that implements the periodic/sticky random partitioner? I'd actually like to try it out in our env, even though I assume it is NOT ported to the 0.8.2 java producer. Thanks, Steven On Mon, Dec 8, 2014 at 1:43 PM, Guozhang Wang wangg...@gmail.com wrote: Hi Yury, Originally the producer behavior under null-key is random random, but later changed to this periodic random to reduce the number of sockets on the server side: imagine if you have n brokers and m producers where m is much larger than n, with random random distribution each server will need to maintain a socket with each of the m producers. We realized that this change IS misleading and we have changed back to random random in the new producer released in 0.8.2. Guozhang On Fri, Dec 5, 2014 at 10:43 AM, Andrew Jorgensen ajorgen...@twitter.com.invalid wrote: If you look under Producer configs you see the following key ‘topic.metadata.refresh.interval.ms’ with a default of 600 * 1000 (10 minutes). It is not entirely clear, but this controls how often a producer with a null-key partitioner will switch the partition that it is writing to. In my production app I set this down to 1 minute and haven’t seen any ill effects, but it is good to note that making it shorter *could* cause some issues and extra overhead. I agree this could probably be a little more clear in the documentation. - Andrew Jorgensen @ajorgensen On December 5, 2014 at 1:34:00 PM, Yury Ruchin (yuri.ruc...@gmail.com) wrote: Hello, I've come across a (seemingly) strange situation when my Kafka producer gave such an uneven distribution across partitions. I found that I used a null key to produce messages, guided by the following clause in the documentation: If the key is null, then a random broker partition is picked. However, after looking at the code, I found that the broker partition is not truly random for every message - instead, the randomly picked partition number sticks and only refreshes after the topic.metadata.refresh.ms expires, which is 10 minutes by default. So, with a null key the producer keeps writing to the same partition for 10 minutes. Is my understanding of partitioning with null key correct? If yes, shouldn't the documentation be fixed then to explicitly describe the sticky pseudo-random partition assignment? Thanks, Yury -- -- Guozhang
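A small sketch of the workaround Andrew describes, for the old (Scala) 0.8 producer; the broker addresses and topic are hypothetical. The idea is simply to shorten topic.metadata.refresh.interval.ms so the sticky partition chosen for null-key messages gets re-picked more often than the default 10 minutes:

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class NullKeyProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Default is 600000 (10 minutes); a lower value makes the null-key
        // "sticky" partition rotate more often, at the cost of more metadata refreshes.
        props.put("topic.metadata.refresh.interval.ms", "60000");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        // No key supplied, so the producer picks a partition and sticks with it
        // until the next metadata refresh.
        producer.send(new KeyedMessage<String, String>("my-topic", "some event"));
        producer.close();
    }
}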
Re: [DISCUSSION] adding the serializer api back to the new java producer
In practice the cases that actually mix serialization types in a single stream are pretty rare I think just because the consumer then has the problem of guessing how to deserialize, so most of these will end up with at least some marker or schema id or whatever that tells you how to read the data. Arguable this mixed serialization with marker is itself a serializer type and should have a serializer of its own... agree that it is unlikely to have mixed serialization format for one topic/type. But we sometimes/often create one Producer object for one cluster. and there can be many topics on this cluster. different topics may have different serialization formats. So I agree with Guozhang's point regarding data type flexibility of using simple byte[] (instead of generic K, V). On Fri, Dec 5, 2014 at 5:00 PM, Jay Kreps j...@confluent.io wrote: Hey Sriram, Thanks! I think this is a very helpful summary. Let me try to address your point about passing in the serde at send time. I think the first objection is really to the paired key/value serializer interfaces. This leads to kind of a weird combinatorial thing where you would have an avro/avro serializer a string/avro serializer, a pb/pb serializer, and a string/pb serializer, and so on. But your proposal would work as well with separate serializers for key and value. I think the downside is just the one you call out--that this is a corner case and you end up with two versions of all the apis to support it. This also makes the serializer api more annoying to implement. I think the alternative solution to this case and any other we can give people is just configuring ByteArraySerializer which gives you basically the api that you have now with byte arrays. If this is incredibly common then this would be a silly solution, but I guess the belief is that these cases are rare and a really well implemented avro or json serializer should be 100% of what most people need. In practice the cases that actually mix serialization types in a single stream are pretty rare I think just because the consumer then has the problem of guessing how to deserialize, so most of these will end up with at least some marker or schema id or whatever that tells you how to read the data. Arguable this mixed serialization with marker is itself a serializer type and should have a serializer of its own... -Jay On Fri, Dec 5, 2014 at 3:48 PM, Sriram Subramanian srsubraman...@linkedin.com.invalid wrote: This thread has diverged multiple times now and it would be worth summarizing them. There seems to be the following points of discussion - 1. Can we keep the serialization semantics outside the Producer interface and have simple bytes in / bytes out for the interface (This is what we have today). The points for this is to keep the interface simple and usage easy to understand. The points against this is that it gets hard to share common usage patterns around serialization/message validations for the future. 2. Can we create a wrapper producer that does the serialization and have different variants of it for different data formats? The points for this is again to keep the main API clean. The points against this is that it duplicates the API, increases the surface area and creates redundancy for a minor addition. 3. Do we need to support different data types per record? The current interface (bytes in/bytes out) lets you instantiate one producer and use it to send multiple data formats. There seems to be some valid use cases for this. 
I have still not seen a strong argument against not having this functionality. Can someone provide their views on why we don't need this support that is possible with the current API? One possible approach for the per record serialization would be to define

public interface SerDe<K,V> {
  public byte[] serializeKey();
  public K deserializeKey();
  public byte[] serializeValue();
  public V deserializeValue();
}

This would be used by both the Producer and the Consumer. The send APIs can then be

public Future<RecordMetadata> send(ProducerRecord<K,V> record);
public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback);
public Future<RecordMetadata> send(ProducerRecord<K,V> record, SerDe<K,V> serde);
public Future<RecordMetadata> send(ProducerRecord<K,V> record, SerDe<K,V> serde, Callback callback);

A default SerDe can be set in the config. The producer would use the default from the config if the non-serde send APIs are used. The downside to this approach is that we would need to have four variants of Send API for the Producer. On 12/5/14 3:16 PM, Jun Rao j...@confluent.io wrote: Jiangjie, The issue with adding the serializer in ProducerRecord is that you need to implement all combinations of serializers for key and value. So, instead of just implementing int and string serializers,
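For reference, a hedged sketch of the serializer interface that ultimately shipped with the 0.8.2 Java producer (org.apache.kafka.common.serialization.Serializer): one serializer per producer instance, configured via key.serializer / value.serializer rather than passed per record. The class below is a made-up example implementation, not part of Kafka:

import java.nio.charset.Charset;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

// A trivial custom value serializer; a real one would emit Avro, JSON, protobuf, etc.
// Configure it with: props.put("value.serializer", PlainStringSerializer.class.getName());
public class PlainStringSerializer implements Serializer<String> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed for this example
    }

    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(Charset.forName("UTF-8"));
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}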
Re: How to produce and consume events in 2 DCs?
Erik, I don't know that mirrormaker can't write to a different topic. but it might be an useful feature request to mirrormaker. On Wed, Oct 22, 2014 at 12:21 AM, Erik van Oosten e.vanoos...@grons.nl.invalid wrote: Hi Steven, That doesn't work. In your proposal mirrormaker in once DC would copy messages from topic A to the other DC in topic A. However, in the other DC there is a mirrormaker which does the same, creating a loop. Messages will be duplicated, triplicated, etc in a never ending loop. Mirroring to another topic would work (mirrormaker doesn't support that), and so would mirroring to another cluster. Neha's proposal would work also but I assume its a lot more work for the Kafka internals and therefor IMHO wouldn't meet the kiss principle. Kind regards, Erik. Steven Wu schreef op 22-10-14 om 01:48: I think it doesn't have to be two more clusters. can be just two more topics. MirrorMaker can copy from source topics in both regions into one aggregate topic. On Tue, Oct 21, 2014 at 1:54 AM, Erik van oosten e.vanoos...@grons.nl.invalid wrote: Thanks Neha, Unfortunately, the maintenance overhead of 2 more clusters is not acceptable to us. Would you accept a pull request on mirror maker that would rename topics on the fly? For example by accepting the parameter rename: —rename src1/dest1,src2/dest2 or, extended with RE support: —rename old_(.*)/new_\1 Kind regards, Erik. Op 20 okt. 2014, om 16:43 heeft Neha Narkhede neha.narkh...@gmail.com het volgende geschreven: Another way to set up this kind of mirroring is by deploying 2 clusters in each DC - a local Kafka cluster and an aggregate Kafka cluster. The mirror maker copies data from both the DC's local clusters into the aggregate clusters. So if you want access to a topic with data from both DC's, you subscribe to the aggregate cluster. Thanks, Neha On Mon, Oct 20, 2014 at 7:07 AM, Erik van oosten e.vanoos...@grons.nl.invalid wrote: Hi, We have 2 data centers that produce events. Each DC has to process events from both DCs. I had the following in mind: DC 1 | DC 2 events |events + + + | + + + | | | | | | | v v v | v v v ++ | ++ | Receiver topic | | | Receiver topic | ++ ++ | | mirroring || | | +--+| | | | | | ++ | v vv v ++ | ++ | Consumer topic | | | Consumer topic | ++ | ++ + + + | + + + | | | | | | | v v v | v v v consumers | consumers As each DC has a single Kafka cluster, on each DC the receiver topic and consumer topic needs to be on the same cluster. Unfortunately, mirror maker does not seem to support mirroring to a topic with another name. Is there another tool we could use? Or, is there another approach for producing and consuming from 2 DCs? Kind regards, Erik. — Erik van Oosten http://www.day-to-day-stuff.blogspot.nl/ -- Erik van Oosten http://www.day-to-day-stuff.blogspot.com/
Re: How to produce and consume events in 2 DCs?
I think it doesn't have to be two more clusters. can be just two more topics. MirrorMaker can copy from source topics in both regions into one aggregate topic. On Tue, Oct 21, 2014 at 1:54 AM, Erik van oosten e.vanoos...@grons.nl.invalid wrote: Thanks Neha, Unfortunately, the maintenance overhead of 2 more clusters is not acceptable to us. Would you accept a pull request on mirror maker that would rename topics on the fly? For example by accepting the parameter rename: —rename src1/dest1,src2/dest2 or, extended with RE support: —rename old_(.*)/new_\1 Kind regards, Erik. Op 20 okt. 2014, om 16:43 heeft Neha Narkhede neha.narkh...@gmail.com het volgende geschreven: Another way to set up this kind of mirroring is by deploying 2 clusters in each DC - a local Kafka cluster and an aggregate Kafka cluster. The mirror maker copies data from both the DC's local clusters into the aggregate clusters. So if you want access to a topic with data from both DC's, you subscribe to the aggregate cluster. Thanks, Neha On Mon, Oct 20, 2014 at 7:07 AM, Erik van oosten e.vanoos...@grons.nl.invalid wrote: Hi, We have 2 data centers that produce events. Each DC has to process events from both DCs. I had the following in mind: DC 1 | DC 2 events |events + + + | + + + | | | | | | | v v v | v v v ++ | ++ | Receiver topic | | | Receiver topic | ++ ++ | | mirroring || | | +--+| | | | | | ++ | v vv v ++ | ++ | Consumer topic | | | Consumer topic | ++ | ++ + + + | + + + | | | | | | | v v v | v v v consumers | consumers As each DC has a single Kafka cluster, on each DC the receiver topic and consumer topic needs to be on the same cluster. Unfortunately, mirror maker does not seem to support mirroring to a topic with another name. Is there another tool we could use? Or, is there another approach for producing and consuming from 2 DCs? Kind regards, Erik. — Erik van Oosten http://www.day-to-day-stuff.blogspot.nl/
how to identify rogue consumer
I have seen very high Fetch-Consumer-RequestsPerSec (like 15K) per broker in a relatively idle cluster. My hypothesis is that some misbehaving consumer has a tight polling loop without any back-off logic on empty fetches. Unfortunately, this metric doesn't have a per-topic breakdown like BytesInPerSec or MessagesInPerSec, so I can't really tell which topic/consumer is pounding the cluster. Also, the storm has already ended, so I can't use tcpdump to capture live traffic. Any suggestions?
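The kind of back-off this hypothesis implies is easy to sketch. The fetchBatch() call below is hypothetical (a stand-in for whatever fetch call the client actually uses, not a Kafka API); the point is just to sleep increasingly longer while fetches come back empty instead of re-fetching in a tight loop. Note that the 0.8 high-level consumer already gets this behavior for free through broker-side long polling (fetch.wait.max.ms), so a request storm like this usually points at a client doing its own short-timeout fetch loop:

import java.util.Collections;
import java.util.List;

// Hypothetical polling loop with exponential back-off on empty fetches.
public class BackoffPoller {
    private static final long MIN_BACKOFF_MS = 10;
    private static final long MAX_BACKOFF_MS = 1000;

    public void pollLoop() throws InterruptedException {
        long backoffMs = MIN_BACKOFF_MS;
        while (!Thread.currentThread().isInterrupted()) {
            List<byte[]> batch = fetchBatch();           // hypothetical fetch call
            if (batch.isEmpty()) {
                Thread.sleep(backoffMs);                 // back off instead of hammering the broker
                backoffMs = Math.min(backoffMs * 2, MAX_BACKOFF_MS);
            } else {
                backoffMs = MIN_BACKOFF_MS;              // reset once data is flowing again
                process(batch);
            }
        }
    }

    private List<byte[]> fetchBatch() { return Collections.emptyList(); } // placeholder
    private void process(List<byte[]> batch) { /* handle messages */ }
}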
Re: how to identify rogue consumer
Jun, do you mean trace-level logging for requestAppender, i.e. log4j.logger.kafka.network.Processor=TRACE, requestAppender? If it happens again, I can try to enable it. On Wed, Oct 8, 2014 at 9:54 PM, Jun Rao jun...@gmail.com wrote: If you enable request logging, you can find this out. Thanks, Jun On Wed, Oct 8, 2014 at 8:57 PM, Steven Wu stevenz...@gmail.com wrote: I have seen very high Fetch-Consumer-RequestsPerSec (like 15K) per broker in a relatively idle cluster. My hypothesis is that some misbehaving consumer has a tight polling loop without any back-off logic on empty fetches. Unfortunately, this metric doesn't have a per-topic breakdown like BytesInPerSec or MessagesInPerSec, so I can't really tell which topic/consumer is pounding the cluster. Also, the storm has already ended, so I can't use tcpdump to capture live traffic. Any suggestions?
Re: BytesOutPerSec is more than BytesInPerSec.
Couldn't see your graph, but your replication factor is 2, so replication traffic can be the explanation. Basically, BytesOut will be 2x of BytesIn. On Thu, Sep 25, 2014 at 6:19 PM, ravi singh rrs120...@gmail.com wrote: I have set up my kafka broker with a single producer and consumer. When I am plotting the graph for all-topic bytes in/out per sec I could see that the value of BytesOutPerSec is more than BytesInPerSec. Is this correct? I confirmed that my consumer is consuming the messages only once. What could be the reason for this behavior? -- *Regards,* *Ravi*
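As a rough rule of thumb (my own assumption, not from the thread: it counts follower fetches in BytesOutPerSec and assumes each consumer reads every message exactly once): BytesOutPerSec is roughly (number of consumers + replication factor - 1) times BytesInPerSec. With a replication factor of 2 and a single consumer that gives (1 + 2 - 1) x BytesIn = 2x BytesIn, which matches the ratio described above.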
Re: zookeeper shows partition in-sync but local disk doesn't have the data dirs
partition [foo,2]'s state from New to Online since LeaderAndIsr path already exists with value {leader:1,leader_epoch:0,isr:[1,0]} and controller epoch 5 at kafka.controller.PartitionStateMachine.initializeLeaderAndIsrForPartition(PartitionStateMachine.scala:302) at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:183) at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:125) at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:124) at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:130) at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275) at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275) at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:124) at kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:473) at kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:460) at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:422) at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:404) at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:404) at kafka.utils.Utils$.inLock(Utils.scala:538) at kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:403) at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) [2014-09-03 21:18:41,168] ERROR Controller 5 epoch 5 initiated state change of replica 1 for partition [foo,2] from NonExistentReplica to OnlineReplica failed (state.change.logger) java.lang.AssertionError: assertion failed: Replica [Topic=foo,Partition=2,Replica=1] should be in the NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible states before moving to OnlineReplica state. 
Instead it is in NonExistentReplica state at scala.Predef$.assert(Predef.scala:160) at kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:284) at kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:198) at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:96) at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:96) at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:130) at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275) at kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:96) at kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:474) at kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:460) at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:422) at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:404) at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:404) at kafka.utils.Utils$.inLock(Utils.scala:538) at kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:403) at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) On Tue, Sep 9, 2014 at 7:18 PM, Steven Wu stevenz...@gmail.com wrote: previous email is from state-change.log. also found this WARN in controller.log [2014-09-09 01:10:53,669] WARN [Controller 5]: Cannot remove replica 0 from ISR of partition [cdn_selection_runtime_data,8] since it is not in the ISR. Leader = 1 ; ISR = List(1) (kafka.controller.KafkaController) On Tue, Sep 9, 2014 at 4:14 PM, Steven Wu stevenz...@gmail.com wrote: ah. maybe you mean the controller log on leader/controller broker 5. yes. I do noticed some errors regarding these two partitions. [2014-09-09 01:10:53,651] ERROR Controller 5 epoch 5 encountered error while changing partition [foo,2]'s state from New to Online since LeaderAndIsr path alrea dy exists with value {leader:0,leader_epoch:1,isr:[0,1]} and controller epoch 5 (state.change.logger) [2014-09-09 01:10:53,652] ERROR Controller 5 epoch 5 initiated state change for partition [foo,2] from NewPartition
Re: zookeeper shows partition in-sync but local disk doesn't have the data dirs
noticed one important thing. topic foo's partition 1 and 2 have empty .log file on replicas. I suspect replication doesn't create the partition dir on broker 0 in this case, which then cause the WARN logs. On Mon, Sep 8, 2014 at 11:11 PM, Steven Wu stevenz...@gmail.com wrote: sorry. forgot to mention that I am running 0.8.1.1 On Mon, Sep 8, 2014 at 9:26 PM, Steven Wu stevenz...@gmail.com wrote: did a push in cloud. after new instance for broker 0 comes up, I see a lot of WARNs in log file. 2014-09-09 04:21:09,271 WARN kafka.utils.Logging$class:83 [request-expiration-task] [warn] [KafkaApi-0] Fetch request with correlation id 51893 from client 1409779957450-6014fc32-0-0 on partition [foo,1] failed due to Partition [foo,1] doesn't exist on 0 2014-09-09 04:21:09,383 WARN kafka.utils.Logging$class:83 [kafka-request-handler-5] [warn] [KafkaApi-0] Fetch request with correlation id 51894 from client 1409779957450-6014fc32-0-0 on partition [foo,2] failed due to Partition [foo,2] doesn't exist on 0 zookeeper shows it is the leader after this new instance come back. {controller_epoch:5,leader:0,version:1,leader_epoch:0,isr:[0,5]} {controller_epoch:5,leader:0,version:1,leader_epoch:1,isr:[0,1]} but we I check the data dirs. those two partitions aren't there. any idea? Thanks, Steven
Re: zookeeper shows partition in-sync but local disk doesn't have the data dirs
nope. sate-change log files only had some warnings regarding other partitions. nothing related to these two partitions. 2014-09-09 02:54:30,579 WARN kafka.utils.Logging$class:83 [kafka-request-handler-1] [warn] Broker 0 received invalid LeaderAndIsr request with correlation id 497 from controller 5 epoch 5 with an older leader epoch 8 for partition [bar,1], current leader epoch is 8 On Tue, Sep 9, 2014 at 3:37 PM, Jun Rao jun...@gmail.com wrote: Hmm, that seems to indicate that the leader info is not propagated properly from the controller to broker 0. In the state-change log of broker 0, do you see anything related to these 2 partitions when broker 0 comes up? Thanks, Jun On Tue, Sep 9, 2014 at 9:41 AM, Steven Wu stevenz...@gmail.com wrote: noticed one important thing. topic foo's partition 1 and 2 have empty .log file on replicas. I suspect replication doesn't create the partition dir on broker 0 in this case, which then cause the WARN logs. On Mon, Sep 8, 2014 at 11:11 PM, Steven Wu stevenz...@gmail.com wrote: sorry. forgot to mention that I am running 0.8.1.1 On Mon, Sep 8, 2014 at 9:26 PM, Steven Wu stevenz...@gmail.com wrote: did a push in cloud. after new instance for broker 0 comes up, I see a lot of WARNs in log file. 2014-09-09 04:21:09,271 WARN kafka.utils.Logging$class:83 [request-expiration-task] [warn] [KafkaApi-0] Fetch request with correlation id 51893 from client 1409779957450-6014fc32-0-0 on partition [foo,1] failed due to Partition [foo,1] doesn't exist on 0 2014-09-09 04:21:09,383 WARN kafka.utils.Logging$class:83 [kafka-request-handler-5] [warn] [KafkaApi-0] Fetch request with correlation id 51894 from client 1409779957450-6014fc32-0-0 on partition [foo,2] failed due to Partition [foo,2] doesn't exist on 0 zookeeper shows it is the leader after this new instance come back. {controller_epoch:5,leader:0,version:1,leader_epoch:0,isr:[0,5]} {controller_epoch:5,leader:0,version:1,leader_epoch:1,isr:[0,1]} but we I check the data dirs. those two partitions aren't there. any idea? Thanks, Steven
Re: zookeeper shows partition in-sync but local disk doesn't have the data dirs
assuming you are talking about controller log on broker 0, there is nothing there. $ less kafka-controller 2014-09-09 01:15:02,600 INFO kafka.utils.Logging$class:68 [main] [info] [ControllerEpochListener on 0]: Initialized controller epoch to 5 and zk version 4 2014-09-09 01:15:02,655 INFO kafka.utils.Logging$class:68 [main] [info] [Controller 0]: Controller starting up 2014-09-09 01:15:02,692 INFO kafka.utils.Logging$class:68 [main] [info] [Controller 0]: Controller startup complete On Tue, Sep 9, 2014 at 4:06 PM, Jun Rao jun...@gmail.com wrote: In the controller log, do you see broker 0 being detected as the new broker when broker 0 comes up? Thanks, Jun On Tue, Sep 9, 2014 at 3:51 PM, Steven Wu stevenz...@gmail.com wrote: nope. sate-change log files only had some warnings regarding other partitions. nothing related to these two partitions. 2014-09-09 02:54:30,579 WARN kafka.utils.Logging$class:83 [kafka-request-handler-1] [warn] Broker 0 received invalid LeaderAndIsr request with correlation id 497 from controller 5 epoch 5 with an older leader epoch 8 for partition [bar,1], current leader epoch is 8 On Tue, Sep 9, 2014 at 3:37 PM, Jun Rao jun...@gmail.com wrote: Hmm, that seems to indicate that the leader info is not propagated properly from the controller to broker 0. In the state-change log of broker 0, do you see anything related to these 2 partitions when broker 0 comes up? Thanks, Jun On Tue, Sep 9, 2014 at 9:41 AM, Steven Wu stevenz...@gmail.com wrote: noticed one important thing. topic foo's partition 1 and 2 have empty .log file on replicas. I suspect replication doesn't create the partition dir on broker 0 in this case, which then cause the WARN logs. On Mon, Sep 8, 2014 at 11:11 PM, Steven Wu stevenz...@gmail.com wrote: sorry. forgot to mention that I am running 0.8.1.1 On Mon, Sep 8, 2014 at 9:26 PM, Steven Wu stevenz...@gmail.com wrote: did a push in cloud. after new instance for broker 0 comes up, I see a lot of WARNs in log file. 2014-09-09 04:21:09,271 WARN kafka.utils.Logging$class:83 [request-expiration-task] [warn] [KafkaApi-0] Fetch request with correlation id 51893 from client 1409779957450-6014fc32-0-0 on partition [foo,1] failed due to Partition [foo,1] doesn't exist on 0 2014-09-09 04:21:09,383 WARN kafka.utils.Logging$class:83 [kafka-request-handler-5] [warn] [KafkaApi-0] Fetch request with correlation id 51894 from client 1409779957450-6014fc32-0-0 on partition [foo,2] failed due to Partition [foo,2] doesn't exist on 0 zookeeper shows it is the leader after this new instance come back. {controller_epoch:5,leader:0,version:1,leader_epoch:0,isr:[0,5]} {controller_epoch:5,leader:0,version:1,leader_epoch:1,isr:[0,1]} but we I check the data dirs. those two partitions aren't there. any idea? Thanks, Steven
Re: zookeeper shows partition in-sync but local disk doesn't have the data dirs
ah. maybe you mean the controller log on leader/controller broker 5. yes. I do noticed some errors regarding these two partitions. [2014-09-09 01:10:53,651] ERROR Controller 5 epoch 5 encountered error while changing partition [foo,2]'s state from New to Online since LeaderAndIsr path alrea dy exists with value {leader:0,leader_epoch:1,isr:[0,1]} and controller epoch 5 (state.change.logger) [2014-09-09 01:10:53,652] ERROR Controller 5 epoch 5 initiated state change for partition [foo,2] from NewPartition to OnlinePartition failed (state.change.logg er) kafka.common.StateChangeFailedException: encountered error while changing partition [foo,2]'s state from New to Online since LeaderAndIsr path already exists wi th value {leader:0,leader_epoch:1,isr:[0,1]} and controller epoch 5 at kafka.controller.PartitionStateMachine.initializeLeaderAndIsrForPartition(PartitionStateMachine.scala:302) at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:183) at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:99) at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:96) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95) at scala.collection.Iterator$class.foreach(Iterator.scala:772) at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45) at scala.collection.mutable.HashMap.foreach(HashMap.scala:95) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742) at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:96) at kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:433) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:344) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:329) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328) at kafka.utils.Utils$.inLock(Utils.scala:538) at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:327) at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) On Tue, Sep 9, 2014 at 4:08 PM, Steven Wu stevenz...@gmail.com wrote: assuming you are talking about controller log on broker 0, there is nothing there. 
$ less kafka-controller 2014-09-09 01:15:02,600 INFO kafka.utils.Logging$class:68 [main] [info] [ControllerEpochListener on 0]: Initialized controller epoch to 5 and zk version 4 2014-09-09 01:15:02,655 INFO kafka.utils.Logging$class:68 [main] [info] [Controller 0]: Controller starting up 2014-09-09 01:15:02,692 INFO kafka.utils.Logging$class:68 [main] [info] [Controller 0]: Controller startup complete On Tue, Sep 9, 2014 at 4:06 PM, Jun Rao jun...@gmail.com wrote: In the controller log, do you see broker 0 being detected as the new broker when broker 0 comes up? Thanks, Jun On Tue, Sep 9, 2014 at 3:51 PM, Steven Wu stevenz...@gmail.com wrote: nope. sate-change log files only had some warnings regarding other partitions. nothing related to these two partitions. 2014-09-09 02:54:30,579 WARN kafka.utils.Logging$class:83 [kafka-request-handler-1] [warn] Broker 0 received invalid LeaderAndIsr request with correlation id 497 from controller 5 epoch 5 with an older leader epoch 8 for partition [bar,1], current leader epoch is 8 On Tue, Sep 9, 2014 at 3:37 PM, Jun Rao jun...@gmail.com wrote: Hmm, that seems to indicate that the leader info is not propagated properly from the controller to broker 0. In the state-change log
Re: zookeeper shows partition in-sync but local disk doesn't have the data dirs
previous email is from state-change.log. also found this WARN in controller.log [2014-09-09 01:10:53,669] WARN [Controller 5]: Cannot remove replica 0 from ISR of partition [cdn_selection_runtime_data,8] since it is not in the ISR. Leader = 1 ; ISR = List(1) (kafka.controller.KafkaController) On Tue, Sep 9, 2014 at 4:14 PM, Steven Wu stevenz...@gmail.com wrote: ah. maybe you mean the controller log on leader/controller broker 5. yes. I do noticed some errors regarding these two partitions. [2014-09-09 01:10:53,651] ERROR Controller 5 epoch 5 encountered error while changing partition [foo,2]'s state from New to Online since LeaderAndIsr path alrea dy exists with value {leader:0,leader_epoch:1,isr:[0,1]} and controller epoch 5 (state.change.logger) [2014-09-09 01:10:53,652] ERROR Controller 5 epoch 5 initiated state change for partition [foo,2] from NewPartition to OnlinePartition failed (state.change.logg er) kafka.common.StateChangeFailedException: encountered error while changing partition [foo,2]'s state from New to Online since LeaderAndIsr path already exists wi th value {leader:0,leader_epoch:1,isr:[0,1]} and controller epoch 5 at kafka.controller.PartitionStateMachine.initializeLeaderAndIsrForPartition(PartitionStateMachine.scala:302) at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:183) at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:99) at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:96) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95) at scala.collection.Iterator$class.foreach(Iterator.scala:772) at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45) at scala.collection.mutable.HashMap.foreach(HashMap.scala:95) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742) at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:96) at kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:433) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:344) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:329) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328) at kafka.utils.Utils$.inLock(Utils.scala:538) at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:327) at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) On Tue, Sep 9, 2014 at 4:08 PM, Steven Wu stevenz...@gmail.com wrote: assuming you are talking about controller log on broker 0, there is nothing there. $ less kafka-controller 2014-09-09 01:15:02,600 INFO kafka.utils.Logging$class:68 [main] [info] [ControllerEpochListener on 0]: Initialized controller epoch to 5 and zk version 4 2014-09-09 01:15:02,655 INFO kafka.utils.Logging$class:68 [main] [info] [Controller 0]: Controller starting up 2014-09-09 01:15:02,692 INFO kafka.utils.Logging$class:68 [main] [info] [Controller 0]: Controller startup complete On Tue, Sep 9, 2014 at 4:06 PM, Jun Rao jun...@gmail.com wrote: In the controller log, do you see broker 0 being detected as the new broker when broker 0 comes up? Thanks, Jun On Tue, Sep 9, 2014 at 3:51 PM, Steven Wu stevenz...@gmail.com wrote: nope. sate-change log files only had some warnings regarding other partitions. nothing related to these two partitions. 2014-09-09 02:54:30,579 WARN kafka.utils.Logging$class:83
new broker instance can't sync up one partition from peers
I did a push. The new instance comes up and tries to fetch log/data from the other peers/replicas. Out of 60 partitions assigned to broker 0, it synced up 59, but for whatever reason it didn't try to fetch this partition/topic. [out-of-sync replica] BrokerId: 0, Topic: foo, PartitionId: 6, Leader: 5, Replicas: 5,0, Isr: 5 I checked the partition data size on broker 5; it is actually very small. $ du -hs /mnt/data/kafka/foo-6/ 200K /mnt/data/kafka/foo-6/ I checked the log files on brokers 0 and 5 and couldn't find anything relevant to this issue. I found a different issue that I will ask about in a separate thread. Any suggestions? Thanks, Steven
zookeeper shows partition in-sync but local disk doesn't have the data dirs
did a push in cloud. after new instance for broker 0 comes up, I see a lot of WARNs in log file. 2014-09-09 04:21:09,271 WARN kafka.utils.Logging$class:83 [request-expiration-task] [warn] [KafkaApi-0] Fetch request with correlation id 51893 from client 1409779957450-6014fc32-0-0 on partition [foo,1] failed due to Partition [foo,1] doesn't exist on 0 2014-09-09 04:21:09,383 WARN kafka.utils.Logging$class:83 [kafka-request-handler-5] [warn] [KafkaApi-0] Fetch request with correlation id 51894 from client 1409779957450-6014fc32-0-0 on partition [foo,2] failed due to Partition [foo,2] doesn't exist on 0 zookeeper shows it is the leader after this new instance come back. {controller_epoch:5,leader:0,version:1,leader_epoch:0,isr:[0,5]} {controller_epoch:5,leader:0,version:1,leader_epoch:1,isr:[0,1]} but we I check the data dirs. those two partitions aren't there. any idea? Thanks, Steven
Re: undesirable log retention behavior
log.retention.bytes can somewhat help. but it is cumbersome to use because it is a per-topic config for partition limit. there was an earlier thread regarding global bytes limit. that will work well for my purpose of avoiding disk full. https://issues.apache.org/jira/browse/KAFKA-1489 On Thu, Jul 31, 2014 at 7:39 PM, Joe Stein joe.st...@stealth.ly wrote: What version of Kafka are your using? Have you tried log.retention.bytes? Which ever comes first (ttl or bytes total) should do what you are looking for if I understand you right. http://kafka.apache.org/documentation.html#brokerconfigs /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop / On Jul 31, 2014 6:52 PM, Steven Wu steve...@netflix.com.invalid wrote: it seems that log retention is purely based on last touch/modified timestamp. This is undesirable for code push in aws/cloud. e.g. let's say retention window is 24 hours. disk size is 1 TB. disk util is 60% (600GB). when new instance comes up, it will fetch log files (600GB) from peers. those log files all have newer timestamps. they won't be purged until 24 hours later. note that during the first 24 hours, new msgs (another 600GB) continue to come in. This can cause disk full problem without any intervention. With this behavior, we have to keep disk util under 50%. can last modified timestamp be inserted into the file name when rolling over log files? then kafka can check the file name for timestamp. does this make sense? Thanks, Steven
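To make the "cumbersome" part concrete (the numbers below are made up for illustration): log.retention.bytes is enforced per partition, so if a broker hosts 20 partitions of a topic and you want that topic capped at roughly 100 GB on the broker, you have to set the limit to about 100 GB / 20 = 5 GB per partition, and then re-derive it whenever the partition count, replica placement, or disk size changes. A single broker-wide bytes limit, as proposed in KAFKA-1489, avoids that bookkeeping.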
log retention and rollover
This might be a bit unusual. We have a topic that we only need to keep last 5 minutes of msgs so that replay from beginning is fast. Although retention.ms has time unit of minute, segment.ms ONLY has time unit of hour. If I understand cleanup correctly, it can only delete files that are rolled over. If true, the minimal retention period can be actually one hour. is there any particular reason for different time units for retention and roll? Can we add log.roll.minutes?

(topic config | default | server default property | description)
retention.ms | 7 days | log.retention.minutes | This configuration controls the maximum time we will retain a log before we will discard old log segments to free up space if we are using the delete retention policy. This represents an SLA on how soon consumers must read their data.
segment.ms | 7 days | log.roll.hours | This configuration controls the period of time after which Kafka will force the log to roll even if the segment file isn't full to ensure that retention can delete or compact old data.

Thanks, Steven
Re: log retention and rollover
created KAFKA-1480 https://issues.apache.org/jira/browse/KAFKA-1480. Thanks! it's also generally better to have consistent/matching time unit for these two configs. On Mon, Jun 2, 2014 at 4:22 PM, Guozhang Wang wangg...@gmail.com wrote: Steven, We initially set the rolling criterion based on hours to avoid too frequent log rolling and in turn too small segment files. For your case this may be reasonable to set the rolling criterion on minutes. Could you file a JIRA? Guozhang On Mon, Jun 2, 2014 at 4:00 PM, Steven Wu steve...@netflix.com.invalid wrote: This might be a bit unusual. We have a topic that we only need to keep last 5 minutes of msgs so that replay from beginning is fast. Although retention.ms has time unit of minute, segment.ms ONLY has time unit of hour. If I understand cleanup correctly, it can only delete files that are rolled over. If true, the minimal retention period can be actually one hour. is there any particular reason for different time units for retention and roll? Can we add log.roll.minutes? retention.ms7 dayslog.retention.minutesThis configuration controls the maximum time we will retain a log before we will discard old log segments to free up space if we are using the delete retention policy. This represents an SLA on how soon consumers must read their data. segment.ms7 dayslog.roll.hoursThis configuration controls the period of time after which Kafka will force the log to roll even if the segment file isn't full to ensure that retention can delete or compact old data. Thanks, Steven -- -- Guozhang
Re: kafka broker failed to recovery from ZK failure
well. I just run the kafka-run-class.sh script with DeleteTopicCommand class directly. I noticed the warning on documentation http://DeleteTopicCommand page. but I thought it is only about not complete cleanup. I didn't know it can affect cluster health. maybe it's worthwhile to point it out the actual danger. On Thu, May 22, 2014 at 8:49 PM, Jun Rao jun...@gmail.com wrote: Delete topic is not supported in 0.8.1.1. How did you do it? Thanks, Jun On Thu, May 22, 2014 at 9:59 AM, Steven Wu steve...@netflix.com wrote: yes. I deleted a topic. but not at the time. it's a few hours before. On Fri, May 16, 2014 at 3:49 PM, Jun Rao jun...@gmail.com wrote: The problem is indicated by the following log in broker 1's controller log. Were you deleting any topic at that time? [2014-05-12 21:24:37,930] ERROR [BrokerChangeListener on Controller 1]: Error while handling broker changes (kafka.controller.ReplicaStateMachine$BrokerChangeListener) java.util.NoSuchElementException: key not found: [ocp.fillhealth.us-east-1,0] at scala.collection.MapLike$class.default(MapLike.scala:225) at scala.collection.mutable.HashMap.default(HashMap.scala:45) at scala.collection.MapLike$class.apply(MapLike.scala:135) at scala.collection.mutable.HashMap.apply(HashMap.scala:45) at kafka.controller.ControllerBrokerRequestBatch.updateMetadataRequestMapFor$1(ControllerChannelManager.scala:242) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addUpdateMetadataRequestForBrokers$1.apply(ControllerChannelManager.scala:268) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addUpdateMetadataRequestForBrokers$1.apply(ControllerChannelManager.scala:268) at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:99) at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:99) at scala.collection.Iterator$class.foreach(Iterator.scala:772) at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45) at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:99) at kafka.controller.ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(ControllerChannelManager.scala:268) at kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:940) at kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:386) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:342) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:329) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328) at kafka.utils.Utils$.inLock(Utils.scala:538) at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:327) at 
org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) Thanks, Jun On Tue, May 13, 2014 at 9:50 AM, Steven Wu steve...@netflix.com wrote: sorry for the wrong log file. please see the attached zip file for all 4 log files. On Mon, May 12, 2014 at 9:05 PM, Jun Rao jun...@gmail.com wrote: The controller log in broker 1 is too late. Could you send its log around 2014-05-12 21:24:37? Thanks, Jun On Mon, May 12, 2014 at 5:02 PM, Steven Wu steve...@netflix.com wrote: This is a three-node cluster. broker 0 lost connection to ZK. broker 1 does seem to take the controller role. but broker 0 stuck in the bad state and wasn't able to recover. it seems to start with these error msgs. I have attached complete controller and server log for broker 0 and 1. I am
Re: kafka broker failed to recovery from ZK failure
yes. I deleted a topic. but not at the time. it's a few hours before. On Fri, May 16, 2014 at 3:49 PM, Jun Rao jun...@gmail.com wrote: The problem is indicated by the following log in broker 1's controller log. Were you deleting any topic at that time? [2014-05-12 21:24:37,930] ERROR [BrokerChangeListener on Controller 1]: Error while handling broker changes (kafka.controller.ReplicaStateMachine$BrokerChangeListener) java.util.NoSuchElementException: key not found: [ocp.fillhealth.us-east-1,0] at scala.collection.MapLike$class.default(MapLike.scala:225) at scala.collection.mutable.HashMap.default(HashMap.scala:45) at scala.collection.MapLike$class.apply(MapLike.scala:135) at scala.collection.mutable.HashMap.apply(HashMap.scala:45) at kafka.controller.ControllerBrokerRequestBatch.updateMetadataRequestMapFor$1(ControllerChannelManager.scala:242) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addUpdateMetadataRequestForBrokers$1.apply(ControllerChannelManager.scala:268) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addUpdateMetadataRequestForBrokers$1.apply(ControllerChannelManager.scala:268) at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:99) at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:99) at scala.collection.Iterator$class.foreach(Iterator.scala:772) at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45) at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:99) at kafka.controller.ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(ControllerChannelManager.scala:268) at kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:940) at kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:386) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:342) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:329) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328) at kafka.utils.Utils$.inLock(Utils.scala:538) at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:327) at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) Thanks, Jun On Tue, May 13, 2014 at 9:50 AM, Steven Wu steve...@netflix.com wrote: sorry for the wrong log file. please see the attached zip file for all 4 log files. On Mon, May 12, 2014 at 9:05 PM, Jun Rao jun...@gmail.com wrote: The controller log in broker 1 is too late. Could you send its log around 2014-05-12 21:24:37? 
Thanks, Jun On Mon, May 12, 2014 at 5:02 PM, Steven Wu steve...@netflix.com wrote: This is a three-node cluster. broker 0 lost connection to ZK. broker 1 does seem to take the controller role. but broker 0 stuck in the bad state and wasn't able to recover. it seems to start with these error msgs. I have attached complete controller and server log for broker 0 and 1. I am using kafka_2.9.2-0.8.1.1.jar. Thanks, Steven [2014-05-12 21:24:28,737] INFO Client session timed out, have not heard from server in 4000ms for sessionid 0xb145a9585a806013, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) [2014-05-12 21:24:28,838] INFO zookeeper state changed (Disconnected) (org.I0Itec.zkclient.ZkClient) [2014-05-12 21:24:29,360] INFO Opening socket connection to server ip-10-84-58-49.ec2.internal/10.84.58.49:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) [2014-05-12 21:24:30,562] INFO Client session timed out, have not heard from server in 1724ms for sessionid 0xb145a9585a806013, closing socket connection and attempting reconnect
Re: question about mirror maker
if placing mirror maker in the same datacenter as target cluster, it/consumer will talks to zookeeper in remote/source datacenter. would it more susceptible to network problems? As for the problem commit offset without actually producing/writing msgs to target cluster, it can be solved by disabling auto-commit. and only commit msgs that are actually persisted in target cluster. what do you think of this opposite approach? On Sun, May 11, 2014 at 8:48 PM, Todd Palino tpal...@linkedin.com wrote: Yes, on both counts. Putting the mirror maker in the same datacenter in the target cluster is exactly what we do as well. We also monitor both the consumer lag (by comparing the offsets stored in Zookeeper and the tail offset on the brokers), and the number of dropped and failed messages on the mirror maker producer side. The other thing to do is to make sure to check very carefully when you are changing anything about the producer configuration, to assure that you have not made a mistake. -Todd On 5/11/14, 9:12 AM, Weide Zhang weo...@gmail.com wrote: Hi Todd, Thanks for your answer. with regard to fail over for mirror maker, does that mean if i have 4 mirror maker running in different machines with same consumer group, it will auto load balance if one of the mirror maker fails ? Also, it looks to prevent mirror maker commit wrong (consumer work but not producer) due to cross data center network issue, mirror maker need to be placed along with the target cluster so that this scenario is minimized ? On Sat, May 10, 2014 at 11:39 PM, Todd Palino tpal...@linkedin.com wrote: Well, if you have a cluster in each datacenter, all with the same topics, you can¹t just mirror the messages between them, as you will create a loop. The way we do it is to have a ³local² cluster and an ³aggregate² cluster. The local cluster has the data for only that datacenter. Then we run mirror makers that copy the messages from each of the local clusters into the aggregate cluster. Everything produces into the local clusters, and nothing produces into the aggregate clusters. In general, consumers consume from the aggregate cluster (unless they specifically want only local data). The mirror maker is as fault tolerant as any other consumer. That is, if a mirror maker goes down, the others configured with the same consumer group (we generally run at least 4 for any mirror maker, sometimes up to 10) will rebalance and start back up from the last committed offset. What you need to watch out for is if the mirror maker is unable to produce messages, for example, if the network goes down. If it can still consume messages, but cannot produce them, you will lose messages as the consumer will continue to commit offsets with no knowledge that the producer is failing. -Todd On 5/8/14, 11:20 AM, Weide Zhang weo...@gmail.com wrote: Hi, I have a question about mirror maker. say I have 3 data centers each producing topic 'A' with separate kafka cluster running. if 3 of the data need to be kept in sync with each other, shall i create 3 mirror maker in each data center to get the data from the other two ? also, it mentioned that mirror making is not fault tolerant ? so what will be the behavior of mirror consumer if it went down due to network and back up ? do they catch up with last offset from which they last mirror ? If so, is it enabled by default or I have to configure ? Thanks a lot, Weide
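A hedged sketch of the "commit only after the write is confirmed" idea from the first paragraph above, using the 0.8 high-level consumer; the ZooKeeper address and group name are hypothetical. Note that commitOffsets() commits the current position of every partition owned by this connector, so the mirrored batch must be produced to the target cluster (and acknowledged) before the commit:

import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class MirrorCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "source-dc-zk:2181"); // hypothetical source-DC ZooKeeper
        props.put("group.id", "mirror-maker-group");
        props.put("auto.commit.enable", "false");            // no automatic offset commits

        ConsumerConnector connector =
                kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // ... consume a batch from the source cluster, produce it to the target
        // cluster, and wait for the producer acks ...

        // Only after the target cluster has the data do we mark the batch as consumed.
        connector.commitOffsets();

        connector.shutdown();
    }
}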