Join the list
Hi, could you add me to the mailing list?
Query regarding Kafka partitions and Consumer rebalancing
Hi, We are currently trying to configure Kafka in our system for pulling messages from queues. We have multiple consumers (and we might want to add consumers if the load on one consumer increases) which need to receive and process messages from a Kafka topic. Based on my understanding, within a single consumer group, one partition can be read by only one consumer. So if we want to set things up so that no consumer gets overloaded in any case, what would be the best way to do it? If we have 6 partitions and 3 equally efficient consumers, the load seems to be distributed equally. But suppose one of the consumers, say Consumer-3, for some reason processes data 10 times slower; then we would want to reduce the load on Consumer-3 and distribute it equally across Consumer-1 and Consumer-2. We wanted a pull-based system that would help us reduce the load on a slow consumer. Please let us know if there is any way to do this. Does Kafka have any alternate implementation for such cases? Thanks, Madhavi.
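For reference, a minimal sketch of the 0.8 high-level consumer setup described above: one group, each member asking for a share of the partitions. The ZooKeeper address, group id, and topic name are assumptions for illustration.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class GroupMemberSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // assumed address
        props.put("group.id", "work-group");              // hypothetical group id
        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        // With 6 partitions and 3 consumers each asking for 2 streams,
        // every consumer ends up owning 2 partitions.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("work-topic", 2);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(topicCountMap);
    }
}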
message loss for sync producer, acks=2, topic replicas=3
Hi, I observed some unexpected message loss in a Kafka fault-tolerance test. In the test, a topic with 3 replicas is created. A sync producer with acks=2 publishes to the topic. A consumer consumes from the topic and tracks message ids. During the test, the leader is killed. Both producer and consumer continue to run for a while. After the producer stops, the consumer reports whether all messages were received. The test was repeated for multiple rounds; message loss happened in about 10% of the tests. A typical scenario is as follows: before the leader is killed, all 3 replicas are in ISR. After the leader is killed, one follower becomes the leader, and 2 replicas (including the new leader) are in ISR. Both the producer and consumer pause for several seconds during that time, and then continue. Message loss happens after the leader is killed. Because the new leader was in ISR before the old leader was killed, unclean leader election doesn't explain the message loss. I'm wondering if anyone else has observed such message loss? Is there any known issue that may cause message loss in the above scenario? Thanks, Jiang
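A minimal sketch of the kind of sync producer used in such a test (0.8 producer API; the broker list is an assumption, and the sequential-id payload scheme is just one way to let the consumer detect gaps):

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class AckTwoProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092"); // assumed
        props.put("producer.type", "sync");      // block on every send
        props.put("request.required.acks", "2"); // leader + one follower must ack
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));
        for (int id = 0; id < 100000; id++) {
            // Sequential ids in the payload let the consumer report gaps afterwards.
            producer.send(new KeyedMessage<String, String>("p1r3", "msg-" + id));
        }
        producer.close();
    }
}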
Re: message loss for sync producer, acks=2, topic replicas=3
Hello Jiang, Which version of Kafka are you using, and did you kill the broker with -9? Guozhang
Re: message loss for sync producer, acks=2, topic replicas=3
Guozhang, I'm testing on 0.8.1.1; just kill pid, no -9. Regards, Jiang
Re: Query regarding Kafka partitions and Consumer rebalancing
Hi Madhavi, Dynamically rebalancing partitions based on processing efficiency and load is a bit tricky in the current consumer, since rebalances are only triggered by consumer membership changes or topic/partition changes. For your case you would probably stop the slow consumer so that a rebalance is triggered to redistribute its partitions to the rest of the consumers. Guozhang
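A sketch of that workaround under the 0.8 high-level consumer: shutting the slow member down removes its group registration in ZooKeeper, which triggers a rebalance so the remaining members split its partitions. The connector handle is assumed to come from a setup like the consumer sketch earlier in this digest.

import kafka.javaapi.consumer.ConsumerConnector;

public class ShedLoadSketch {
    // Removing a member from the group is the only available lever:
    // the surviving consumers are reassigned the freed partitions.
    static void shedLoad(ConsumerConnector slowConsumer) {
        slowConsumer.shutdown(); // group membership change -> rebalance
    }
}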
Re: message loss for sync producer, acks=2, topic replicas=3
What config property values did you use on producer/consumer/broker? Guozhang
Re: message loss for sync producer, acks=2, topic replicas=3
Guozhang, Please find the config below:

Producer:
props.put("producer.type", "sync");
props.put("request.required.acks", "2");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
props.put("message.send.max.retries", "60");
props.put("retry.backoff.ms", "300");

Consumer:
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");

Broker:
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
num.partitions=2
log.retention.hours=168
log.retention.bytes=2000
log.segment.bytes=536870912
log.retention.check.interval.ms=6
log.cleaner.enable=false
zookeeper.connection.timeout.ms=100

Topic:
Topic: p1r3  PartitionCount: 1  ReplicationFactor: 3  Configs: retention.bytes=100

Thanks, Jiang
Re: message loss for sync producer, acks=2, topic replicas=3
Guozhang, My coworker came up with an explanation: at one moment the leader L and two followers F1, F2 are all in ISR. The producer sends a message m1 and receives acks from L and F1. Before the message is replicated to F2, L goes down. In the following leader election, F2, instead of F1, becomes the leader, and loses m1 somehow. Could that be the root cause? Thanks, Jiang
Re: message loss for sync producer, acks=2, topic replicas=3
That could be the cause, and it can be verified by changing the acks to -1 and checking the data loss ratio then. Guozhang
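The suggested verification is a one-line change against the producer config Jiang posted earlier in the thread:

props.put("request.required.acks", "-1"); // wait for every replica currently in the ISR, not just 2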
Re: message loss for sync producer, acks=2, topic replicas=3
When acks=-1 and the publisher thread count is high, it always happens that only the leader remains in ISR, and shutting down the leader will then cause message loss. The leader election code shows that the new leader will be the first alive broker in the ISR list, so it's possible that the new leader is behind the other followers. It seems that after a broker becomes a leader, it stops replicating from others even when it hasn't received all available messages? Regards, Jiang
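A minimal sketch of the election rule as Jiang reads it: the controller picks the first live broker in the stored ISR list, and nothing in the rule prefers the most caught-up replica. Broker ids and the ISR ordering are hypothetical.

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IsrElectionSketch {
    public static void main(String[] args) {
        // ISR as stored: old leader L=1 first, then F2=3, then F1=2.
        List<Integer> isr = Arrays.asList(1, 3, 2);
        Set<Integer> live = new HashSet<Integer>(Arrays.asList(2, 3)); // L was killed
        Integer newLeader = null;
        for (Integer broker : isr) {
            if (live.contains(broker)) { newLeader = broker; break; } // first alive ISR member wins
        }
        // Prints 3: F2 wins simply by list position, even if only F1 holds m1.
        System.out.println("new leader: " + newLeader);
    }
}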
Re: Join the list
Guozhang, I'm not sure he saw your message, since you just replied to the mailing list... François Langelier On 15 July 2014 13:13, Guozhang Wang wangg...@gmail.com wrote: Hi Guangle, It is a self-service mailing list: http://kafka.apache.org/contact.html Guozhang
Durably storing messages in Kafka
I think I know the answer to this already, but I wanted to check my assumptions before proceeding. We are using Kafka as a queueing mechanism for receiving messages from stateless producers. We are operating in a legal framework where we can never lose a committed message, but we can reject a write if Kafka is unavailable and it will be retried in the future. We are operating all of our servers in one rack, so we are vulnerable if the whole rack goes out. We will have 3-4 Kafka brokers with RF=3. To guarantee that we never (to the greatest extent possible) lose a message that we have acknowledged, it seems we need request.required.acks=-1 and log.flush.interval.messages=1, i.e. fsync on every message and wait for all brokers in the ISR to reply before returning successfully. This would guard against the failure scenario where all servers in our rack go down simultaneously. Is my understanding correct? Thanks, Daniel.
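For concreteness, the two settings described above, in the same style as the configs earlier in this digest (0.8-era property names):

Producer:
props.put("producer.type", "sync");
props.put("request.required.acks", "-1"); // wait for the full ISR before the send returns

Broker (server.properties):
log.flush.interval.messages=1  # fsync the log after every message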