[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15323299#comment-15323299 ]

Evan Huus commented on KAFKA-2082:
----------------------------------

This issue has been "In Progress" since April 2015. Has there been any actual progress in that time?

> Kafka Replication ends up in a bad state
> -----------------------------------------
>
>                 Key: KAFKA-2082
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2082
>             Project: Kafka
>          Issue Type: Bug
>          Components: replication
>    Affects Versions: 0.8.2.1
>            Reporter: Evan Huus
>            Assignee: Sriharsha Chintalapani
>            Priority: Critical
>              Labels: zkclient-problems
>         Attachments: KAFKA-2082.patch
>
> While running integration tests for Sarama (the Go client) we came across a pattern of connection losses that reliably puts Kafka into a bad state: several of the brokers start spinning, chewing ~30% CPU and spamming the logs with hundreds of thousands of lines like:
> {noformat}
> [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
> [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
> [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
> [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
> [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
> [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
> [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
> [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
> {noformat}
> This can be easily and reliably reproduced using the {{toxiproxy-final}} branch of https://github.com/Shopify/sarama which includes a vagrant script for provisioning the appropriate cluster:
> - {{git clone https://github.com/Shopify/sarama.git}}
> - {{git checkout test-jira-kafka-2082}}
> - {{vagrant up}}
> - {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}}
> After the test finishes (it fails because the cluster ends up in a bad state), you can log into the cluster machine with {{vagrant ssh}} and inspect the bad nodes. The vagrant script provisions five zookeepers and five brokers in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}.
> Additional context: the test produces continually to the cluster while randomly cutting and restoring zookeeper connections (all connections to zookeeper are run through a simple proxy on the same VM to make this easy). The majority of the time this works very well and does a good job exercising our producer's retry and failover code. However, under certain patterns of connection loss (the {{TEST_SEED}} in the instructions is important), kafka gets confused. The test never cuts more than two connections at a time, so zookeeper should always have quorum, and the topic (with three replicas) should always be writable.
> Completely restarting the cluster via {{vagrant reload}} seems to put it back into a sane state.

--
This message was sent by Atlassian JIRA
[jira] [Commented] (KAFKA-3289) Update Kafka protocol guide wiki for KIP-31 / KIP-32
[ https://issues.apache.org/jira/browse/KAFKA-3289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15174318#comment-15174318 ]

Evan Huus commented on KAFKA-3289:
----------------------------------

OK, I commented there because I didn't know this ticket existed and had no other obvious way to contact you. For the new versions of the produce/consume requests, it would be helpful if you somehow indicated which fields have changed. Offset Commit Request does this by simply listing the format three times, but since I think the changes this time are in the *message* format rather than the request format, I'm not sure of the best way to show that. Thanks!

> Update Kafka protocol guide wiki for KIP-31 / KIP-32
> ----------------------------------------------------
>
>                 Key: KAFKA-3289
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3289
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Jiangjie Qin
>            Assignee: Jiangjie Qin
>             Fix For: 0.10.0.0
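For context on what the documentation update covers: KIP-32 bumped the message magic byte from 0 to 1 and inserted an 8-byte timestamp between the attributes and the key. A rough sketch of packing a single magic=1 message follows; this is a simplification for illustration only (the offset/size message-set framing and the KIP-31 relative offsets used inside compressed wrapper messages are omitted):

```python
import binascii
import struct

def pack_message_v1(key: bytes, value: bytes, timestamp_ms: int,
                    attributes: int = 0) -> bytes:
    """Pack one Kafka message in the magic=1 (post-KIP-32) layout:

        crc(int32) magic(int8) attributes(int8) timestamp(int64)
        key-length(int32) key value-length(int32) value

    The int64 timestamp is the field added relative to magic=0.
    The crc covers everything after the crc field itself.
    """
    body = struct.pack(">bbq", 1, attributes, timestamp_ms)
    body += struct.pack(">i", len(key)) + key
    body += struct.pack(">i", len(value)) + value
    crc = binascii.crc32(body) & 0xFFFFFFFF
    return struct.pack(">I", crc) + body
```

Listing the magic=0 and magic=1 layouts side by side in the wiki, the way Offset Commit Request lists its versions, would make the single added field obvious.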
[jira] [Created] (KAFKA-2871) Newly replicated brokers don't expire log segments properly
Evan Huus created KAFKA-2871:
---------------------------------

             Summary: Newly replicated brokers don't expire log segments properly
                 Key: KAFKA-2871
                 URL: https://issues.apache.org/jira/browse/KAFKA-2871
             Project: Kafka
          Issue Type: Bug
          Components: replication
    Affects Versions: 0.8.2.1
            Reporter: Evan Huus
            Assignee: Neha Narkhede
            Priority: Minor

We recently brought up a few brokers to replace some existing nodes, and used the provided script to reassign partitions from the retired nodes to the new ones, one at a time. A little while after the fact, we noticed extreme disk usage on the new nodes.

We tracked this down to the fact that the replicated segments are all timestamped from the moment of replication rather than carrying whatever timestamp was set on the original node. Since this is the timestamp the log roller uses, it takes a full week (the rollover time) before any data is purged from the new brokers.

In the short term, what is the safest workaround? Can we just {{rm}} these old segments, or should we be adjusting the filesystem metadata so kafka removes them itself? In the longer term, the partition mover should be setting timestamps appropriately on the segments it moves.
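The mechanism described above, time-based retention keyed off segment file mtime, can be sketched in a few lines. This is a simplified model, not the broker's actual code (which lives in kafka.log.Log); the retention constant and directory layout are assumptions for illustration:

```python
import os
import time

RETENTION_SECS = 7 * 24 * 60 * 60  # one week, matching the default log.retention.hours=168

def expirable_segments(log_dir, now=None):
    """Return segment files whose mtime has aged past the retention window.

    Because the replica fetcher writes segments fresh, a newly replicated
    segment's mtime is the moment of replication, not the original append
    time, so under this model nothing on a new broker is expirable for a
    full retention window after a partition move.
    """
    now = now or time.time()
    expired = []
    for name in sorted(os.listdir(log_dir)):
        if not name.endswith(".log"):
            continue  # skip index files and anything else in the directory
        path = os.path.join(log_dir, name)
        if now - os.path.getmtime(path) > RETENTION_SECS:
            expired.append(name)
    return expired
```

A workaround consistent with this model would be resetting the mtimes of the moved segments to their true age (e.g. with {{touch -d}}) so the roller sees them as old, but as the report asks, whether that is actually safe is the open question.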
[jira] [Created] (KAFKA-2537) Mirrormaker defaults to localhost with no sanity checks
Evan Huus created KAFKA-2537:
---------------------------------

             Summary: Mirrormaker defaults to localhost with no sanity checks
                 Key: KAFKA-2537
                 URL: https://issues.apache.org/jira/browse/KAFKA-2537
             Project: Kafka
          Issue Type: Bug
          Components: consumer, replication, zkclient
    Affects Versions: 0.8.2.0
            Reporter: Evan Huus
            Assignee: Neha Narkhede

Short version: like many other tools, mirror-maker's consumer defaults to using the localhost zookeeper instance when no specific zookeeper source is specified. It shouldn't do this. MM should also have a sanity check that the source and destination clusters are different.

Long version: we run multiple clusters, all using mirrormaker to replicate to the master cluster. The kafka, zookeeper, and mirrormaker instances all run on the same nodes in the master cluster, since the hardware can more than handle the load.

We were doing some zookeeper maintenance on one of our remote clusters recently, which accidentally caused our configuration manager (chef) to generate empty zkConnect strings for some mirrormaker instances. These instances defaulted to localhost and started mirroring from the master cluster back to itself: an infinite replication loop that caused all sorts of havoc.

We were able to recover gracefully, and we've added additional safeguards on our end, but mirror-maker is at least partially at fault here as well. There is no reason for it to treat an empty string as anything but an error, and especially not as localhost, which is typically the target cluster, not the source. Additionally, it should be trivial and very useful for mirrormaker to verify that it is not consuming from and producing to the same cluster; I can think of no legitimate use case for this kind of cycle.

If you need any clarification or additional information, please let me know.
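The two guards proposed above are cheap to express. Below is a minimal sketch of the shape of the check; mirror-maker itself is JVM code, the function name is hypothetical, and the host-overlap test is only a heuristic (a real implementation would compare cluster identities, not hostnames, and the hosts here happen to be co-located as in the setup described above):

```python
def validate_mirror_config(source_zk: str, dest_brokers: str) -> None:
    """Reject the two misconfigurations described in the report.

    - an empty source zkConnect must be an error, never a localhost default
    - mirroring a cluster into itself is an infinite replication loop
    """
    if not source_zk.strip():
        raise ValueError(
            "source zkConnect is empty; refusing to default to localhost")
    # strip any zookeeper chroot, then compare bare hostnames
    source_hosts = {h.split(":")[0] for h in source_zk.split("/")[0].split(",")}
    dest_hosts = {h.split(":")[0] for h in dest_brokers.split(",")}
    overlap = source_hosts & dest_hosts
    if overlap:
        raise ValueError(
            f"source and destination share hosts {sorted(overlap)}; "
            "possible replication loop")
```

In the incident described above, either check would have turned a silent infinite loop into a startup failure.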
[jira] [Commented] (KAFKA-2147) Unbalanced replication can cause extreme purgatory growth
[ https://issues.apache.org/jira/browse/KAFKA-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14574985#comment-14574985 ]

Evan Huus commented on KAFKA-2147:
----------------------------------

We have rolled out a custom build with this patch to all our clusters and now have some longer-term statistics. This 100% fixes the issue, thanks [~junrao]! You can merge this into trunk or mark this as fixed or whatever it is you need to do.

> Unbalanced replication can cause extreme purgatory growth
> ----------------------------------------------------------
>
>                 Key: KAFKA-2147
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2147
>             Project: Kafka
>          Issue Type: Bug
>          Components: purgatory, replication
>    Affects Versions: 0.8.2.1
>            Reporter: Evan Huus
>            Assignee: Jun Rao
>         Attachments: KAFKA-2147.patch, KAFKA-2147_2015-05-14_09:41:56.patch, KAFKA-2147_2015-05-15_16:14:44.patch, craig-kafka-purgatory-queue-size-issue.png, purgatory.log, purgatory.log.gz, watch-lists.log
>
> Apologies in advance, this is going to be a bit of a complex description, mainly because we've seen this issue in several different ways and we're still tying them together in terms of root cause and analysis.
> It is worth noting now that we have all our producers set up to send RequiredAcks==-1, and that this includes all our MirrorMakers.
> I understand the purgatory is being rewritten (again) for 0.8.3. Hopefully that will incidentally fix this issue, or at least render it moot.
> h4. Symptoms
> Fetch request purgatory on a broker or brokers grows rapidly and steadily at a rate of roughly 1-5K requests per second. Heap memory used also grows to keep pace. When 4-5 million requests have accumulated in purgatory, the purgatory is drained, causing a substantial latency spike. The node will tend to drop leadership, replicate, and recover.
> h5. Case 1 - MirrorMaker
> We first noticed this case when enabling mirrormaker. We had one primary cluster already, with many producers and consumers. We created a second, identical cluster and enabled replication from the original to the new cluster on some topics using mirrormaker. This caused all six nodes in the new cluster to exhibit the symptom in lockstep: their purgatories would all grow together, and get drained within about 20 seconds of each other. The cluster-wide latency spikes at this time caused several problems for us.
> Turning MM on and off turned the problem on and off very precisely. When we stopped MM, the purgatories would all drop to normal levels immediately, and would start climbing again when we restarted it.
> Note that this is the *fetch* purgatories on the brokers that MM was *producing* to, which indicates fairly strongly that this is a replication issue, not a MM issue.
> This particular cluster and MM setup was abandoned for other reasons before we could make much progress debugging.
> h5. Case 2 - Broker 6
> The second time we saw this issue was on the newest broker (broker 6) in the original cluster. For a long time we were running with five nodes, and eventually added a sixth to handle the increased load. At first, we moved only a handful of higher-volume partitions to this broker. Later, we created a group of new topics (totalling around 100 partitions) for testing purposes that were spread automatically across all six nodes. These topics saw occasional traffic, but were generally unused. At this point broker 6 had leadership for about an equal number of high-volume and unused partitions, about 15-20 of each.
> Around this time (we don't have detailed enough data to prove real correlation, unfortunately), the issue started appearing on this broker as well, but not on any of the other brokers in the cluster.
> h4. Debugging
> The first thing we tried was to reduce {{fetch.purgatory.purge.interval.requests}} from the default of 1000 to a much lower value of 200. This had no noticeable effect at all.
> We then enabled debug logging on broker06 and started looking through that. I can attach complete log samples if necessary, but the thing that stood out for us was a substantial number of the following lines:
> {noformat}
> [2015-04-23 20:05:15,196] DEBUG [KafkaApi-6] Putting fetch request with correlation id 49939 from client ReplicaFetcherThread-0-6 into purgatory (kafka.server.KafkaApis)
> {noformat}
> The volume of these lines seemed to match (approximately) the fetch purgatory growth on that broker.
> At this point we developed a hypothesis (detailed below) which guided our subsequent debugging tests:
> - Setting up a daemon to produce regular random data to all of the topics led by kafka06 (specifically the ones which otherwise would receive no data) substantially alleviated the problem.
> - Doing an additional rebalance of the cluster in order to move a number of other topics with regular data to kafka06 appears to have solved the problem completely.
> h4. Hypothesis
> Current versions (0.8.2.1 and earlier) have issues with the replica fetcher not backing off correctly
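The hypothesis, that delayed fetches watching idle partitions pile up because the purge logic does not reclaim them, can be illustrated with a toy model. This is a deliberate simplification of the pre-0.8.3 purgatory (kafka.server.RequestPurgatory is far more involved, including expiry timers this sketch omits, and all names here are illustrative); it only shows why producing regular data to the otherwise-idle partitions alleviated the symptom:

```python
class FetchPurgatory:
    """Toy model: delayed fetch requests are watched per partition.

    A delayed fetch is satisfied when its partition receives data, but it
    stays on the watch list until the next purge sweep, and fetches for
    partitions that never receive data are never reclaimed at all.
    """
    def __init__(self, purge_interval=1000):
        self.watch = {}  # partition -> list of [request_id, satisfied]
        self.purge_interval = purge_interval
        self.since_purge = 0

    def delay_fetch(self, partition, request_id):
        self.watch.setdefault(partition, []).append([request_id, False])
        self.since_purge += 1
        if self.since_purge >= self.purge_interval:
            self.purge()

    def on_produce(self, partition):
        # data arriving satisfies the watchers, but does not remove them
        for entry in self.watch.get(partition, []):
            entry[1] = True

    def purge(self):
        # only satisfied entries are swept; idle partitions keep growing
        self.watch = {p: [e for e in entries if not e[1]]
                      for p, entries in self.watch.items()}
        self.since_purge = 0

    def size(self):
        return sum(len(entries) for entries in self.watch.values())
```

In this model a broker leading a mix of busy and idle partitions accumulates watchers on the idle ones indefinitely, which is consistent with both observed mitigations: feeding random data to the idle topics lets the sweep reclaim them, and rebalancing busy topics onto the broker removes the idle watch lists entirely.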
[jira] [Updated] (KAFKA-2147) Unbalanced replication can cause extreme purgatory growth
[ https://issues.apache.org/jira/browse/KAFKA-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Evan Huus updated KAFKA-2147:
---------------------------------
    Attachment: purgatory.log

Purgatory log from the attached patch. Based on my understanding, the check is working (it is seeing the right number of elements); it just isn't waking up enough.
[jira] [Commented] (KAFKA-2147) Unbalanced replication can cause extreme purgatory growth
[ https://issues.apache.org/jira/browse/KAFKA-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14543666#comment-14543666 ]

Evan Huus commented on KAFKA-2147:
----------------------------------

Increasing the producer batch size has substantially alleviated this issue. We still see spikes up to ~100K messages in the fetch purgatory, but these are not large enough to cause leadership changes. We are still working on trying out the patch; we have had some JVM build issues.
[jira] [Commented] (KAFKA-2147) Unbalanced replication can cause extreme purgatory growth
[ https://issues.apache.org/jira/browse/KAFKA-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14543761#comment-14543761 ]

Evan Huus commented on KAFKA-2147:
----------------------------------

OK, it appears the patch does not fix this issue; we are still seeing the behaviour of ~100K message spikes even with it applied.

Willem pointed out to me that it would be very unlikely for an unsynchronized read to return the same stale data for 10-15 minutes in a row. Even if there is a race condition, any cache should flush itself much faster than that.
[jira] [Commented] (KAFKA-2147) Unbalanced replication can cause extreme purgatory growth
[ https://issues.apache.org/jira/browse/KAFKA-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14540218#comment-14540218 ] Evan Huus commented on KAFKA-2147:
--
1. 4-8k messages/s snappy-compressed, taking 2.5-5 MB/s of bandwidth. Now that we have several days of data, I can say that the problem appears much less severe (though still present) during off-peak times. The average purgatory size definitely rises and falls together with the producer throughput. It is probably worth noting that the majority of our producer's traffic (slightly over 90%) is on just two topics, while the remaining topics tend to see *much* lower volume (~10 messages/minute in some cases).
2. Yes. As I wrote above: correlates pretty strongly with the time since the previous purge. When the purgatory was purged frequently, the purges were small. The first purge after each ~30-second gap was much larger. NumDelayedRequests hovers between 200-250 regardless of the purgatory size.
3. We do not have GC logging enabled; however, we have tried both the default and the G1 collector with no difference. We are currently running G1, and the stats it exposes over JMX look sane.

Unbalanced replication can cause extreme purgatory growth
-
Key: KAFKA-2147
URL: https://issues.apache.org/jira/browse/KAFKA-2147
Project: Kafka
Issue Type: Bug
Components: purgatory, replication
Affects Versions: 0.8.2.1
Reporter: Evan Huus
Assignee: Joel Koshy

Apologies in advance, this is going to be a bit of a complex description, mainly because we've seen this issue several different ways and we're still tying them together in terms of root cause and analysis. It is worth noting now that we have all our producers set up to send RequiredAcks==-1, and that this includes all our MirrorMakers. I understand the purgatory is being rewritten (again) for 0.8.3. Hopefully that will incidentally fix this issue, or at least render it moot.

h4. Symptoms
Fetch request purgatory on a broker or brokers grows rapidly and steadily at a rate of roughly 1-5K requests per second. Heap memory used also grows to keep pace. When 4-5 million requests have accumulated in purgatory, the purgatory is drained, causing a substantial latency spike. The node will tend to drop leadership, replicate, and recover.

h5. Case 1 - MirrorMaker
We first noticed this case when enabling MirrorMaker. We had one primary cluster already, with many producers and consumers. We created a second, identical cluster and enabled replication from the original to the new cluster on some topics using MirrorMaker. This caused all six nodes in the new cluster to exhibit the symptom in lockstep - their purgatories would all grow together, and get drained within about 20 seconds of each other. The cluster-wide latency spikes at this time caused several problems for us. Turning MM on and off turned the problem on and off very precisely. When we stopped MM, the purgatories would all drop to normal levels immediately, and would start climbing again when we restarted it. Note that this is the *fetch* purgatories on the brokers that MM was *producing* to, which indicates fairly strongly that this is a replication issue, not a MM issue. This particular cluster and MM setup was abandoned for other reasons before we could make much progress debugging.

h5. Case 2 - Broker 6
The second time we saw this issue was on the newest broker (broker 6) in the original cluster. For a long time we were running with five nodes, and eventually added a sixth to handle the increased load. At first, we moved only a handful of higher-volume partitions to this broker. Later, we created a group of new topics (totalling around 100 partitions) for testing purposes that were spread automatically across all six nodes. These topics saw occasional traffic, but were generally unused. At this point broker 6 had leadership for about an equal number of high-volume and unused partitions, about 15-20 of each. Around this time (we don't have detailed enough data to prove real correlation, unfortunately), the issue started appearing on this broker as well, but not on any of the other brokers in the cluster.

h4. Debugging
The first thing we tried was to reduce `fetch.purgatory.purge.interval.requests` from the default of 1000 to a much lower value of 200. This had no noticeable effect at all. We then enabled debug logging on broker06 and started looking through that. I can attach complete log samples if necessary, but the thing that stood out for us was a substantial number of the following lines:
{noformat}
[2015-04-23 20:05:15,196] DEBUG [KafkaApi-6] Putting fetch request with correlation id 49939 from client ReplicaFetcherThread-0-6 into purgatory (kafka.server.KafkaApis)
{noformat}
The volume of these lines seemed to match (approximately) the fetch purgatory growth on that broker.
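One quick way to check whether the rate of these "into purgatory" DEBUG lines tracks the purgatory growth rate is to bucket them per second. This is a stdlib-only sketch, not anything from the ticket; the timestamp layout is assumed from the sample line above:

```python
from collections import Counter

def purgatory_inserts_per_second(log_lines):
    """Count 'Putting fetch request ... into purgatory' DEBUG lines,
    bucketed by the one-second prefix of their timestamp."""
    counts = Counter()
    for line in log_lines:
        if "into purgatory" in line and line.startswith("["):
            # Timestamps look like [2015-04-23 20:05:15,196];
            # keep everything up to (and including) the seconds field.
            stamp = line[1:20]  # "2015-04-23 20:05:15"
            counts[stamp] += 1
    return counts

sample = [
    "[2015-04-23 20:05:15,196] DEBUG [KafkaApi-6] Putting fetch request with correlation id 49939 from client ReplicaFetcherThread-0-6 into purgatory (kafka.server.KafkaApis)",
    "[2015-04-23 20:05:15,421] DEBUG [KafkaApi-6] Putting fetch request with correlation id 49940 from client ReplicaFetcherThread-0-6 into purgatory (kafka.server.KafkaApis)",
    "[2015-04-23 20:05:16,002] DEBUG [KafkaApi-6] Putting fetch request with correlation id 49941 from client ReplicaFetcherThread-0-6 into purgatory (kafka.server.KafkaApis)",
]
counts = purgatory_inserts_per_second(sample)
print(counts)
```

Comparing these per-second counts against the purgatory-size JMX metric over the same window would confirm (or refute) the apparent match.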
[jira] [Comment Edited] (KAFKA-2147) Unbalanced replication can cause extreme purgatory growth
[ https://issues.apache.org/jira/browse/KAFKA-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14540669#comment-14540669 ] Evan Huus edited comment on KAFKA-2147 at 5/12/15 8:43 PM:
---
1. 150-200 based on a quick wireshark capture
2. Thousands; we had full debug logging enabled for this period. I'm not sure what might be relevant. At a guess, I am seeing lots of the following three lines, across many different topic/partitions:
{noformat}
[2015-04-30 15:54:13,999] DEBUG [Replica Manager on Broker 1]: Request key [admin,4] unblocked 0 producer requests. (kafka.server.ReplicaManager)
[2015-04-30 15:54:13,999] DEBUG [Replica Manager on Broker 1]: Recorded follower 3 position 332616 for partition [admin,4]. (kafka.server.ReplicaManager)
[2015-04-30 15:54:13,999] DEBUG Partition [admin,4] on broker 1: Skipping update high watermark since Old hw 332616 [0 : 337335616] is larger than new hw 332616 [0 : 337335616] for partition [admin,4]. All leo's are 332616 [0 : 337335616] (kafka.cluster.Partition)
{noformat}
3. No ZK problems I can spot.
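For context, the "Skipping update high watermark" line reflects the leader's update rule: the high watermark is the minimum log-end-offset (LEO) across the in-sync replicas, and it only advances when that minimum strictly increases. A simplified Python sketch of that rule (not the actual Scala in kafka.cluster.Partition):

```python
def maybe_update_high_watermark(old_hw, leos):
    """Return the new high watermark given the current one and the
    log-end-offsets (LEOs) of all in-sync replicas.

    The candidate is min(leos); the update is skipped unless it is
    strictly larger than the old watermark."""
    new_hw = min(leos)
    if new_hw > old_hw:
        return new_hw  # watermark advances
    # e.g. old hw 332616, all LEOs 332616 -> "Skipping update high watermark"
    return old_hw

print(maybe_update_high_watermark(332616, [332616, 332616, 332616]))
print(maybe_update_high_watermark(100, [150, 120, 200]))
```

When all followers are fully caught up on an idle partition (as in the log line above, where all LEOs equal the old watermark), every follower fetch triggers this skip path.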
[jira] [Commented] (KAFKA-2147) Unbalanced replication can cause extreme purgatory growth
[ https://issues.apache.org/jira/browse/KAFKA-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14540669#comment-14540669 ] Evan Huus commented on KAFKA-2147:
--
1. 150-200 based on a quick wireshark capture
2. Thousands; we had full debug logging enabled for this period. I'm not sure what might be relevant. At a guess, I am seeing lots of the following three lines, across many different topic/partitions:
{noformat}
[2015-04-30 15:54:13,999] DEBUG [Replica Manager on Broker 1]: Request key [admin,4] unblocked 0 producer requests. (kafka.server.ReplicaManager)
[2015-04-30 15:54:13,999] DEBUG [Replica Manager on Broker 1]: Recorded follower 3 position 332616 for partition [admin,4]. (kafka.server.ReplicaManager)
[2015-04-30 15:54:13,999] DEBUG Partition [admin,4] on broker 1: Skipping update high watermark since Old hw 332616 [0 : 337335616] is larger than new hw 332616 [0 : 337335616] for partition [admin,4]. All leo's are 332616 [0 : 337335616] (kafka.cluster.Partition)
{noformat}
3. No ZK problems I can spot.
[jira] [Updated] (KAFKA-2147) Unbalanced replication can cause extreme purgatory growth
[ https://issues.apache.org/jira/browse/KAFKA-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Evan Huus updated KAFKA-2147:
-
Attachment: watch-lists.log

watch list purging logs
At this point we developed a hypothesis (detailed below) which guided our subsequent debugging tests:
- Setting a daemon up to produce regular random data to all of the topics led by kafka06 (specifically the ones which otherwise would receive no data) substantially alleviated the problem.
- Doing an additional rebalance of the cluster in order to move a number of other topics with regular data to kafka06 appears to have solved the problem completely.

h4. Hypothesis
Current versions (0.8.2.1 and earlier) have issues with the replica fetcher not backing off correctly (KAFKA-1461, KAFKA-2082 and others). I believe that in a very specific situation, the replica fetcher thread of one broker can spam another broker with requests that fill up its purgatory and do not get properly
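The keepalive daemon described above can be outlined roughly as follows. This is a hypothetical stdlib-only sketch: `IDLE_TOPICS` stands in for the otherwise-idle topics led by kafka06, and `send` is a placeholder for a real Kafka producer call from whatever client library is in use:

```python
import os
import time

# Hypothetical list of otherwise-idle topics led by the affected broker.
IDLE_TOPICS = ["test_topic_1", "test_topic_2"]

def keepalive_pass(send, topics=IDLE_TOPICS, payload_size=64):
    """Send one small random payload to each idle topic.

    `send(topic, value)` is a stand-in for a real producer call."""
    for topic in topics:
        send(topic, os.urandom(payload_size))

def run_keepalive(send, interval_secs=5.0, passes=None):
    """Produce regular random data so no partition goes completely quiet.

    `passes=None` runs forever; a finite value is useful for testing."""
    done = 0
    while passes is None or done < passes:
        keepalive_pass(send)
        done += 1
        if passes is None or done < passes:
            time.sleep(interval_secs)

# Example: record what would have been produced over two passes.
sent = []
run_keepalive(lambda t, v: sent.append((t, len(v))), interval_secs=0, passes=2)
print(sent)
```

The point of the daemon is simply that every partition led by the broker sees steady produce traffic, so follower fetches always have data to return.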
[jira] [Commented] (KAFKA-2147) Unbalanced replication can cause extreme purgatory growth
[ https://issues.apache.org/jira/browse/KAFKA-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14521670#comment-14521670 ] Evan Huus commented on KAFKA-2147:
--
When I enable TRACE request logging on a single node (broker ID 1 in the problematic cluster) I see:
- approximately 850 FetchRequest messages per second with a ClientId of ReplicaFetcherThread-0-1
- approximately evenly distributed between the various ReplicaIds
- each one with MaxWait: 500 ms; MinBytes: 1 bytes, which seems reasonable
- an average of 31 partitions per request without too much variation
[jira] [Comment Edited] (KAFKA-2147) Unbalanced replication can cause extreme purgatory growth
[ https://issues.apache.org/jira/browse/KAFKA-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14521670#comment-14521670 ] Evan Huus edited comment on KAFKA-2147 at 4/30/15 4:13 PM:
---
When I enable TRACE request logging on a single node (broker ID 1 in the problematic cluster) I see:
- approximately 850 FetchRequest messages per second with a ClientId of ReplicaFetcherThread-0-1
- approximately evenly distributed between the various ReplicaIds
- each one with MaxWait: 500 ms; MinBytes: 1 bytes, which seems reasonable
- an average of 31 partitions per request without too much variation

When I enable DEBUG logging generally, the problem seems to go away on that node... however, I still collected stats on the "Begin purging watch lists" message. Over a five-minute window:
- exactly 100 occurrences of that line
- heavily grouped (75 of them within a single 10-second window, otherwise a handful every ~30 seconds)
- a heavily variable number of elements purged; low of 38, high of 210073. Correlates pretty strongly with the time since the previous purge.

fetch.purgatory.purge.interval.requests is still set to 200 on this node.
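The gap-versus-size correlation can be checked mechanically. A stdlib-Python sketch with made-up (timestamp_secs, elements_purged) pairs, since the real numbers live in watch-lists.log:

```python
from statistics import mean

def gap_size_correlation(purges):
    """Pearson correlation between the time since the previous purge
    and the number of elements purged.

    `purges` is a time-ordered list of (timestamp_secs, elements_purged)."""
    gaps = [t2 - t1 for (t1, _), (t2, _) in zip(purges, purges[1:])]
    sizes = [size for _, size in purges[1:]]
    mg, ms = mean(gaps), mean(sizes)
    cov = sum((g - mg) * (s - ms) for g, s in zip(gaps, sizes))
    sd_g = sum((g - mg) ** 2 for g in gaps) ** 0.5
    sd_s = sum((s - ms) ** 2 for s in sizes) ** 0.5
    return cov / (sd_g * sd_s)

# Synthetic illustration only: long gaps -> big purges, short gaps -> small.
sample = [(0, 50), (30, 180000), (31, 40), (32, 38), (62, 210073)]
r = gap_size_correlation(sample)
print(round(r, 2))
```

A coefficient near 1.0 on the real data would quantify the "correlates pretty strongly" observation above.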
[jira] [Commented] (KAFKA-2147) Unbalanced replication can cause extreme purgatory growth
[ https://issues.apache.org/jira/browse/KAFKA-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14514225#comment-14514225 ] Evan Huus commented on KAFKA-2147:
--
Ah, OK. We are running with entirely default replication settings, so just one thread with a max wait of 500ms, etc. In hindsight, that also ruins my hypothesis, since the brokers are replicating all of their topics in a single thread. We have actually just seen this issue in a third location, so I am seeing what I can learn from this new case. If there's anything you'd like us to try (config changes, etc.) then let us know. Thanks, Evan
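For reference, the replication defaults being discussed correspond to broker settings along these lines in server.properties (values shown are my understanding of the 0.8.x defaults; verify against the broker config documentation for your exact version):

```properties
# One replica fetcher thread per source broker (the "single thread" above).
num.replica.fetchers=1
# Max time the leader may hold a replica fetch before responding (MaxWait).
replica.fetch.wait.max.ms=500
# Respond as soon as any data at all is available (MinBytes).
replica.fetch.min.bytes=1
# Purge interval (in requests) of the fetch request purgatory;
# lowered from 1000 to 200 earlier in this ticket with no effect.
fetch.purgatory.purge.interval.requests=1000
```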
[jira] [Commented] (KAFKA-2147) Unbalanced replication can cause extreme purgatory growth
[ https://issues.apache.org/jira/browse/KAFKA-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14514302#comment-14514302 ] Evan Huus commented on KAFKA-2147:
--
h5. Case 3 - Second Cluster
As I mentioned above, we've started seeing this again in another situation. We set up our second cluster again (still 6 nodes, same configuration), but this time, instead of having MM replicating to it from the primary, we've just pointed a bunch of our producers straight at it instead. It's gone through a couple of different states already and we're still trying to figure out why:
- Initially (before anything was really producing to it) it was basically quiet, no problems.
- When we turned on our producers (sending 4-6k messages/second to the cluster) the described purgatory issue immediately began on all six brokers. Broker CPU hovered around 5%, and broker heap usage spiked between 1 and 3GB in proportion to purgatory size.
- Approximately 10 hours later, the problem disappeared on one broker at a time, over the course of about 2 hours. As each broker healed, its heap usage stabilized and its CPU usage jumped substantially, to 15%. We did not make any changes to the cluster at this time, and have no idea what caused this change.
- Approximately 10 hours after *that* (22 hours since turning on the producers), we started seeing the issue again on broker 1. Its CPU usage drops back to 5% and memory begins spiking again. We start seeing occasional spikes to 2M purgatory on other brokers, but they are generally stable while broker 1 is consistently spiking.
- This morning (another 8 hours later, 30 hours after producers), we deleted approximately 700 partitions across a dozen topics which had been unnecessarily provisioned to this cluster (they are only used on the primary, so were empty and receiving no traffic here). The problem immediately started up again across all six brokers. CPU and memory correlate as before.
[jira] [Comment Edited] (KAFKA-2147) Unbalanced replication can cause extreme purgatory growth
[ https://issues.apache.org/jira/browse/KAFKA-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14514302#comment-14514302 ] Evan Huus edited comment on KAFKA-2147 at 4/27/15 3:38 PM:
---

h5. Case 3 - Second Cluster

As I mentioned above, we've started seeing this again in another situation. We set up our second cluster again (still 6 nodes, same configuration), but this time, instead of having MM replicating to it from the primary, we've just pointed a bunch of our producers straight at it instead. It's gone through a couple of different states already and we're still trying to figure out why:
- Initially (before anything was really producing to it) it was basically quiet, no problems.
- When we turned on our producers (sending 4-6k messages/second to the cluster) the described purgatory issue immediately began on all six brokers. Broker CPU hovered around 5% and broker heap usage spiked between 1 and 3GB in proportion to purgatory size.
- Approximately 10 hours later, the problem disappeared one broker at a time, over the course of about 2 hours. As each broker healed, its heap usage stabilized and its CPU usage jumped substantially, to 15%. We did not make any changes to the cluster at this time, and have no idea what caused this change.
- Approximately 10 hours after *that* (22 hours since turning on the producers), we started seeing the issue again on broker 1. Its CPU usage dropped back to 5% and its memory began spiking again. We started seeing occasional spikes to 2M purgatory on other brokers, but they were generally stable while broker 1 was consistently spiking. Again, we have no idea what caused this change.
- This morning (another 8 hours later, 30 hours after producers) we deleted approximately 700 partitions across a dozen topics which had been unnecessarily provisioned to this cluster (they are only used on the primary, so were empty and receiving no traffic here). The problem immediately started up again across all six brokers. CPU and memory correlate as before.

This is our current state; as of about an hour ago, all six brokers in the second cluster are displaying the problem. CPU usage definitely correlates with the problem (5% when purgatory is spiking, 15% when it is quiet). As this is a newly set up cluster, the partitions are very evenly balanced. There may still be a handful of empty, dead topics on the cluster, but we removed the majority. I have no coherent theories left.
[jira] [Comment Edited] (KAFKA-2147) Unbalanced replication can cause extreme purgatory growth
[ https://issues.apache.org/jira/browse/KAFKA-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14514302#comment-14514302 ] Evan Huus edited comment on KAFKA-2147 at 4/27/15 3:35 PM:
[jira] [Created] (KAFKA-2147) Unbalanced replication can cause extreme purgatory growth
Evan Huus created KAFKA-2147:
Summary: Unbalanced replication can cause extreme purgatory growth
Key: KAFKA-2147
URL: https://issues.apache.org/jira/browse/KAFKA-2147
Project: Kafka
Issue Type: Bug
Components: purgatory, replication
Affects Versions: 0.8.2.1
Reporter: Evan Huus
Assignee: Joel Koshy

Apologies in advance, this is going to be a bit of a complex description, mainly because we've seen this issue several different ways and we're still tying them together in terms of root cause and analysis. It is worth noting now that we have all our producers set up to send RequiredAcks==-1, and that this includes all our MirrorMakers.

I understand the purgatory is being rewritten (again) for 0.8.3. Hopefully that will incidentally fix this issue, or at least render it moot.

h4. Symptoms

Fetch request purgatory on a broker or brokers grows rapidly and steadily at a rate of roughly 1-5K requests per second. Heap memory used also grows to keep pace. When 4-5 million requests have accumulated in purgatory, the purgatory is drained, causing a substantial latency spike. The node will tend to drop leadership, replicate, and recover.

h5. Case 1 - MirrorMaker

We first noticed this case when enabling mirrormaker. We had one primary cluster already, with many producers and consumers. We created a second, identical cluster and enabled replication from the original to the new cluster on some topics using mirrormaker. This caused all six nodes in the new cluster to exhibit the symptom in lockstep - their purgatories would all grow together, and get drained within about 20 seconds of each other. The cluster-wide latency spikes at this time caused several problems for us.

Turning MM on and off turned the problem on and off very precisely. When we stopped MM, the purgatories would all drop to normal levels immediately, and would start climbing again when we restarted it. Note that this is the *fetch* purgatories on the brokers that MM was *producing* to, which indicates fairly strongly that this is a replication issue, not a MM issue. This particular cluster and MM setup was abandoned for other reasons before we could make much progress debugging.

h5. Case 2 - Broker 6

The second time we saw this issue was on the newest broker (broker 6) in the original cluster. For a long time we were running with five nodes, and eventually added a sixth to handle the increased load. At first, we moved only a handful of higher-volume partitions to this broker. Later, we created a group of new topics (totalling around 100 partitions) for testing purposes that were spread automatically across all six nodes. These topics saw occasional traffic, but were generally unused. At this point broker 6 had leadership for about an equal number of high-volume and unused partitions, about 15-20 of each. Around this time (we don't have detailed enough data to prove real correlation, unfortunately), the issue started appearing on this broker as well, but not on any of the other brokers in the cluster.

h4. Debugging

The first thing we tried was to reduce `fetch.purgatory.purge.interval.requests` from the default of 1000 to a much lower value of 200. This had no noticeable effect at all.

We then enabled debug logging on broker06 and started looking through that. I can attach complete log samples if necessary, but the thing that stood out for us was a substantial number of the following lines:
{noformat}
[2015-04-23 20:05:15,196] DEBUG [KafkaApi-6] Putting fetch request with correlation id 49939 from client ReplicaFetcherThread-0-6 into purgatory (kafka.server.KafkaApis)
{noformat}
The volume of these lines seemed to match (approximately) the fetch purgatory growth on that broker.

At this point we developed a hypothesis (detailed below) which guided our subsequent debugging tests:
- Setting a daemon up to produce regular random data to all of the topics led by kafka06 (specifically the ones which otherwise would receive no data) substantially alleviated the problem.
- Doing an additional rebalance of the cluster in order to move a number of other topics with regular data to kafka06 appears to have solved the problem completely.

h4. Hypothesis

Current versions (0.8.2.1 and earlier) have issues with the replica fetcher not backing off correctly (KAFKA-1461, KAFKA-2082 and others). I believe that in a very specific situation, the replica fetcher thread of one broker can spam another broker with requests that fill up its purgatory and do not get properly flushed. My best guess is that the necessary conditions are:
- broker A leads some partitions which receive regular traffic, and some partitions which do not
- broker B replicates some of each type of partition from broker A
- some producers are producing with RequiredAcks=-1 (wait for all ISR)
- broker B
[jira] [Created] (KAFKA-2143) Replicas get ahead of leader and fail
Evan Huus created KAFKA-2143:
Summary: Replicas get ahead of leader and fail
Key: KAFKA-2143
URL: https://issues.apache.org/jira/browse/KAFKA-2143
Project: Kafka
Issue Type: Bug
Components: replication
Affects Versions: 0.8.2.1
Reporter: Evan Huus
Assignee: Neha Narkhede

On a cluster of 6 nodes, we recently saw a case where a single under-replicated partition suddenly appeared, replication lag spiked, and network IO spiked. The cluster appeared to recover eventually on its own.

Looking at the logs, the thing which failed was partition 7 of the topic {{background_queue}}. It had an ISR of 1,4,3 and its leader at the time was 3. Here are the interesting log lines:

On node 3 (the leader):
{noformat}
[2015-04-23 16:50:05,879] ERROR [Replica Manager on Broker 3]: Error when processing fetch request for partition [background_queue,7] offset 3722949957 from follower with correlation id 148185816. Possible cause: Request for offset 3722949957 but we only have log segments in the range 3648049863 to 3722949955. (kafka.server.ReplicaManager)
[2015-04-23 16:50:05,879] ERROR [Replica Manager on Broker 3]: Error when processing fetch request for partition [background_queue,7] offset 3722949957 from follower with correlation id 156007054. Possible cause: Request for offset 3722949957 but we only have log segments in the range 3648049863 to 3722949955. (kafka.server.ReplicaManager)
[2015-04-23 16:50:13,960] INFO Partition [background_queue,7] on broker 3: Shrinking ISR for partition [background_queue,7] from 1,4,3 to 3 (kafka.cluster.Partition)
{noformat}

And on nodes 1 and 4 (the replicas), many occurrences of the following:
{noformat}
[2015-04-23 16:50:05,935] INFO Scheduling log segment 3648049863 for log background_queue-7 for deletion. (kafka.log.Log)
{noformat}

Based on my reading, this looks like the replicas somehow got *ahead* of the leader, asked for an invalid offset, got confused, and re-replicated the entire topic from scratch to recover (this matches our network graphs, which show 3 sending a bunch of data to 1 and 4). Taking a stab in the dark at the cause, there appears to be a race condition where replicas can receive a new offset before the leader has committed it and is ready to replicate?

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
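The leader-side guard that produced those ERROR lines can be approximated as a simple range check. This is a hedged sketch with hypothetical names, not Kafka's actual code; it only illustrates why a follower asking for an offset past the leader's last available offset is rejected:

```go
package main

import (
	"errors"
	"fmt"
)

var errOffsetOutOfRange = errors.New("offset out of range")

// checkFetchOffset approximates the guard behind the log line above: a
// fetch for an offset outside the leader's available segment range
// [logStart, logEnd] is rejected. This is what both followers hit when
// they asked for 3722949957 while the leader's log ended at 3722949955.
func checkFetchOffset(offset, logStart, logEnd int64) error {
	if offset < logStart || offset > logEnd {
		return fmt.Errorf("request for offset %d but we only have log segments in the range %d to %d: %w",
			offset, logStart, logEnd, errOffsetOutOfRange)
	}
	return nil
}

func main() {
	// The exact values from the ERROR lines on node 3.
	err := checkFetchOffset(3722949957, 3648049863, 3722949955)
	fmt.Println(err != nil) // prints "true": the follower's fetch is rejected
}
```

On an out-of-range error like this, 0.8.x followers reset and re-fetch from the leader, which would explain the full re-replication visible in the network graphs.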
[jira] [Updated] (KAFKA-2143) Replicas get ahead of leader and fail
[ https://issues.apache.org/jira/browse/KAFKA-2143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Evan Huus updated KAFKA-2143:
-
Description: On a cluster of 6 nodes, we recently saw a case where a single under-replicated partition suddenly appeared, replication lag spiked, and network IO spiked. The cluster appeared to recover eventually on its own.

Looking at the logs, the thing which failed was partition 7 of the topic {{background_queue}}. It had an ISR of 1,4,3 and its leader at the time was 3. Here are the interesting log lines:

On node 3 (the leader):
{noformat}
[2015-04-23 16:50:05,879] ERROR [Replica Manager on Broker 3]: Error when processing fetch request for partition [background_queue,7] offset 3722949957 from follower with correlation id 148185816. Possible cause: Request for offset 3722949957 but we only have log segments in the range 3648049863 to 3722949955. (kafka.server.ReplicaManager)
[2015-04-23 16:50:05,879] ERROR [Replica Manager on Broker 3]: Error when processing fetch request for partition [background_queue,7] offset 3722949957 from follower with correlation id 156007054. Possible cause: Request for offset 3722949957 but we only have log segments in the range 3648049863 to 3722949955. (kafka.server.ReplicaManager)
[2015-04-23 16:50:13,960] INFO Partition [background_queue,7] on broker 3: Shrinking ISR for partition [background_queue,7] from 1,4,3 to 3 (kafka.cluster.Partition)
{noformat}
Note that both replicas suddenly asked for an offset *ahead* of the available offsets.

And on nodes 1 and 4 (the replicas), many occurrences of the following:
{noformat}
[2015-04-23 16:50:05,935] INFO Scheduling log segment 3648049863 for log background_queue-7 for deletion. (kafka.log.Log)
{noformat}

Based on my reading, this looks like the replicas somehow got *ahead* of the leader, asked for an invalid offset, got confused, and re-replicated the entire topic from scratch to recover (this matches our network graphs, which show 3 sending a bunch of data to 1 and 4). Taking a stab in the dark at the cause, there appears to be a race condition where replicas can receive a new offset before the leader has committed it and is ready to replicate?
[jira] [Resolved] (KAFKA-1033) Metadata requests do not always return the complete list of available brokers
[ https://issues.apache.org/jira/browse/KAFKA-1033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Evan Huus resolved KAFKA-1033.
--
Resolution: Not A Problem

Metadata requests do not always return the complete list of available brokers
Key: KAFKA-1033
URL: https://issues.apache.org/jira/browse/KAFKA-1033
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8.0
Reporter: Evan Huus

I discovered this while writing a Go client (https://github.com/Shopify/sarama) and it is making one of the issues I'm having rather difficult to solve (https://github.com/Shopify/sarama/issues/15).

In summary: sending a metadata request with an empty list of topics is supposed to return a list of *all* metadata in the cluster. However, the list of brokers is incomplete. I have not been able to pin down precisely which brokers are missing, but I believe it happens when a broker is not currently the leader for any partition of any topic. Among other things, this can make it very difficult to provide failover in a small cluster of only one master and one replica server - clients requesting metadata sometimes are not told of the replica broker and cannot fail over to it when the master goes down.

If it is intentional to only return a subset of brokers (whatever that subset is), please document somewhere what that subset is, and how clients should learn of brokers outside that subset.

Thanks,
Evan
[jira] [Comment Edited] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14483482#comment-14483482 ] Evan Huus edited comment on KAFKA-2082 at 4/7/15 4:49 PM:
--
OK, yes, if I wait long enough (2-3 minutes) then it eventually heals itself. That seems slow (should it not refetch metadata as soon as the zk connection recovers?) but it does appear to work.

Edit: I guess if all brokers fetched metadata on a zk recovery then a real zk outage could cause a thundering herd, but adding a random 20-second backoff should alleviate that and still heal the cluster much faster than is happening here.
[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14483482#comment-14483482 ] Evan Huus commented on KAFKA-2082:
--
OK, yes, if I wait long enough (2-3 minutes) then it eventually heals itself. That seems slow (should it not refetch metadata as soon as the zk connection recovers?) but it does appear to work.

Kafka Replication ends up in a bad state
Key: KAFKA-2082
URL: https://issues.apache.org/jira/browse/KAFKA-2082
Project: Kafka
Issue Type: Bug
Components: replication
Affects Versions: 0.8.2.1
Reporter: Evan Huus
Assignee: Sriharsha Chintalapani
Priority: Critical
Attachments: KAFKA-2082.patch

While running integration tests for Sarama (the go client) we came across a pattern of connection losses that reliably puts kafka into a bad state: several of the brokers start spinning, chewing ~30% CPU and spamming the logs with hundreds of thousands of lines like:
{noformat}
[2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
[2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
[2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
[2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
[2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
[2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
[2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
[2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
{noformat}

This can be easily and reliably reproduced using the {{toxiproxy-final}} branch of https://github.com/Shopify/sarama which includes a vagrant script for provisioning the appropriate cluster:
- {{git clone https://github.com/Shopify/sarama.git}}
- {{git checkout test-jira-kafka-2082}}
- {{vagrant up}}
- {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}}

After the test finishes (it fails because the cluster ends up in a bad state), you can log into the cluster machine with {{vagrant ssh}} and inspect the bad nodes. The vagrant script provisions five zookeepers and five brokers in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}.

Additional context: the test produces continually to the cluster while randomly cutting and restoring zookeeper connections (all connections to zookeeper are run through a simple proxy on the same vm to make this easy). The majority of the time this works very well and does a good job exercising our producer's retry and failover code. However, under certain patterns of connection loss (the {{TEST_SEED}} in the instructions is important), kafka gets confused. The test never cuts more than two connections at a time, so zookeeper should always have quorum, and the topic (with three replicas) should always be writable. Completely restarting the cluster via {{vagrant reload}} seems to put it back into a sane state.
[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14483421#comment-14483421 ]

Evan Huus commented on KAFKA-2082:
----------------------------------

At what point will it recover? I've let it run for a few minutes before killing it. Is it something that polls for updated metadata (e.g. every ten minutes) that will finally kick it?
[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14482058#comment-14482058 ]

Evan Huus commented on KAFKA-2082:
----------------------------------

What if (in a normally set up cluster) a broker becomes completely isolated from all zk nodes and all other brokers? If I understand correctly, effectively the same bug will occur, as the isolated broker will serve stale metadata. So I think that regardless of how the test is set up, a broker which does not have a zookeeper connection should refuse to serve metadata requests.
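As a sketch of that proposal: the toy Go model below (hypothetical names throughout; the real Kafka broker is Scala and none of this corresponds to actual broker internals) shows a broker that serves its cached leader map only while its zookeeper session is alive, and refuses metadata requests otherwise so that clients are forced to ask a broker with a current view.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errNoZkSession = errors.New("refusing metadata request: no zookeeper session")

// broker is a toy model of the proposed behaviour, not real Kafka code.
type broker struct {
	zkConnected atomic.Bool
	leaders     map[string]int32 // cached partition -> leader broker id
}

// Metadata returns the cached leader map only while the zookeeper session is
// alive; otherwise the client must retry against a different broker.
func (b *broker) Metadata() (map[string]int32, error) {
	if !b.zkConnected.Load() {
		// The cache may be arbitrarily stale; better no metadata than wrong metadata.
		return nil, errNoZkSession
	}
	return b.leaders, nil
}

func main() {
	b := &broker{leaders: map[string]int32{"many_partition-1": 9093}}
	b.zkConnected.Store(true)
	fmt.Println(b.Metadata())
	b.zkConnected.Store(false) // zookeeper connection cut, as by the test's proxy
	fmt.Println(b.Metadata())
}
```

This trades availability for correctness, which is exactly the point of the comment: an isolated broker cannot know whether leadership has moved, so the honest answer is an error.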
[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14481981#comment-14481981 ]

Evan Huus commented on KAFKA-2082:
----------------------------------

So I guess the producer should not be connecting to a broker that isn't the controller? I don't know how a non-jvm client is supposed to detect that case, as it doesn't appear to be exposed in the protocol. Or should a broker which is completely disconnected from zookeeper refuse to serve metadata requests?
[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14482301#comment-14482301 ]

Evan Huus commented on KAFKA-2082:
----------------------------------

OK, that all makes sense. Some follow-up questions:

1. When the cluster heals (at the end of the test) the replication continues to fail. Based on your explanation, I would have thought that the cluster healing would have triggered {{onControllerResignation}}, which would update the metadata on that node and fix the replication by pointing it to the new leader, but that does not appear to be the case?

2. The go client currently tries brokers in the order specified by the user on startup, not randomly. I can see the argument for a random choice (it's on our todo list), but even that only works around the problem - an unlucky producer could choose the bad broker enough times in a row to run out of retries and start dropping messages.

I really think that an isolated broker should refuse to serve metadata requests - it knows that its information is likely stale, and forcing clients to try another broker is the only way for them to reliably get fresh metadata. Just like documentation: the only metadata worse than no metadata is incorrect metadata.
[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14482421#comment-14482421 ]

Evan Huus commented on KAFKA-2082:
----------------------------------

1. Then why does broker3 continue to spam logs? (I understand the *spam* is due to the lack of backoff, but if it gets the latest metadata at that point, why does it continue to fail at all?)
[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14396024#comment-14396024 ] Evan Huus commented on KAFKA-2082: -- If the root of this bug is the controller not returning the correct LeaderAndIsr, then that is possibly the same bug as the continued failure of my TestReliableProducing, which (per my hypothesis above) is probably the broker returning a stale or incorrect leader in a MetadataResponse. Kafka Replication ends up in a bad state Key: KAFKA-2082 URL: https://issues.apache.org/jira/browse/KAFKA-2082 Project: Kafka Issue Type: Bug Components: replication Affects Versions: 0.8.2.1 Reporter: Evan Huus Assignee: Sriharsha Chintalapani Priority: Critical Attachments: KAFKA-2082.patch While running integration tests for Sarama (the go client) we came across a pattern of connection losses that reliably puts kafka into a bad state: several of the brokers start spinning, chewing ~30% CPU and spamming the logs with hundreds of thousands of lines like: {noformat} [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager) {noformat} This can be easily and reliably reproduced using the {{toxiproxy-final}} branch of https://github.com/Shopify/sarama which includes a vagrant script for provisioning the appropriate cluster: - {{git clone https://github.com/Shopify/sarama.git}} - {{git checkout test-jira-kafka-2082}} - {{vagrant up}} - {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}} After the test finishes (it fails because the cluster ends up in a bad state), you can log into the cluster machine with {{vagrant ssh}} and inspect the bad nodes. 
The vagrant script provisions five zookeepers and five brokers in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}.

Additional context: the test produces continually to the cluster while randomly cutting and restoring zookeeper connections (all connections to zookeeper are run through a simple proxy on the same VM to make this easy). The majority of the time this works very well and does a good job of exercising our producer's retry and failover code. However, under certain patterns of connection loss (the {{TEST_SEED}} in the instructions is important), Kafka gets confused. The test never cuts more than two connections at a time, so zookeeper should always have quorum, and the topic (with three replicas) should always be writable. Completely restarting the cluster via {{vagrant reload}} seems to put it back into a sane state.
[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14395847#comment-14395847 ] Evan Huus commented on KAFKA-2082:
----------------------------------

Hmm, the test is configured to retry a message 5 times (including refetching metadata) and to wait 2 seconds in between retries. That error line means that a message ran out of retries and was abandoned (the printed message was just the last error reported against that particular message). I suspect this is a case of a broker returning bad/stale metadata, so we try again on the same broker even though leadership has moved.

- Logs like {{Fetching metadata for [TOPIC(s)] from broker ADDRESS}} will tell you which broker we are asking for metadata.
- Logs like {{producer/leader selected broker ID on TOPIC/PARTITION}} will tell you (after fetching metadata) which broker the metadata claimed was leader.
- Logs like {{producer/flusher/BROKER state change to [retrying] on TOPIC/PARTITION because ERROR}} indicate that the produce response contained the given error, so we are retrying some messages.

If we keep asking the same broker for metadata, selecting the same leader, getting the same error back, and repeating the cycle, then the broker we are fetching metadata from is giving us back bad data, and I suppose that's another bug. Of course, it's possible that we just need to increase the number of retries to give the cluster more time to heal. I'll try myself with the patch on Monday.
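The suspected failure mode, retrying against metadata that never changes, can be sketched in Go without any Kafka dependency. This is a hypothetical illustration, not Sarama's actual code: a producer retries up to five times, refetching metadata before each attempt, but if the queried broker keeps reporting a stale leader, every retry hits the same "leader not local" wall and the message is ultimately abandoned:

```go
package main

import "fmt"

// fetchLeader models asking one broker for metadata: it answers
// with whichever broker id it currently believes leads the partition.
type fetchLeader func() int

// produce models a produce attempt: it succeeds only when sent to
// the broker that actually leads the partition right now.
func produce(to, actualLeader int) error {
	if to != actualLeader {
		return fmt.Errorf("leader not local for partition on broker %d", to)
	}
	return nil
}

// sendWithRetries mirrors the test configuration described above:
// up to maxRetries attempts, with fresh metadata before each one.
// (The real test also waits 2 seconds between attempts; the backoff
// is omitted here.)
func sendWithRetries(fetch fetchLeader, actualLeader, maxRetries int) error {
	var err error
	for i := 0; i < maxRetries; i++ {
		leader := fetch() // refetch metadata before retrying
		if err = produce(leader, actualLeader); err == nil {
			return nil
		}
	}
	return fmt.Errorf("abandoned after %d retries: %v", maxRetries, err)
}

func main() {
	// A broker serving stale metadata: it keeps claiming 9091 leads,
	// although leadership has actually moved to 9093.
	stale := func() int { return 9091 }
	fmt.Println(sendWithRetries(stale, 9093, 5))

	// A broker whose metadata heals after two fetches; the retry
	// budget is then enough for the produce to succeed.
	calls := 0
	healing := func() int {
		calls++
		if calls > 2 {
			return 9093
		}
		return 9091
	}
	fmt.Println(sendWithRetries(healing, 9093, 5))
}
```

The second scenario is the comment's alternative explanation: if the cluster heals within the retry budget, more retries (or a longer backoff) would be enough; if the metadata source never heals, no retry count helps.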
[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14392017#comment-14392017 ] Evan Huus commented on KAFKA-2082:
----------------------------------

I have pushed another branch called {{test-jira-kafka-2082}} which is identical to the previous branch except that it also sets the virtualbox memory to 3GB (otherwise it defaults to 512MB). You will likely have to {{vagrant destroy}} and recreate for this setting to stick. With this branch, I have been able to reproduce from scratch using VMware on an OSX host and VirtualBox on an Ubuntu host; the process is identical. The relevant test fragment when it fails is as follows:

{noformat}
=== RUN TestReliableProducing
[sarama] 2015/04/01 22:26:06 Initializing new client
[sarama] 2015/04/01 22:26:06 Fetching metadata for all topics from broker 192.168.100.67:9091
[sarama] 2015/04/01 22:26:06 Connected to broker 192.168.100.67:9091
[sarama] 2015/04/01 22:26:06 Registered new broker #9091 at 192.168.100.67:9091
[sarama] 2015/04/01 22:26:06 Registered new broker #9095 at 192.168.100.67:9095
[sarama] 2015/04/01 22:26:06 Registered new broker #9094 at 192.168.100.67:9094
[sarama] 2015/04/01 22:26:06 Registered new broker #9093 at 192.168.100.67:9093
[sarama] 2015/04/01 22:26:06 Registered new broker #9092 at 192.168.100.67:9092
[sarama] 2015/04/01 22:26:06 Successfully initialized new client
[sarama] 2015/04/01 22:26:06 producer/flusher/9091 starting up
[sarama] 2015/04/01 22:26:06 Connected to broker 192.168.100.67:9091
[sarama] 2015/04/01 22:26:06 producer/flusher/9092 starting up
[sarama] 2015/04/01 22:26:06 Connected to broker 192.168.100.67:9092
[sarama] 2015/04/01 22:26:06 producer/flusher/9095 starting up
[sarama] 2015/04/01 22:26:06 Connected to broker 192.168.100.67:9095
[sarama] 2015/04/01 22:26:06 producer/flusher/9093 starting up
[sarama] 2015/04/01 22:26:06 Connected to broker 192.168.100.67:9093
[sarama] 2015/04/01 22:26:06 producer/flusher/9094 starting up
[sarama] 2015/04/01 22:26:06 Connected to broker 192.168.100.67:9094
[sarama] 2015/04/01 22:26:09 zk1 disabled
[sarama] 2015/04/01 22:26:12 zk3 disabled
[sarama] 2015/04/01 22:26:19 producer/flusher/9091 state change to [retrying] on many_partition/25 because kafka server: Request exceeded the user-specified time limit in the request.
[sarama] 2015/04/01 22:26:19 producer/flusher/9091 state change to [retrying] on many_partition/5 because kafka server: Request exceeded the user-specified time limit in the request.
[sarama] 2015/04/01 22:26:19 producer/leader state change to [retrying-1] on many_partition/25
[sarama] 2015/04/01 22:26:19 producer/leader abandoning broker 9091 on many_partition/25
[sarama] 2015/04/01 22:26:19 producer/flusher/9091 state change to [retrying] on many_partition/30 because kafka server: Request exceeded the user-specified time limit in the request.
[sarama] 2015/04/01 22:26:19 producer/leader state change to [retrying-1] on many_partition/5
[sarama] 2015/04/01 22:26:19 producer/leader abandoning broker 9091 on many_partition/5
[sarama] 2015/04/01 22:26:19 producer/leader state change to [retrying-1] on many_partition/30
[sarama] 2015/04/01 22:26:19 producer/leader abandoning broker 9091 on many_partition/30
[sarama] 2015/04/01 22:26:19 producer/flusher/9091 state change to [normal] on many_partition/25
[sarama] 2015/04/01 22:26:19 producer/flusher/9093 state change to [retrying] on many_partition/7 because kafka server: Request exceeded the user-specified time limit in the request.
[sarama] 2015/04/01 22:26:19 producer/leader state change to [retrying-1] on many_partition/7
[sarama] 2015/04/01 22:26:19 producer/leader abandoning broker 9093 on many_partition/7
[sarama] 2015/04/01 22:26:21 Fetching metadata for [many_partition] from broker 192.168.100.67:9091
[sarama] 2015/04/01 22:26:21 Fetching metadata for [many_partition] from broker 192.168.100.67:9091
[sarama] 2015/04/01 22:26:21 Fetching metadata for [many_partition] from broker 192.168.100.67:9091
[sarama] 2015/04/01 22:26:21 producer/leader selected broker 9091 on many_partition/25
[sarama] 2015/04/01 22:26:21 producer/leader state change to [normal-1] on many_partition/25
[sarama] 2015/04/01 22:26:21 producer/leader state change to [flushing-0] on many_partition/25
[sarama] 2015/04/01 22:26:21 producer/leader state change to [normal-0] on many_partition/25
[sarama] 2015/04/01 22:26:21 producer/leader selected broker 9091 on many_partition/5
[sarama] 2015/04/01 22:26:21 producer/leader selected broker 9091 on many_partition/30
[sarama] 2015/04/01 22:26:21 Fetching metadata for [many_partition] from broker 192.168.100.67:9091
[sarama] 2015/04/01 22:26:21 producer/leader selected broker 9093 on many_partition/7
[sarama] 2015/04/01 22:26:24 producer/flusher/9091 state change to [retrying] on many_partition/20 because kafka server: Request exceeded the user-specified time limit in the request.
{noformat}
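One way to spot the suspected cycle in a debug log like the fragment above is to mechanically classify the three Sarama line patterns called out earlier (metadata fetch, leader selection, retry state change). The matcher below is a hypothetical helper, not part of Sarama; the substring patterns are taken from the quoted log lines:

```go
package main

import (
	"fmt"
	"strings"
)

// classify buckets a Sarama debug line into one of the three events
// relevant to the retry cycle, or "" if it is none of them.
func classify(line string) string {
	switch {
	case strings.Contains(line, "Fetching metadata for"):
		return "metadata-fetch"
	case strings.Contains(line, "producer/leader selected broker"):
		return "leader-selected"
	case strings.Contains(line, "state change to [retrying]"):
		return "retrying"
	}
	return ""
}

func main() {
	// Three lines from the test fragment: a metadata refetch from
	// 9091, the same broker being re-selected as leader, and the
	// subsequent produce failure. Seeing this triple repeat is one
	// turn of the suspected stale-metadata cycle.
	log := []string{
		"[sarama] 2015/04/01 22:26:21 Fetching metadata for [many_partition] from broker 192.168.100.67:9091",
		"[sarama] 2015/04/01 22:26:21 producer/leader selected broker 9091 on many_partition/25",
		"[sarama] 2015/04/01 22:26:24 producer/flusher/9091 state change to [retrying] on many_partition/20 because kafka server: Request exceeded the user-specified time limit in the request.",
	}
	for _, l := range log {
		fmt.Println(classify(l))
	}
}
```

If the fetch/select/retry triple keeps repeating against the same broker id, that supports the comment's conclusion that the metadata source itself is serving bad data.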
[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14391877#comment-14391877 ] Evan Huus commented on KAFKA-2082:
----------------------------------

The 3GB is only set if you use VMware Fusion; if you use VirtualBox or something else you may need to increase the memory manually. The test {{TestReliableProducing}} is failing consistently for me.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Evan Huus updated KAFKA-2082:
-----------------------------
[jira] [Updated] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Evan Huus updated KAFKA-2082:
-----------------------------
    Description: 
While running integration tests for Sarama (the Go client) we came across a pattern of connection losses that reliably puts kafka into a bad state: several of the brokers start spinning, chewing ~30% CPU and spamming the logs with hundreds of thousands of lines like:

{noformat}
[2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
[2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
[2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
[2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
[2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
[2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
[2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
[2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
{noformat}

This can be easily and reliably reproduced using the {{toxiproxy-final}} branch of https://github.com/Shopify/sarama which includes a vagrant script for provisioning the appropriate cluster:
- {{git clone https://github.com/Shopify/sarama.git}}
- {{git checkout toxiproxy-final}}
- {{vagrant up}}
- {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}}

After the test finishes (it fails because the cluster ends up in a bad state), you can log into the cluster machine with {{vagrant ssh}} and inspect the bad nodes. The vagrant script provisions five zookeepers and five brokers in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}.

Additional context: the test produces continually to the cluster while randomly cutting and restoring zookeeper connections (all connections to zookeeper are run through a simple proxy on the same VM to make this easy). The majority of the time this works very well and does a good job exercising our producer's retry and failover code. However, under certain patterns of connection loss (the {{TEST_SEED}} in the instructions is important), kafka gets confused. The test never cuts more than two connections at a time, so zookeeper should always have quorum, and the topic (with three replicas) should always be writable. Completely restarting the cluster via {{vagrant reload}} seems to put it back into a sane state.
[jira] [Created] (KAFKA-1998) Partitions Missing From MetadataResponse
Evan Huus created KAFKA-1998:
--------------------------------

             Summary: Partitions Missing From MetadataResponse
                 Key: KAFKA-1998
                 URL: https://issues.apache.org/jira/browse/KAFKA-1998
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 0.8.2.0
            Reporter: Evan Huus

It is known behaviour that when a partition is entirely offline (it has no leader because all of its replicas are down), that partition will not be included in the metadata returned by other brokers. For example, if topic foo has 3 partitions, but all replicas of partition 3 are offline, then requesting metadata for foo will only return information about partitions 1 and 2.

This means that there is no way to reliably determine the number of partitions for a topic via kafka's metadata API: if I receive information on partitions 1 and 2, I don't know whether partition 3 is offline or there are simply only two partitions total. (You can presumably still ask zookeeper directly, but that is a work-around.)

This ambiguity, in turn, can lead to a consistency problem with the default partitioner, since that effectively implements `hash(key) mod #partitions`. If a partition goes offline and is removed from the metadata response, then the number of partitions the producer knows about will change (on its next metadata refresh) and the mapping from keys to partitions will also change. Instead of distributing messages among (for example) 3 partitions, and failing to produce to the offline partition, it will distribute *all* messages among the two online partitions. This results in messages being sent to the wrong partition.

Since kafka already returns partitions with error codes in many cases (e.g. `LeaderNotAvailable`), I think it makes much more sense, and fixes the above partitioning problem, if it would simply return offline partitions as well with the appropriate error (whether that is `LeaderNotAvailable` or a new error code is up to you).

CC [~guozhang] (This issue was originally described/discussed on the kafka-users mailing list, in the thread at https://mail-archives.apache.org/mod_mbox/kafka-users/201503.mbox/%3CCAA4pprAZvp2XhdNmy0%2BqVZ1UVdVxmUfz3DDArhGbwP-iiH%2BGyg%40mail.gmail.com%3E)

If there are any questions I am happy to clarify; I realize the scenario is somewhat complex.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
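The `hash(key) mod #partitions` remapping described above can be sketched in Go (Sarama's language). This is an illustrative hash-mod partitioner, not Sarama's actual implementation, and the key name is made up:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor picks a partition as hash(key) mod numPartitions, the scheme
// the default partitioner effectively implements.
func partitionFor(key string, numPartitions uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key))
	return h.Sum32() % numPartitions
}

func main() {
	// With all 3 partitions visible in metadata, the key maps stably.
	fmt.Println("3 partitions:", partitionFor("some-key", 3))
	// If one partition goes offline and disappears from the metadata
	// response, the modulus changes and keys silently remap, so messages
	// for the same key can land on a different partition.
	fmt.Println("2 partitions:", partitionFor("some-key", 2))
}
```

The point of the sketch is that the mapping depends only on the number of partitions the producer can see, which is exactly why hiding offline partitions from metadata changes where keys are routed.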
[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311387#comment-14311387 ]

Evan Huus commented on KAFKA-1718:
----------------------------------

[~guozhang], [~jkreps] my understanding is that while this is a known limitation of the current design (and the wiki now reflects that limitation), this ticket is still open to track support for multiple compressed message-sets in a single produce request. The points I made in my comment on Oct 21st still stand. I'm not sure whether there's been any progress on the actual implementation of that support.

> Message Size Too Large error when only small messages produced with Snappy
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-1718
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1718
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8.1.1
>            Reporter: Evan Huus
>            Priority: Critical
>
> I'm the primary author of the Go bindings, and while I originally received this as a bug against my bindings, I'm coming to the conclusion that it's a bug in the broker somehow.
> Specifically, take a look at the last two kafka packets in the following packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you will need a trunk build of Wireshark to fully decode the kafka part of the packets).
> The produce request contains two partitions on one topic. Each partition has one message set (sizes 977205 bytes and 967362 bytes respectively). Each message set is a sequential collection of snappy-compressed messages, each message of size 46899. When uncompressed, each message contains a message set of 999600 bytes, containing a sequence of uncompressed 1024-byte messages.
> However, the broker responds to this with a MessageSizeTooLarge error, full stacktrace from the broker logs being:
> kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes which exceeds the maximum configured message size of 112.
> 	at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267)
> 	at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> 	at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
> 	at kafka.log.Log.append(Log.scala:265)
> 	at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
> 	at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
> 	at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> 	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> 	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> 	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> 	at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> 	at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
> 	at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
> 	at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
> 	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> 	at java.lang.Thread.run(Thread.java:695)
> Since as far as I can tell none of the sizes in the actual produced packet exceed the defined maximum, I can only assume that the broker is miscalculating something somewhere and throwing the exception improperly.
> ---
> This issue can be reliably reproduced using an out-of-the-box binary download of 0.8.1.1 and the following gist: https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use the `producer-ng` branch of the Sarama library).
> ---
> I am happy to provide any more information you might need, or to do relevant experiments etc.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-1806) broker can still expose uncommitted data to a consumer
[ https://issues.apache.org/jira/browse/KAFKA-1806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14248764#comment-14248764 ]

Evan Huus commented on KAFKA-1806:
----------------------------------

Sarama client maintainer here (via https://github.com/Shopify/sarama/issues/226); this looks like a kafka bug to me since the error in the log message is from a ReplicaFetcherThread, but I'm happy to provide extra information on the behaviour of the client if you think it's relevant.

> broker can still expose uncommitted data to a consumer
> ------------------------------------------------------
>
>                 Key: KAFKA-1806
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1806
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.1.1
>            Reporter: lokesh Birla
>            Assignee: Neha Narkhede
>
> Although the following issue https://issues.apache.org/jira/browse/KAFKA-727 is marked fixed, I still see this issue in 0.8.1.1. I am able to reproduce the issue consistently.
> [2014-08-18 06:43:58,356] ERROR [KafkaApi-1] Error when processing fetch request for partition [mmetopic4,2] offset 1940029 from consumer with correlation id 21 (kafka.server.KafkaApis)
> java.lang.IllegalArgumentException: Attempt to read with a maximum offset (1818353) less than the start offset (1940029).
> 	at kafka.log.LogSegment.read(LogSegment.scala:136)
> 	at kafka.log.Log.read(Log.scala:386)
> 	at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> 	at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> 	at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> 	at scala.collection.immutable.Map$Map1.foreach(Map.scala:119)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> 	at scala.collection.immutable.Map$Map1.map(Map.scala:107)
> 	at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
> 	at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:783)
> 	at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:765)
> 	at kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216)
> 	at java.lang.Thread.run(Thread.java:745)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-1744) Fetch Response contains messages prior to the requested offset
[ https://issues.apache.org/jira/browse/KAFKA-1744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14201374#comment-14201374 ]

Evan Huus commented on KAFKA-1744:
----------------------------------

[~nehanarkhede] this was discovered in the golang consumer I maintain; the scala consumer (as I linked) seems to handle this case already. I have not checked the java consumer.

The [spec for the fetch API|https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI] implies (though it does not explicitly state) that if I perform a fetch request for offset X, the fetch response will only contain messages whose offset is >= X. If this is not true (in practice I have seen messages with offsets < X), I would suggest explicitly noting this in the spec to avoid confusion. Alternatively it may be a real bug in the broker, in which case the spec is fine and the broker should be fixed. I don't have enough information to say for sure.

> Fetch Response contains messages prior to the requested offset
> --------------------------------------------------------------
>
>                 Key: KAFKA-1744
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1744
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8.1.1
>            Reporter: Evan Huus
>
> As reported in https://github.com/Shopify/sarama/issues/166 there are cases where a FetchRequest for a particular offset returns some messages prior to that offset. The spec does not seem to indicate that this is possible; it does state that "As an optimization the server is allowed to return a partial message at the end of the message set." but otherwise implies that a request for offset X will only return complete messages starting at X.
> The scala consumer does seem to handle this case gracefully though, if I am reading it correctly (my scala is not the best): https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/ConsumerIterator.scala#L96-L99
> So is this a bug or just a case that needs to be added to the spec? Something like "As an optimization the server is allowed to return some messages in the message set prior to the requested offset. Clients should handle this case."? Although I can't imagine why sending extra data would be faster than only sending the necessary messages...

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
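Since the broker may legitimately return a few messages from before the requested offset, a client needs a guard in the spirit of the linked ConsumerIterator code. A minimal Go sketch, using a hypothetical message type rather than Sarama's real API:

```go
package main

import "fmt"

// message is a stand-in for a decoded fetch-response message.
type message struct {
	offset int64
}

// trimBeforeOffset drops any messages the broker returned from before the
// requested offset, which the fetch API permits as an optimization.
func trimBeforeOffset(msgs []message, requested int64) []message {
	out := msgs[:0] // filter in place over the same backing array
	for _, m := range msgs {
		if m.offset >= requested {
			out = append(out, m)
		}
	}
	return out
}

func main() {
	// A batch where the broker included two messages before the requested
	// offset of 100; only offsets 100 and 101 should survive.
	batch := []message{{98}, {99}, {100}, {101}}
	for _, m := range trimBeforeOffset(batch, 100) {
		fmt.Println(m.offset)
	}
}
```

A consumer that instead trusts the first offset in the batch would silently re-deliver messages 98 and 99 here, which is exactly the bug reported against the Go consumer.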
[jira] [Commented] (KAFKA-1744) Fetch Response contains messages prior to the requested offset
[ https://issues.apache.org/jira/browse/KAFKA-1744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14201425#comment-14201425 ]

Evan Huus commented on KAFKA-1744:
----------------------------------

[~junrao] very nice stealth edit of the spec, thank you :) That clarification is what I was looking for; this ticket can be closed.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-1744) Fetch Response contains messages prior to the requested offset
[ https://issues.apache.org/jira/browse/KAFKA-1744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14201425#comment-14201425 ]

Evan Huus edited comment on KAFKA-1744 at 11/7/14 2:00 AM:
-----------------------------------------------------------

[~junrao] thanks for the clarification, that's what I was looking for; this ticket can be closed.

was (Author: eapache):
[~junrao] very nice stealth edit of the spec, thank you :) That clarification is what I was looking for, this ticket can be closed.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14181348#comment-14181348 ]

Evan Huus commented on KAFKA-1718:
----------------------------------

[~junrao] I already have wiki permissions, so I made the relevant change. While I'm in the neighbourhood, what is the expected value of the {{MagicByte}} field? The spec doesn't clarify, and my library has been leaving it at 0 without problems thus far, but [~sriharsha] mentioned earlier that the value should be 2?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14180126#comment-14180126 ]

Evan Huus edited comment on KAFKA-1718 at 10/22/14 4:22 PM:
------------------------------------------------------------

??when it gets a partial message from the broker it will dynamically increase its fetch size?? like https://github.com/Shopify/sarama/blob/master/consumer.go#L236-L253 ?

was (Author: eapache):
??when it gets a partial message from the broker it will dynamically increase its fetch size?? like https://github.com/Shopify/sarama/blob/master/consumer.go#L236-L253?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14180126#comment-14180126 ] Evan Huus commented on KAFKA-1718:
--
??when it gets a partial message from the broker it will dynamically increase its fetch size??
like https://github.com/Shopify/sarama/blob/master/consumer.go#L236-L253?
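The sarama consumer code linked above grows its fetch size when the broker returns a partial message. A minimal sketch of that retry strategy (the function name, doubling policy, and parameters here are illustrative, not sarama's actual API):

```go
package main

import "fmt"

// requiredFetchSize sketches the consumer strategy referenced above: when a
// fetch returns a partial message (the fetch size was smaller than the next
// message on the wire), double the fetch size and retry, up to a configured
// ceiling. fits stands in for a real broker fetch and reports whether the
// next message fit within `size` bytes.
func requiredFetchSize(start, max int32, fits func(size int32) bool) (int32, bool) {
	for size := start; size <= max; size *= 2 {
		if fits(size) {
			return size, true
		}
	}
	return 0, false // message larger than the configured maximum: give up
}

func main() {
	// Simulated broker: the next message needs 70000 bytes on the wire.
	fits := func(size int32) bool { return size >= 70000 }
	size, ok := requiredFetchSize(32768, 1<<20, fits)
	fmt.Println(size, ok) // 131072 true
}
```

The doubling bound matters: without a ceiling, a corrupt size field on the wire could drive the fetch size arbitrarily high.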
[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14179485#comment-14179485 ] Evan Huus commented on KAFKA-1718:
--
??I guess your go producer can send multiple compressed messages for a single partition??
Yes, that's exactly what it's doing. If it collects enough messages for a partition that they would exceed {{message.max.bytes}} when compressed together, it batches them and sends each batch as a compressed message in the same messageSet.
??Is there any benefit in doing that???
More-or-less to get around the limit on message sizes, which I guess doesn't work so well :)
A few points on this then:
* Currently (with default broker settings) you can produce just under 100MiB (socket.request.max.bytes) of messages to the broker uncompressed in a single request, but you can't produce that same batch of messages in compressed form, since the resulting compressed message would almost certainly be larger than 1MB (message.max.bytes). This discrepancy seems odd to me.
* I understand the desire to limit real message sizes to prevent misbehaving producers from causing problems. However, I don't think the limit is particularly useful when applied to the compressed meta-messages; why shouldn't they be arbitrarily large, within the limits of {{socket.request.max.bytes}}?
* I don't think the broker should assume there's only one compressed message per message-set; if a message-set contains multiple compressed messages, it should process them one-at-a-time and store each individually, rather than trying to do them all at once.
Thanks for all your help!
[jira] [Comment Edited] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14179485#comment-14179485 ] Evan Huus edited comment on KAFKA-1718 at 10/22/14 2:26 AM:
Edit: If for some reason you decide to keep the current behaviour as-is, please document this in the protocol spec on the wiki, since as far as I can tell the spec gives no reason to believe that multiple compressed messages will be combined, and that the _combined_ length will be relevant.
[jira] [Created] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
Evan Huus created KAFKA-1718:
Summary: Message Size Too Large error when only small messages produced with Snappy
Key: KAFKA-1718
URL: https://issues.apache.org/jira/browse/KAFKA-1718
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8.1.1
Reporter: Evan Huus
Priority: Critical

I'm the primary author of the Go bindings, and while I originally received this as a bug against my bindings, I'm coming to the conclusion that it's a bug in the broker somehow.

Specifically, take a look at the last two kafka packets in the following packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you will need a trunk build of Wireshark to fully decode the kafka part of the packets).

The produce request contains two partitions on one topic. Each partition has one message set (sizes 977205 bytes and 967362 bytes respectively). Each message set is a sequential collection of snappy-compressed messages, each message of size 46899. When uncompressed, each message contains a message set of 999600 bytes, containing a sequence of uncompressed 1024-byte messages.

However, the broker responds to this with a MessageSizeTooLarge error, full stacktrace from the broker logs being: kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes which exceeds the maximum configured message size of 112.
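The sizes in the report are self-consistent under the 0.8 message-set layout, assuming null-key messages: each entry carries an 8-byte offset and a 4-byte size, and each message a 4-byte CRC, 1-byte magic, 1-byte attributes, and 4-byte key and value lengths, 26 bytes of overhead in total (the same 26 bytes mentioned later in this thread). A quick check:

```go
package main

import "fmt"

// entrySize computes the wire size of one 0.8-era message-set entry holding
// a null-key message: offset(8) + messageSize(4) + crc(4) + magic(1) +
// attributes(1) + keyLength(4) + valueLength(4) + value bytes.
func entrySize(valueLen int) int {
	const overhead = 8 + 4 + 4 + 1 + 1 + 4 + 4 // 26 bytes per message
	return overhead + valueLen
}

func main() {
	fmt.Println(entrySize(1024))               // 1050 bytes per 1024-byte message
	fmt.Println(999600 / entrySize(1024))      // 952 messages per inner set
	fmt.Println(952*entrySize(1024) == 999600) // true: matches the reported inner-set size
}
```

So the reported 999600-byte inner set is exactly 952 uncompressed 1024-byte messages with their per-entry overhead, which supports the reporter's claim that no individual size was miscounted on the client side.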
[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14177112#comment-14177112 ] Evan Huus commented on KAFKA-1718:
--
LogOverhead is only 12 bytes; none of the values I produce are within 12 bytes of the limit, and nowhere near the 1070127 that the broker is reporting.
[jira] [Comment Edited] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14177351#comment-14177351 ] Evan Huus edited comment on KAFKA-1718 at 10/20/14 7:28 PM:
That additional data is only 26 bytes, and is already included in the numbers I put in my original report.
was (Author: eapache): The numbers I put in my original report do take all of that additional data into consideration already.
[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14177487#comment-14177487 ] Evan Huus commented on KAFKA-1718:
--
That sounds plausible.
1. How do I verify if that is/isn't the problem I'm seeing? Is there some piece of backtrace or breakpoint I can check or something?
2. If that is the problem, what is a client supposed to do about it? Leave a few KiB spare and hope that that's enough? Is there no way for a client using compression to be sure that the broker will actually accept the payload (unless, presumably, the uncompressed payload is already small enough)?
[jira] [Comment Edited] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14177487#comment-14177487 ] Evan Huus edited comment on KAFKA-1718 at 10/20/14 9:23 PM:
Edit: actually, that can't be it. From my original report: "When uncompressed, each message contains a message set of 999600 bytes." So unless the recompression on the broker's end *added* a substantial amount of data (which is improbable; the messages were all 0s)...
[jira] [Commented] (KAFKA-1110) Unable to produce messages with snappy/gzip compression
[ https://issues.apache.org/jira/browse/KAFKA-1110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13818523#comment-13818523 ] Evan Huus commented on KAFKA-1110:
--
My ICLA has been accepted, and my name appears on the list at https://people.apache.org/committer-index.html#unlistedclas

Unable to produce messages with snappy/gzip compression
---
Key: KAFKA-1110
URL: https://issues.apache.org/jira/browse/KAFKA-1110
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8
Environment: Kafka version: kafka-0.8.0-beta1; OS version: Darwin 12.4.1 Darwin Kernel Version 12.4.1: Tue May 21 17:04:50 PDT 2013; root:xnu-2050.40.51~1/RELEASE_X86_64 x86_64
Reporter: Arup Malakar
Attachments: kafka_producer_snappy_pkt_63.pcapng, sarama_producer_snappy_pkt_1.pcapng

Sarama[1] (a golang kafka library: https://github.com/Shopify/sarama) is following the specs as defined in https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol, but messages are not getting into the kafka log file and consumers never see them when gzip/snappy is used. Without compression it works fine, though. A few observations we made:

1. The Kafka service does have the jars required to interpret snappy messages. When I modified ConsoleProducer to produce messages using SnappyCompressionCodec instead of the default GZip codec, I was able to produce/consume messages. Looking at the kafka log files I see that Snappy compression was indeed being used:
% bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/aruptest-0/.log | tail -1
offset: 15 position: 18763 isvalid: true payloadsize: 52 magic: 0 compresscodec: SnappyCompressionCodec crc: 1602790249
So I don't think it is a case of missing jars on the kafka server.

2. Kafka doesn't return any error if the message doesn't make it to the log file. This seems pretty serious, as I would expect kafka to throw an error if I am using WaitForLocal/WaitForAll.

3. We did an inspection of the tcp packet to see the difference between what ConsoleProducer sends vs what sarama sends (the following is a copy/paste from a github issue):
[~eapache]: So I have no idea what the ConsoleProducer is actually sending in this case. The outer protocol layers in both cases look identical, but if you compare the actual message value:
a. Sarama sends two bytes of snappy header and then msg-payload (since Snappy decides it's too short to properly encode, so makes it a literal). Pretty straightforward.
b. ConsoleProducer sends 0x82 then the string literal SNAPPY\0 then what appears to be a complete embedded produce request without any compression. This is neither valid snappy nor valid Kafka according to anything I've seen, so I'm pretty confused. It looks almost like an incorrect version of [1], but it's missing several key fields and the case of the identifying string is wrong.
1: http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt

Let us know if recent changes in the codebase make the protocol page obsolete; in that case, if the protocol page is updated, we could update our client to use the new spec. More information can be found in the following github issue: https://github.com/Shopify/sarama/issues/32
--
This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Commented] (KAFKA-1110) Unable to produce messages with snappy/gzip compression
[ https://issues.apache.org/jira/browse/KAFKA-1110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13817414#comment-13817414 ] Evan Huus commented on KAFKA-1110:
--
The 0x07 is the same as the CompressionCodeMask in the Scala version; I just didn't give it a name (although I have a change pending for that now, for clarity's sake).

The information I really need in order to proceed is for somebody to explain the output of Kafka's ConsoleProducer. It does *not* match the wire protocol to the best of my knowledge, but the broker accepts it anyway.
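The masking being discussed can be sketched in a few lines of Go. This is an illustration of the point, not Sarama's or Kafka's actual code; the names are mine:

```go
package main

import "fmt"

// Per the protocol wiki, the lowest 3 bits of a message's attributes
// byte hold the compression codec, so the correct mask is 0x07.
// With only codecs 0 (none), 1 (gzip) and 2 (snappy) defined, a 0x03
// mask happens to give the same answer today, but it would silently
// drop the high bit of any future codec value such as 4.
const compressionCodecMask byte = 0x07

func codecFromAttributes(attributes byte) byte {
	return attributes & compressionCodecMask
}

func main() {
	fmt.Println(codecFromAttributes(0x02)) // snappy → 2
	fmt.Println(byte(0x04) & 0x03)         // narrower mask truncates 4 to 0
}
```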
[jira] [Commented] (KAFKA-1110) Unable to produce messages with snappy/gzip compression
[ https://issues.apache.org/jira/browse/KAFKA-1110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13817478#comment-13817478 ] Evan Huus commented on KAFKA-1110:
--
I have verified that I have log4j in the path and am getting lots of normal log messages, but no errors.

However, I think I've figured it out. I took a wild guess based on the apparent structure of the messages being sent by the ConsoleProducer and tried wrapping the message in a *second* message-set, and voila, it appeared to be logged correctly. Maybe I just got lucky, but if somebody can verify that compressed messages do in fact have to be wrapped twice for some reason, we're good to go.

There is still a bug in that not adding the second layer doesn't produce any errors. It should log an error *and* it should return an error code in the ProduceResponse. Also, the wiki should be updated. All of this is pending confirmation that my fix is correct, though.
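The double wrapping described above can be sketched in Go: what gets compressed is an entire encoded inner message-set, and that compressed blob becomes the value of a single outer message. The encoding here is deliberately simplified (no CRCs, offsets, attributes, or keys) just to show the nesting; it is not the real wire format:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/binary"
	"fmt"
	"io"
)

// encodeMessage is a simplified stand-in for a Kafka 0.8 message:
// just a length-prefixed value (real messages also carry a CRC,
// magic byte, attributes, and key).
func encodeMessage(value []byte) []byte {
	buf := make([]byte, 4+len(value))
	binary.BigEndian.PutUint32(buf, uint32(len(value)))
	copy(buf[4:], value)
	return buf
}

// buildCompressedValue shows the double wrapping: the payloads are
// first encoded as an inner message-set, and that whole blob is then
// compressed to become the value of the single outer message.
func buildCompressedValue(payloads [][]byte) ([]byte, error) {
	var inner bytes.Buffer
	for _, p := range payloads {
		inner.Write(encodeMessage(p))
	}
	var out bytes.Buffer
	w := gzip.NewWriter(&out)
	if _, err := w.Write(inner.Bytes()); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return out.Bytes(), nil
}

// decodeCompressedValue reverses the process: decompress the outer
// value, then walk the recovered inner message-set.
func decodeCompressedValue(value []byte) ([][]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(value))
	if err != nil {
		return nil, err
	}
	inner, err := io.ReadAll(r)
	if err != nil {
		return nil, err
	}
	var payloads [][]byte
	for len(inner) >= 4 {
		n := int(binary.BigEndian.Uint32(inner))
		payloads = append(payloads, inner[4:4+n])
		inner = inner[4+n:]
	}
	return payloads, nil
}

func main() {
	value, _ := buildCompressedValue([][]byte{[]byte("hello"), []byte("world")})
	payloads, _ := decodeCompressedValue(value)
	for _, p := range payloads {
		fmt.Println(string(p)) // prints "hello" then "world"
	}
}
```

The broker-side bug in the comment is that a compressed value which does not decode as a message-set produces no error at all, rather than failing the ProduceResponse.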
[jira] [Commented] (KAFKA-1110) Unable to produce messages with snappy/gzip compression
[ https://issues.apache.org/jira/browse/KAFKA-1110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13817480#comment-13817480 ] Evan Huus commented on KAFKA-1110:
--
Bitwise AND is commutative, so (mask & value) is always the same as (value & mask). According to the spec, "The lowest 3 bits contain the compression codec used for the message", which means the mask should be 0x07, not 0x03. Either the spec or the code should change.
[jira] [Commented] (KAFKA-1110) Unable to produce messages with snappy/gzip compression
[ https://issues.apache.org/jira/browse/KAFKA-1110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13817504#comment-13817504 ] Evan Huus commented on KAFKA-1110:
--
I've made changes to that page in the past, but I guess the Confluence upgrade changed permissions. My wiki username is eapache. I *think* I already have an ICLA on file (given that I've made changes in the past) but I'm not sure how to check, so I'll submit another one to be sure.
[jira] [Commented] (KAFKA-1110) Unable to produce messages with snappy/gzip compression
[ https://issues.apache.org/jira/browse/KAFKA-1110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13817532#comment-13817532 ] Evan Huus commented on KAFKA-1110:
--
ICLA submitted. I don't know how long it will take for me to show up on the list, but when I do, please add me to the wiki and I will make the necessary changes.
[jira] [Commented] (KAFKA-1110) Unable to produce messages with snappy/gzip compression
[ https://issues.apache.org/jira/browse/KAFKA-1110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13817533#comment-13817533 ] Evan Huus commented on KAFKA-1110:
--
This bug can now track the fact that a compressed message not containing a message-set results in no errors being logged or returned.
[jira] [Commented] (KAFKA-1110) Unable to produce messages with snappy/gzip compression
[ https://issues.apache.org/jira/browse/KAFKA-1110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13816949#comment-13816949 ] Evan Huus commented on KAFKA-1110:
--
I don't know how to make it work. The provided ConsoleProducer appears, for all intents and purposes, to be sending garbage, and yet it is accepted for some reason. I was hoping somebody who knows the protocol could clarify the structure; then I could match it in Go. I really don't want to have to reverse-engineer the protocol from scratch.
[jira] [Commented] (KAFKA-1110) Unable to produce messages with snappy/gzip compression
[ https://issues.apache.org/jira/browse/KAFKA-1110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13815127#comment-13815127 ] Evan Huus commented on KAFKA-1110:
--
Hi Joe, as verified with the network trace, Sarama is setting the compression bit correctly (it happens at https://github.com/Shopify/sarama/blob/master/message.go#L36-L37). I am perfectly happy to do the coding work necessary to match the protocol. The problem is that, based on the network traces and the wiki page, I *am* matching the protocol, and it still doesn't work. Moreover, the ConsoleProducer, to the best of my knowledge, is *not* matching the protocol, and yet it works. Maybe just the protocol spec (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol) needs updating?
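The producer-side counterpart of the mask discussion, setting the codec into the attributes byte as Sarama does around message.go#L36-L37, amounts to ORing the codec into the low 3 bits. A minimal sketch with illustrative names (these are not Sarama's actual identifiers):

```go
package main

import "fmt"

// Codec values from the protocol wiki; the low 3 bits of the
// attributes byte carry the codec.
const (
	CodecNone   byte = 0
	CodecGzip   byte = 1
	CodecSnappy byte = 2
)

// setCodec clears the low 3 bits of attributes and writes the codec
// into them, leaving any other attribute bits untouched.
func setCodec(attributes, codec byte) byte {
	return (attributes &^ 0x07) | (codec & 0x07)
}

func main() {
	fmt.Println(setCodec(0, CodecSnappy)) // → 2
}
```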
[jira] [Comment Edited] (KAFKA-1110) Unable to produce messages with snappy/gzip compression
[ https://issues.apache.org/jira/browse/KAFKA-1110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13815127#comment-13815127 ] Evan Huus edited comment on KAFKA-1110 at 11/6/13 6:31 PM: --- Hi Joe, as verified with the network trace already attached, Sarama is setting the compression bit correctly (it happens at https://github.com/Shopify/sarama/blob/master/message.go#L36-L37). I am perfectly happy to do the coding work as necessary to match the protocol. The problem is that based on the network traces and the wiki page I *am* matching the protocol, and it still doesn't work. Moreover, the ConsoleConsumer, to the best of my knowledge is *not* matching the protocol, and yet it works. Maybe just the protocol spec (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol) needs updating? was (Author: eapache): Hi Joe, as verified with the network trace, Sarama is setting the compression bit correctly (it happens at https://github.com/Shopify/sarama/blob/master/message.go#L36-L37). I am perfectly happy to do the coding work as necessary to match the protocol. The problem is that based on the network traces and the wiki page I *am* matching the protocol, and it still doesn't work. Moreover, the ConsoleConsumer, to the best of my knowledge is *not* matching the protocol, and yet it works. Maybe just the protocol spec (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol) needs updating? 
Unable to produce messages with snappy/gzip compression --- Key: KAFKA-1110 URL: https://issues.apache.org/jira/browse/KAFKA-1110 Project: Kafka Issue Type: Bug Affects Versions: 0.8 Environment: Kafka version: kafka-0.8.0-beta1 OS version: Darwin 12.4.1 Darwin Kernel Version 12.4.1: Tue May 21 17:04:50 PDT 2013; root:xnu-2050.40.51~1/RELEASE_X86_64 x86_64 Reporter: Arup Malakar Attachments: kafka_producer_snappy_pkt_63.pcapng, sarama_producer_snappy_pkt_1.pcapng Sarama[1] (A golang kafka library: https://github.com/Shopify/sarama) is following the specs as defined in: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol but messages are not getting into the kafka log file and consumers never see them when gzip/snappy is used. Without compression it works fine though. Few observations we made: 1. Kafka service does have required jars to be able to interpret snappy messages. When I modify ConsoleProducer to produce messages using SnappyCompressionCodec instead of default GZip codec. I was able to produce/consume messages. Looking at the kafka log files I see that Snappy Compression was indeed getting used: % bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/aruptest-0/.log | tail -1 offset: 15 position: 18763 isvalid: true payloadsize: 52 magic: 0 compresscodec: SnappyCompressionCodec crc: 1602790249 So I don't think it would be a case of missing jars in kafka server. 2. Kafka doesn't return any error if the message doesn't make it to the log file. This seems pretty serious, as I would expect kafka to throw an error if I am using WaitForLocal/WaitForAll. 3. We did an inspection of the tcp packet to see the difference between what ConsoleProducer sends vs what sarama sends (Following is a copy/paste from a github issue): [~eapache] : So I have no idea what the ConsoleProducer is actually sending in this case. The outer protocol layers in both cases look identical, but if you compare the actual message value: a. 
Sarama sends two bytes of snappy header and then the msg-payload (Snappy decides it's too short to encode properly, so makes it a literal). Pretty straightforward. b. ConsoleProducer sends 0x82, then the string literal SNAPPY\0, then what appears to be a complete embedded produce request without any compression. This is neither valid snappy nor valid Kafka according to anything I've seen, so I'm pretty confused. It looks almost like an incorrect version of [1], but it's missing several key fields and the case of the identifying string is wrong. 1: http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt

Let us know if recent changes in the codebase make the protocol page obsolete; in that case, once the protocol page is updated we could update our client to use the new spec. More information can be found in the following github issue: https://github.com/Shopify/sarama/issues/32 -- This message was sent by Atlassian JIRA (v6.1#6144)
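A plausible explanation for the mysterious prefix (not confirmed in this thread): 0x82 followed by "SNAPPY\0" matches the stream header written by the snappy-java ("xerial") library, which frames data as an 8-byte magic, two 4-byte version fields, and then length-prefixed snappy blocks. A minimal Go sketch of detecting and walking that framing; the helper names are mine and the version fields are not validated:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// Magic prefix written by snappy-java's stream encoder: 0x82 "SNAPPY" 0x00.
// This is the 0x82 + SNAPPY\0 byte sequence observed in the packet capture.
var xerialHeader = []byte{0x82, 'S', 'N', 'A', 'P', 'P', 'Y', 0x00}

// isXerialFramed reports whether buf starts with the snappy-java stream header
// (8-byte magic, then two 4-byte version fields, then length-prefixed blocks).
func isXerialFramed(buf []byte) bool {
	return len(buf) >= 16 && bytes.Equal(buf[:8], xerialHeader)
}

// blockSizes walks the length-prefixed blocks that follow the 16-byte header
// and returns the size of each snappy-compressed block.
func blockSizes(buf []byte) []int {
	var sizes []int
	for i := 16; i+4 <= len(buf); {
		n := int(binary.BigEndian.Uint32(buf[i : i+4]))
		i += 4
		if i+n > len(buf) {
			break // truncated block
		}
		sizes = append(sizes, n)
		i += n
	}
	return sizes
}

func main() {
	// A fabricated stream: magic + two version fields + one 5-byte block.
	stream := append([]byte{}, xerialHeader...)
	stream = append(stream, 0, 0, 0, 1, 0, 0, 0, 1) // version, compat version
	stream = append(stream, 0, 0, 0, 5, 'h', 'e', 'l', 'l', 'o')
	fmt.Println(isXerialFramed(stream), blockSizes(stream))
}
```

This would explain why the bytes are "neither valid snappy nor valid Kafka": they belong to a third, library-specific framing layered on top of raw snappy blocks.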
[jira] [Comment Edited] (KAFKA-1110) Unable to produce messages with snappy/gzip compression
[ https://issues.apache.org/jira/browse/KAFKA-1110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13809464#comment-13809464 ] Evan Huus edited comment on KAFKA-1110 at 11/6/13 6:32 PM: --- I am the primary author of the golang library and the one quoted in the main description. The [1] link in my quote was a reference to the Snappy framing format spec: http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt The part that is confusing me the most is the 0x82 at the beginning of the message produced by the ConsoleProducer, since it does not match any possible structure I know of. Raw snappy would expect a little-endian varint of the message length (which 0x82 is not), snappy framing would expect 0xff prior to sNaPpY (as opposed to the actual SNAPPY in the message) and a Kafka string would be a two-byte length. Worth noting: recent development builds of Wireshark support decoding the Kafka protocol as described in the spec, so that may aid in reading the PCAP files for debugging.

was (Author: eapache): I am the primary author of the golang library and the one quoted in the main description. The [1] link in my quote was a reference to the Snappy framing format spec: http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt The part that is confusing me the most is the 0x82 at the beginning of the message produced by the ConsoleConsumer, since it does not match any possible structure I know of. Raw snappy would expect a little-endian varint of the message length (which 0x82 is not), snappy framing would expect 0xff prior to sNaPpY (as opposed to the actual SNAPPY in the message) and a Kafka string would be a two-byte length. Worth noting: recent development builds of Wireshark support decoding the Kafka protocol as described in the spec, so that may aid in reading the PCAP files for debugging.
You will have to set a preference to tell it which port to decode on though.
[jira] [Comment Edited] (KAFKA-1110) Unable to produce messages with snappy/gzip compression
[ https://issues.apache.org/jira/browse/KAFKA-1110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13815127#comment-13815127 ] Evan Huus edited comment on KAFKA-1110 at 11/6/13 6:32 PM: --- Hi Joe, as verified with the network trace already attached, Sarama is setting the compression bit correctly (it happens at https://github.com/Shopify/sarama/blob/master/message.go#L36-L37). I am perfectly happy to do the coding work necessary to match the protocol. The problem is that, based on the network traces and the wiki page, I *am* matching the protocol, and it still doesn't work. Moreover, the ConsoleProducer, to the best of my knowledge, is *not* matching the protocol, and yet it works. Maybe just the protocol spec (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol) needs updating?

was (Author: eapache): Hi Joe, as verified with the network trace already attached, Sarama is setting the compression bit correctly (it happens at https://github.com/Shopify/sarama/blob/master/message.go#L36-L37). I am perfectly happy to do the coding work necessary to match the protocol. The problem is that, based on the network traces and the wiki page, I *am* matching the protocol, and it still doesn't work. Moreover, the ConsoleConsumer, to the best of my knowledge, is *not* matching the protocol, and yet it works. Maybe just the protocol spec (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol) needs updating?
[jira] [Created] (KAFKA-1033) Metadata requests do not always return the complete list of available brokers
Evan Huus created KAFKA-1033: Summary: Metadata requests do not always return the complete list of available brokers Key: KAFKA-1033 URL: https://issues.apache.org/jira/browse/KAFKA-1033 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Evan Huus

I discovered this while writing a Go client (https://github.com/Shopify/sarama) and it is making one of the issues I'm having rather difficult to solve (https://github.com/Shopify/sarama/issues/15). In summary: sending a metadata request with an empty list of topics is supposed to return a list of *all* metadata in the cluster. However, the list of brokers is incomplete. I have not been able to pin down precisely which brokers are missing, but I believe it happens when a broker is not currently the leader for any partition of any topic. Among other things, this can make it very difficult to provide failover in a small cluster of only one master and one replica server: clients requesting metadata sometimes are not told of the replica broker and cannot fail over to it when the master goes down. If it is intentional to only return a subset of brokers (whatever that subset is), please document somewhere what that subset is, and how clients should learn of brokers outside that subset. Thanks, Evan -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-993) Offset Management API is either broken or mis-documented
[ https://issues.apache.org/jira/browse/KAFKA-993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13746033#comment-13746033 ] Evan Huus commented on KAFKA-993: - I have updated the protocol wiki to note that these APIs are not available in 0.8. Thanks for the clarification.

Offset Management API is either broken or mis-documented Key: KAFKA-993 URL: https://issues.apache.org/jira/browse/KAFKA-993 Project: Kafka Issue Type: Bug Components: network Affects Versions: 0.8, 0.8.1 Reporter: Evan Huus Assignee: Jun Rao I am in the process of building a set of Go client bindings for the new 0.8 protocol (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol). Everything works but the Offset Commit/Fetch APIs. Fetch never returns any data, and trying to Commit results in the broker forcibly disconnecting my client. I have double-checked the bytes on the wire using Wireshark, and my client is obeying the protocol spec. After some digging, I found KAFKA-852 which seems related, but I have tried my client against the 0.8 beta, 0.8 branch, and even trunk with the same results.
When I try and commit, the stack-trace that the broker produces is:

[2013-07-31 10:34:14,423] ERROR Closing socket for /192.168.12.71 because of error (kafka.network.Processor)
java.nio.BufferUnderflowException
    at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:127)
    at java.nio.ByteBuffer.get(ByteBuffer.java:675)
    at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:38)
    at kafka.api.UpdateMetadataRequest$$anonfun$readFrom$1.apply(UpdateMetadataRequest.scala:42)
    at kafka.api.UpdateMetadataRequest$$anonfun$readFrom$1.apply(UpdateMetadataRequest.scala:41)
    at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282)
    at scala.collection.immutable.Range$$anon$2.foreach(Range.scala:265)
    at kafka.api.UpdateMetadataRequest$.readFrom(UpdateMetadataRequest.scala:41)
    at kafka.api.RequestKeys$$anonfun$7.apply(RequestKeys.scala:42)
    at kafka.api.RequestKeys$$anonfun$7.apply(RequestKeys.scala:42)
    at kafka.network.RequestChannel$Request.init(RequestChannel.scala:49)
    at kafka.network.Processor.read(SocketServer.scala:345)
    at kafka.network.Processor.run(SocketServer.scala:245)
    at java.lang.Thread.run(Thread.java:680)

Is this a bug, or is the protocol spec wrong? Also, since I can't seem to find a straight answer anywhere else: is offset fetch/commit expected to be in 0.8, 0.8.1, or some later release? Thanks, Evan -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
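The underflow in the trace happens inside readShortString, which reads a 2-byte big-endian length and then that many bytes; if the broker decodes the request body as the wrong request type (note the trace shows UpdateMetadataRequest.readFrom for a client-sent commit), the "length" it reads is arbitrary bytes and runs past the end of the buffer. A rough Go re-creation of that decode step, to illustrate the failure class; the function name mirrors the Scala one but the error strings are mine:

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// readShortString mimics kafka.api.ApiUtils.readShortString from the stack
// trace above: a 2-byte big-endian length followed by that many bytes.
// It returns the string, the unconsumed remainder, and any error.
func readShortString(buf []byte) (string, []byte, error) {
	if len(buf) < 2 {
		return "", buf, errors.New("buffer underflow reading length")
	}
	n := int(int16(binary.BigEndian.Uint16(buf)))
	if n < 0 {
		return "", buf[2:], nil // -1 encodes a null string
	}
	if len(buf[2:]) < n {
		// The Java equivalent throws BufferUnderflowException here: the
		// declared length runs past the end of the request buffer, which
		// is what happens when bytes are decoded as the wrong request type.
		return "", buf, errors.New("buffer underflow reading bytes")
	}
	return string(buf[2 : 2+n]), buf[2+n:], nil
}

func main() {
	// Well-formed: length 5, then "topic".
	s, _, err := readShortString([]byte{0, 5, 't', 'o', 'p', 'i', 'c'})
	fmt.Println(s, err)
	// Misinterpreted bytes: the "length" 0x746f ('t','o') is garbage, so
	// the read underflows, just like the broker-side stack trace.
	_, _, err = readShortString([]byte{'t', 'o', 'p'})
	fmt.Println(err)
}
```

This is consistent with the eventual clarification in this thread: the offset APIs simply were not available in the 0.8 broker, so the commit bytes were not decoded as the client intended.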
[jira] [Resolved] (KAFKA-993) Offset Management API is either broken or mis-documented
[ https://issues.apache.org/jira/browse/KAFKA-993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Evan Huus resolved KAFKA-993. - Resolution: Fixed
[jira] [Commented] (KAFKA-993) Offset Management API is either broken or mis-documented
[ https://issues.apache.org/jira/browse/KAFKA-993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13745303#comment-13745303 ] Evan Huus commented on KAFKA-993: - The library is now available at https://github.com/Shopify/sarama In case somebody wants to add it to the wiki page of client libraries...
[jira] [Updated] (KAFKA-993) Offset Management API is either broken or mis-documented
[ https://issues.apache.org/jira/browse/KAFKA-993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Evan Huus updated KAFKA-993:

Description: I am in the process of building a set of Go client bindings for the new 0.8 protocol (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol). Everything works but the Offset Commit/Fetch APIs. Fetch never returns any data, and trying to Commit results in the broker forcibly disconnecting my client. I have double-checked the bytes on the wire using Wireshark, and my client is obeying the protocol spec. After some digging, I found KAFKA-852 which seems related, but I have tried my client against the 0.8 beta, 0.8 branch, and even trunk with the same results. When I try and commit, the stack-trace that the broker produces is:

[2013-07-31 10:34:14,423] ERROR Closing socket for /192.168.12.71 because of error (kafka.network.Processor)
java.nio.BufferUnderflowException
    at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:127)
    at java.nio.ByteBuffer.get(ByteBuffer.java:675)
    at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:38)
    at kafka.api.UpdateMetadataRequest$$anonfun$readFrom$1.apply(UpdateMetadataRequest.scala:42)
    at kafka.api.UpdateMetadataRequest$$anonfun$readFrom$1.apply(UpdateMetadataRequest.scala:41)
    at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282)
    at scala.collection.immutable.Range$$anon$2.foreach(Range.scala:265)
    at kafka.api.UpdateMetadataRequest$.readFrom(UpdateMetadataRequest.scala:41)
    at kafka.api.RequestKeys$$anonfun$7.apply(RequestKeys.scala:42)
    at kafka.api.RequestKeys$$anonfun$7.apply(RequestKeys.scala:42)
    at kafka.network.RequestChannel$Request.init(RequestChannel.scala:49)
    at kafka.network.Processor.read(SocketServer.scala:345)
    at kafka.network.Processor.run(SocketServer.scala:245)
    at java.lang.Thread.run(Thread.java:680)

Is this a bug, or is the protocol spec wrong? Also, since I can't seem to find a straight answer anywhere else: is offset fetch/commit expected to be in 0.8, 0.8.1, or some later release? Thanks, Evan

was: I am in the process of building a set of Go client bindings for the new 0.8 protocol (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol). Everything works but the Offset Commit/Fetch APIs. Fetch never returns and data, and trying to Commit results in the broker forcibly disconnecting my client. I have double-checked the bytes on the wire using Wireshark, and my client is obeying the protocol spec. After some digging, I found KAFKA-852 which seems related, but I have tried my client against the 0.8 beta, 0.8 branch, and even trunk with the same results. When I try and commit, the stack-trace that the broker produces is:

[2013-07-31 10:34:14,423] ERROR Closing socket for /192.168.12.71 because of error (kafka.network.Processor)
java.nio.BufferUnderflowException
    at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:127)
    at java.nio.ByteBuffer.get(ByteBuffer.java:675)
    at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:38)
    at kafka.api.UpdateMetadataRequest$$anonfun$readFrom$1.apply(UpdateMetadataRequest.scala:42)
    at kafka.api.UpdateMetadataRequest$$anonfun$readFrom$1.apply(UpdateMetadataRequest.scala:41)
    at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282)
    at scala.collection.immutable.Range$$anon$2.foreach(Range.scala:265)
    at kafka.api.UpdateMetadataRequest$.readFrom(UpdateMetadataRequest.scala:41)
    at kafka.api.RequestKeys$$anonfun$7.apply(RequestKeys.scala:42)
    at kafka.api.RequestKeys$$anonfun$7.apply(RequestKeys.scala:42)
    at kafka.network.RequestChannel$Request.init(RequestChannel.scala:49)
    at kafka.network.Processor.read(SocketServer.scala:345)
    at kafka.network.Processor.run(SocketServer.scala:245)
    at java.lang.Thread.run(Thread.java:680)

Is this a bug, or is the protocol spec wrong?
Also, since I can't seem to find a straight answer anywhere else: is offset fetch/commit expected to be in 0.8, 0.8.1, or some later release? Thanks, Evan
[jira] [Created] (KAFKA-993) Offset Management API is either broken or mis-documented
Evan Huus created KAFKA-993: --- Summary: Offset Management API is either broken or mis-documented Key: KAFKA-993 URL: https://issues.apache.org/jira/browse/KAFKA-993 Project: Kafka Issue Type: Bug Components: network Affects Versions: 0.8, 0.8.1 Reporter: Evan Huus Assignee: Jun Rao

I am in the process of building a set of Go client bindings for the new 0.8 protocol (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol). Everything works but the Offset Commit/Fetch APIs. Fetch never returns and data, and trying to Commit results in the broker forcibly disconnecting my client. I have double-checked the bytes on the wire using Wireshark, and my client is obeying the protocol spec. After some digging, I found KAFKA-852 which seems related, but I have tried my client against the 0.8 beta, 0.8 branch, and even trunk with the same results. When I try and commit, the stack-trace that the broker produces is:

[2013-07-31 10:34:14,423] ERROR Closing socket for /192.168.12.71 because of error (kafka.network.Processor)
java.nio.BufferUnderflowException
    at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:127)
    at java.nio.ByteBuffer.get(ByteBuffer.java:675)
    at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:38)
    at kafka.api.UpdateMetadataRequest$$anonfun$readFrom$1.apply(UpdateMetadataRequest.scala:42)
    at kafka.api.UpdateMetadataRequest$$anonfun$readFrom$1.apply(UpdateMetadataRequest.scala:41)
    at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282)
    at scala.collection.immutable.Range$$anon$2.foreach(Range.scala:265)
    at kafka.api.UpdateMetadataRequest$.readFrom(UpdateMetadataRequest.scala:41)
    at kafka.api.RequestKeys$$anonfun$7.apply(RequestKeys.scala:42)
    at kafka.api.RequestKeys$$anonfun$7.apply(RequestKeys.scala:42)
    at kafka.network.RequestChannel$Request.init(RequestChannel.scala:49)
    at kafka.network.Processor.read(SocketServer.scala:345)
    at kafka.network.Processor.run(SocketServer.scala:245)
    at java.lang.Thread.run(Thread.java:680)

Is this a bug, or is the protocol spec wrong? Also, since I can't seem to find a straight answer anywhere else: is offset fetch/commit expected to be in 0.8, 0.8.1, or some later release? Thanks, Evan -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira