Re: Question about retention and log file times
inter.broker.protocol.version = 0.10.1-IV2 log.message.format.version = 0.10.1-IV2 It will take me longer to check the producer/consumer versions, but I believe they're all *at least* 0.10 -Gwilym On 20 April 2017 at 06:42, Manikumar wrote: > You may be producing in the old message format. Check the > "log.message.format.version" config. > What is the version of the Producer/Consumer clients? > > > On Thu, Apr 20, 2017 at 11:39 AM, Gwilym Evans < > gwilym.ev...@bigcommerce.com > > wrote: > > > I am running 0.10.1.0 so, if that's true, it might not be a default. If > you > > know of a config value to change that would be very helpful. > > > > -Gwilym > > > > On 20 April 2017 at 06:07, Manikumar wrote: > > > > > AFAIK, this behavior is changed in 0.10.1.0 release. Now retention is > > based > > > on the largest > > > timestamp of the messages in a log segment. > > > > > > On Thu, Apr 20, 2017 at 11:19 AM, Gwilym Evans < > > > gwilym.ev...@bigcommerce.com > > > > wrote: > > > > > > > Hello, > > > > > > > > Yesterday, I had to replace a faulty Kafka broker node, and the > method > > of > > > > replacement involved bringing up a blank replacement using the old > > > broker's > > > > ID, thus triggering a replication of all its old partitions. > > > > > > > > Today I was dealing with disk usage alerts for only that broker: it > > > turned > > > > out that the broker was not deleting old logs like the rest of the > > nodes. > > > > > > > > I haven't checked the code, but eventually I came to the conclusion > > that > > > > Kafka log file deletion is based on file create or modified time, > > rather > > > > than the max produce time of the messages within the log file itself. > > > > > > > > This makes the method I use of replacing a faulty node with a blank > > slate > > > > problematic, since five day old messages will be stored in a file > with > > a > > > > recent c/mtime, thus won't be deleted and will soon cause disk space > > > > exhaustion. > > > > > > > > My temporary workaround was to reduce retention of the largest topic > to > > > 24 > > > > hours but I'd prefer not doing that since it's more manual work and > it > > > > breaks my SLA. > > > > > > > > Can this behaviour of Kafka be changed via configs at all? > > > > > > > > Has anyone faced a similar problem and have suggestions? > > > > > > > > Thanks, > > > > Gwilym > > > > > > > > > >
Re: Question about retention and log file times
You may be producing in the old message format. Check the "log.message.format.version" config. What is the version of the Producer/Consumer clients? On Thu, Apr 20, 2017 at 11:39 AM, Gwilym Evans wrote: > I am running 0.10.1.0 so, if that's true, it might not be a default. If you > know of a config value to change that would be very helpful. > > -Gwilym > > On 20 April 2017 at 06:07, Manikumar wrote: > > > AFAIK, this behavior is changed in 0.10.1.0 release. Now retention is > based > > on the largest > > timestamp of the messages in a log segment. > > > > On Thu, Apr 20, 2017 at 11:19 AM, Gwilym Evans < > > gwilym.ev...@bigcommerce.com > > > wrote: > > > > > Hello, > > > > > > Yesterday, I had to replace a faulty Kafka broker node, and the method > of > > > replacement involved bringing up a blank replacement using the old > > broker's > > > ID, thus triggering a replication of all its old partitions. > > > > > > Today I was dealing with disk usage alerts for only that broker: it > > turned > > > out that the broker was not deleting old logs like the rest of the > nodes. > > > > > > I haven't checked the code, but eventually I came to the conclusion > that > > > Kafka log file deletion is based on file create or modified time, > rather > > > than the max produce time of the messages within the log file itself. > > > > > > This makes the method I use of replacing a faulty node with a blank > slate > > > problematic, since five day old messages will be stored in a file with > a > > > recent c/mtime, thus won't be deleted and will soon cause disk space > > > exhaustion. > > > > > > My temporary workaround was to reduce retention of the largest topic to > > 24 > > > hours but I'd prefer not doing that since it's more manual work and it > > > breaks my SLA. > > > > > > Can this behaviour of Kafka be changed via configs at all? > > > > > > Has anyone faced a similar problem and have suggestions? > > > > > > Thanks, > > > Gwilym > > > > > >
Re: Question about retention and log file times
I am running 0.10.1.0 so, if that's true, it might not be a default. If you know of a config value to change that would be very helpful. -Gwilym On 20 April 2017 at 06:07, Manikumar wrote: > AFAIK, this behavior is changed in 0.10.1.0 release. Now retention is based > on the largest > timestamp of the messages in a log segment. > > On Thu, Apr 20, 2017 at 11:19 AM, Gwilym Evans < > gwilym.ev...@bigcommerce.com > > wrote: > > > Hello, > > > > Yesterday, I had to replace a faulty Kafka broker node, and the method of > > replacement involved bringing up a blank replacement using the old > broker's > > ID, thus triggering a replication of all its old partitions. > > > > Today I was dealing with disk usage alerts for only that broker: it > turned > > out that the broker was not deleting old logs like the rest of the nodes. > > > > I haven't checked the code, but eventually I came to the conclusion that > > Kafka log file deletion is based on file create or modified time, rather > > than the max produce time of the messages within the log file itself. > > > > This makes the method I use of replacing a faulty node with a blank slate > > problematic, since five day old messages will be stored in a file with a > > recent c/mtime, thus won't be deleted and will soon cause disk space > > exhaustion. > > > > My temporary workaround was to reduce retention of the largest topic to > 24 > > hours but I'd prefer not doing that since it's more manual work and it > > breaks my SLA. > > > > Can this behaviour of Kafka be changed via configs at all? > > > > Has anyone faced a similar problem and have suggestions? > > > > Thanks, > > Gwilym > > >
Re: Question about retention and log file times
AFAIK, this behavior is changed in 0.10.1.0 release. Now retention is based on the largest timestamp of the messages in a log segment. On Thu, Apr 20, 2017 at 11:19 AM, Gwilym Evans wrote: > Hello, > > Yesterday, I had to replace a faulty Kafka broker node, and the method of > replacement involved bringing up a blank replacement using the old broker's > ID, thus triggering a replication of all its old partitions. > > Today I was dealing with disk usage alerts for only that broker: it turned > out that the broker was not deleting old logs like the rest of the nodes. > > I haven't checked the code, but eventually I came to the conclusion that > Kafka log file deletion is based on file create or modified time, rather > than the max produce time of the messages within the log file itself. > > This makes the method I use of replacing a faulty node with a blank slate > problematic, since five day old messages will be stored in a file with a > recent c/mtime, thus won't be deleted and will soon cause disk space > exhaustion. > > My temporary workaround was to reduce retention of the largest topic to 24 > hours but I'd prefer not doing that since it's more manual work and it > breaks my SLA. > > Can this behaviour of Kafka be changed via configs at all? > > Has anyone faced a similar problem and have suggestions? > > Thanks, > Gwilym >
Question about retention and log file times
Hello, Yesterday, I had to replace a faulty Kafka broker node, and the method of replacement involved bringing up a blank replacement using the old broker's ID, thus triggering a replication of all its old partitions. Today I was dealing with disk usage alerts for only that broker: it turned out that the broker was not deleting old logs like the rest of the nodes. I haven't checked the code, but eventually I came to the conclusion that Kafka log file deletion is based on file create or modified time, rather than the max produce time of the messages within the log file itself. This makes the method I use of replacing a faulty node with a blank slate problematic, since five day old messages will be stored in a file with a recent c/mtime, thus won't be deleted and will soon cause disk space exhaustion. My temporary workaround was to reduce retention of the largest topic to 24 hours but I'd prefer not doing that since it's more manual work and it breaks my SLA. Can this behaviour of Kafka be changed via configs at all? Has anyone faced a similar problem and have suggestions? Thanks, Gwilym
Re: how can I contribute to this project?
Hi James, This page has all the information you are looking for. https://kafka.apache.org/contributing On Thu, Apr 20, 2017 at 9:32 AM, James Chain wrote: > Hi > Because I love this project, so I want to take part of it. But I'm brand > new to opensource project. > > How can I get started to make contribution? Can you give me some advise or > something? > > By the way, I already have JIRA account which called "james.c" > > Sincerely, >James.C >
how can I contribute to this project?
Hi Because I love this project, so I want to take part of it. But I'm brand new to opensource project. How can I get started to make contribution? Can you give me some advise or something? By the way, I already have JIRA account which called "james.c" Sincerely, James.C
Re: Re: Re: ZK and Kafka failover testing
The kafka-console-producer.sh defaults to acks=1 so just be careful with using those tools for too much debugging. Your output is helpful though. https://github.com/apache/kafka/blob/5a2fcdd6d480e9f003cc49a59d5952ba4c515a71/core/src/main/scala/kafka/tools/ConsoleProducer.scala#L185 -hans On Wed, Apr 19, 2017 at 3:52 PM, Shrikant Patel wrote: > Just to add, I see below behavior repeat with even command line console > producer and consumer that come with Kafka. > > Thanks, > Shri > __ > Shrikant Patel | 817.367.4302 > Enterprise Architecture Team > PDX-NHIN > > > -Original Message- > From: Shrikant Patel > Sent: Wednesday, April 19, 2017 5:49 PM > To: users@kafka.apache.org > Subject: RE: [EXTERNAL] Re: Re: ZK and Kafka failover testing > > Thanks Jeff, Onur, Jun, Hans. I am learning a lot from your response. > > Just to summarize briefly my steps, 5 node Kafka and ZK cluster. > 1. ZK cluster has all node working. Consumer is down. > 2. Bring down majority of ZK nodes. > 3. Thing are functional no issue (no dup or lost message) 4. Now first > kafka node come down. > 5. My issue start happening - as you see below producer says message with > key 34 and 35 failed. > 6. Bring majority of ZK node up. > 7. Other kafka nodes assumes leadership for node 1's topic. > 8. Bring consumer up, it starts consuming from the last offset and I see > duplicates. I see message 34 (3 times) and 35 (4 times) > > > Jeff, in my case I don’t see issue with kafka cluster recovering, once the > majority ZK nodes are up, other Kafka takes up leadership for down node > immediately. > Onur, as Jun mentioned since I have acks=all, I am not seeing any messages > being lost. > > Jun, Hans, I had same thought of trying to eliminate the consumer getting > duplicate because of incorrectly acking the message. In next run of this > test case, I was not run client at all. My consumer, producer properties > are in first email in this thread. As I understand RetriableException is > for temporary issue and I would like retry to see if issue resolves itself > by then, hence producer has retries =3 > > Producer log > > *** Publisher # Paritition - 12 Key - 26 Value - value 26 > *** Publisher # Paritition - 13 Key - 27 Value - value 27 > *** Publisher # Paritition - 14 Key - 28 Value - value 28 > *** Publisher # Paritition - 0 Key - 29 Value - value 29 > *** Publisher # Paritition - 1 Key - 30 Value - value 30 > *** Publisher # Paritition - 2 Key - 31 Value - value 31 > *** Publisher # Paritition - 3 Key - 32 Value - value 32 > *** Publisher # Paritition - 4 Key - 33 Value - value 33 > *** Publisher # Paritition - 5 Key - 34 Value - value 34 > 2017-04-19 16:39:08.008 WARN 399580 --- [| shri-producer] > o.a.k.clients.producer.internals.Sender : Got error produce response > with correlation id 37 on topic-partition ${topic-name}-5, retrying (2 > attempts left). Error: NETWORK_EXCEPTION > 2017-04-19 16:39:39.128 WARN 399580 --- [| shri-producer] > o.a.k.clients.producer.internals.Sender : Got error produce response > with correlation id 39 on topic-partition ${topic-name}-5, retrying (1 > attempts left). Error: NETWORK_EXCEPTION > 2017-04-19 16:40:10.271 WARN 399580 --- [| shri-producer] > o.a.k.clients.producer.internals.Sender : Got error produce response > with correlation id 41 on topic-partition ${topic-name}-5, retrying (0 > attempts left). Error: NETWORK_EXCEPTION > 2017-04-19 16:40:41.419 ERROR 399580 --- [| shri-producer] > o.s.k.support.LoggingProducerListener > : Exception thrown when sending a message with key='34' and > payload='value 34' to topic ${topic-name} and partition 5: > org.apache.kafka.common.errors.NetworkException: The server disconnected > before a response was received. > 2017-04-19 16:42:50.621 INFO 399580 --- [pool-1-thread-1] > c.p.p.SpringKafkaPublisher_Simple: *** Failed to > publish Paritition - 5 Key - 34 Value - value 34 > java.util.concurrent.ExecutionException: > org.springframework.kafka.core.KafkaProducerException: > Failed to send; nested exception is > org.apache.kafka.common.errors.NetworkException: > The server disconnected before a response was received. > 2017-04-19 16:42:51.001 INFO 399580 --- [pool-1-thread-1] > c.p.p.SpringKafkaPublisher_Simple: *** Publisher > # Paritition - 6 Key - 35 Value - value 35 > 2017-04-19 16:43:21.010 WARN 399580 --- [| shri-producer] > o.a.k.clients.producer.internals.Sender : Got error produce response > with correlation id 49 on topic-partition ${topic-name}-6, retrying (2 > attempts left). Error: NETWORK_EXCEPTION > 2017-04-19 16:43:52.152 WARN 399580 --- [| shri-producer] > o.a.k.clients.producer.internals.Sender : Got error produce response > with correlation id 51 on topic-partition ${t
RE: Re: Re: ZK and Kafka failover testing
Just to add, I see below behavior repeat with even command line console producer and consumer that come with Kafka. Thanks, Shri __ Shrikant Patel | 817.367.4302 Enterprise Architecture Team PDX-NHIN -Original Message- From: Shrikant Patel Sent: Wednesday, April 19, 2017 5:49 PM To: users@kafka.apache.org Subject: RE: [EXTERNAL] Re: Re: ZK and Kafka failover testing Thanks Jeff, Onur, Jun, Hans. I am learning a lot from your response. Just to summarize briefly my steps, 5 node Kafka and ZK cluster. 1. ZK cluster has all node working. Consumer is down. 2. Bring down majority of ZK nodes. 3. Thing are functional no issue (no dup or lost message) 4. Now first kafka node come down. 5. My issue start happening - as you see below producer says message with key 34 and 35 failed. 6. Bring majority of ZK node up. 7. Other kafka nodes assumes leadership for node 1's topic. 8. Bring consumer up, it starts consuming from the last offset and I see duplicates. I see message 34 (3 times) and 35 (4 times) Jeff, in my case I don’t see issue with kafka cluster recovering, once the majority ZK nodes are up, other Kafka takes up leadership for down node immediately. Onur, as Jun mentioned since I have acks=all, I am not seeing any messages being lost. Jun, Hans, I had same thought of trying to eliminate the consumer getting duplicate because of incorrectly acking the message. In next run of this test case, I was not run client at all. My consumer, producer properties are in first email in this thread. As I understand RetriableException is for temporary issue and I would like retry to see if issue resolves itself by then, hence producer has retries =3 Producer log *** Publisher # Paritition - 12 Key - 26 Value - value 26 *** Publisher # Paritition - 13 Key - 27 Value - value 27 *** Publisher # Paritition - 14 Key - 28 Value - value 28 *** Publisher # Paritition - 0 Key - 29 Value - value 29 *** Publisher # Paritition - 1 Key - 30 Value - value 30 *** Publisher # Paritition - 2 Key - 31 Value - value 31 *** Publisher # Paritition - 3 Key - 32 Value - value 32 *** Publisher # Paritition - 4 Key - 33 Value - value 33 *** Publisher # Paritition - 5 Key - 34 Value - value 34 2017-04-19 16:39:08.008 WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender : Got error produce response with correlation id 37 on topic-partition ${topic-name}-5, retrying (2 attempts left). Error: NETWORK_EXCEPTION 2017-04-19 16:39:39.128 WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender : Got error produce response with correlation id 39 on topic-partition ${topic-name}-5, retrying (1 attempts left). Error: NETWORK_EXCEPTION 2017-04-19 16:40:10.271 WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender : Got error produce response with correlation id 41 on topic-partition ${topic-name}-5, retrying (0 attempts left). Error: NETWORK_EXCEPTION 2017-04-19 16:40:41.419 ERROR 399580 --- [| shri-producer] o.s.k.support.LoggingProducerListener: Exception thrown when sending a message with key='34' and payload='value 34' to topic ${topic-name} and partition 5: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received. 2017-04-19 16:42:50.621 INFO 399580 --- [pool-1-thread-1] c.p.p.SpringKafkaPublisher_Simple: *** Failed to publish Paritition - 5 Key - 34 Value - value 34 java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received. 2017-04-19 16:42:51.001 INFO 399580 --- [pool-1-thread-1] c.p.p.SpringKafkaPublisher_Simple: *** Publisher # Paritition - 6 Key - 35 Value - value 35 2017-04-19 16:43:21.010 WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender : Got error produce response with correlation id 49 on topic-partition ${topic-name}-6, retrying (2 attempts left). Error: NETWORK_EXCEPTION 2017-04-19 16:43:52.152 WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender : Got error produce response with correlation id 51 on topic-partition ${topic-name}-6, retrying (1 attempts left). Error: NETWORK_EXCEPTION 2017-04-19 16:44:23.234 WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender : Got error produce response with correlation id 53 on topic-partition ${topic-name}-6, retrying (0 attempts left). Error: NETWORK_EXCEPTION 2017-04-19 16:44:54.421 ERROR 399580 --- [| shri-producer] o.s.k.support.LoggingProducerListener: Exception thrown when sending a message with key='35' and payload='value
RE: Re: Re: ZK and Kafka failover testing
Thanks Jeff, Onur, Jun, Hans. I am learning a lot from your response. Just to summarize briefly my steps, 5 node Kafka and ZK cluster. 1. ZK cluster has all node working. Consumer is down. 2. Bring down majority of ZK nodes. 3. Thing are functional no issue (no dup or lost message) 4. Now first kafka node come down. 5. My issue start happening - as you see below producer says message with key 34 and 35 failed. 6. Bring majority of ZK node up. 7. Other kafka nodes assumes leadership for node 1's topic. 8. Bring consumer up, it starts consuming from the last offset and I see duplicates. I see message 34 (3 times) and 35 (4 times) Jeff, in my case I don’t see issue with kafka cluster recovering, once the majority ZK nodes are up, other Kafka takes up leadership for down node immediately. Onur, as Jun mentioned since I have acks=all, I am not seeing any messages being lost. Jun, Hans, I had same thought of trying to eliminate the consumer getting duplicate because of incorrectly acking the message. In next run of this test case, I was not run client at all. My consumer, producer properties are in first email in this thread. As I understand RetriableException is for temporary issue and I would like retry to see if issue resolves itself by then, hence producer has retries =3 Producer log *** Publisher # Paritition - 12 Key - 26 Value - value 26 *** Publisher # Paritition - 13 Key - 27 Value - value 27 *** Publisher # Paritition - 14 Key - 28 Value - value 28 *** Publisher # Paritition - 0 Key - 29 Value - value 29 *** Publisher # Paritition - 1 Key - 30 Value - value 30 *** Publisher # Paritition - 2 Key - 31 Value - value 31 *** Publisher # Paritition - 3 Key - 32 Value - value 32 *** Publisher # Paritition - 4 Key - 33 Value - value 33 *** Publisher # Paritition - 5 Key - 34 Value - value 34 2017-04-19 16:39:08.008 WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender : Got error produce response with correlation id 37 on topic-partition ${topic-name}-5, retrying (2 attempts left). Error: NETWORK_EXCEPTION 2017-04-19 16:39:39.128 WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender : Got error produce response with correlation id 39 on topic-partition ${topic-name}-5, retrying (1 attempts left). Error: NETWORK_EXCEPTION 2017-04-19 16:40:10.271 WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender : Got error produce response with correlation id 41 on topic-partition ${topic-name}-5, retrying (0 attempts left). Error: NETWORK_EXCEPTION 2017-04-19 16:40:41.419 ERROR 399580 --- [| shri-producer] o.s.k.support.LoggingProducerListener: Exception thrown when sending a message with key='34' and payload='value 34' to topic ${topic-name} and partition 5: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received. 2017-04-19 16:42:50.621 INFO 399580 --- [pool-1-thread-1] c.p.p.SpringKafkaPublisher_Simple: *** Failed to publish Paritition - 5 Key - 34 Value - value 34 java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received. 2017-04-19 16:42:51.001 INFO 399580 --- [pool-1-thread-1] c.p.p.SpringKafkaPublisher_Simple: *** Publisher # Paritition - 6 Key - 35 Value - value 35 2017-04-19 16:43:21.010 WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender : Got error produce response with correlation id 49 on topic-partition ${topic-name}-6, retrying (2 attempts left). Error: NETWORK_EXCEPTION 2017-04-19 16:43:52.152 WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender : Got error produce response with correlation id 51 on topic-partition ${topic-name}-6, retrying (1 attempts left). Error: NETWORK_EXCEPTION 2017-04-19 16:44:23.234 WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender : Got error produce response with correlation id 53 on topic-partition ${topic-name}-6, retrying (0 attempts left). Error: NETWORK_EXCEPTION 2017-04-19 16:44:54.421 ERROR 399580 --- [| shri-producer] o.s.k.support.LoggingProducerListener: Exception thrown when sending a message with key='35' and payload='value 35' to topic ${topic-name} and partition 6: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received. Consumer log (consumer only started at the very end of the test scenario) value 21 value 22 value 23 value 24 value 25 value 26 value 27 value 28 value 29 value 30 value 31 value 32 value 33 value 34 value 34 value 34 value 35 value 35 value 35 value 35 Output of describe comman
Re: Re: ZK and Kafka failover testing
Oops, I linked to the wrong ticket, this is the one we hit: https://issues.apache.org/jira/browse/KAFKA-3042 On Wed, Apr 19, 2017 at 1:45 PM, Jeff Widman wrote: > > > > > > *As Onur explained, if ZK is down, Kafka can still work, but won't be able > to react to actual broker failures until ZK is up again. So if a broker is > down in that window, some of the partitions may not be ready for read or > write.* > We had a production scenario where ZK had a long GC pause and Kafka lost > connection temporarily. The brokers kept sending data just fine for > existing topics. However, when ZK came back, the kafka cluster could not > recover gracefully because of this issue: https://issues.apache.org/ > jira/browse/KAFKA-2729 > IIRC, in our case, the cached data that was mismatched was the controller > generations in zookeeper for the partition leaders did not match the > generation id listed in the controller znode. > Manually forcing a controller re-election solved this because it brought > all generation IDs in sync. However, it would have been nice if the cluster > had been able to automatically do the controller re-election without > waiting for manual intervention. > > On Wed, Apr 19, 2017 at 1:30 PM, Jun Rao wrote: > >> Hi, Shri, >> >> As Onur explained, if ZK is down, Kafka can still work, but won't be able >> to react to actual broker failures until ZK is up again. So if a broker is >> down in that window, some of the partitions may not be ready for read or >> write. >> >> As for the duplicates in the consumer, Hans had a good point. It would be >> useful to see if the duplicates are introduced by the producer or the >> consumer. Perhaps you can read the log again and see if duplicates are in >> the log in the first place. Note that broker retries can introduce >> duplicates. >> >> Hi, Onur, >> >> For the data loss issue that you mentioned, that should only happen with >> acks=1. As we discussed offline, if acks=all is used and unclean leader >> election is disabled, acked messages shouldn't be lost. >> >> Thanks, >> >> Jun >> >> >> On Wed, Apr 19, 2017 at 10:19 AM, Onur Karaman < >> onurkaraman.apa...@gmail.com >> > wrote: >> >> > If this is what I think it is, it has nothing to do with acks, >> > max.in.flight.requests.per.connection, or anything client-side and is >> > purely about the kafka cluster. >> > >> > Here's a simple example involving a single zookeeper instance, 3 >> brokers, a >> > KafkaConsumer and KafkaProducer (neither of these clients interact with >> > zookeeper). >> > 1. start up zookeeper: >> > > ./bin/zookeeper-server-start.sh config/zookeeper.properties >> > >> > 2. start up some brokers: >> > > ./bin/kafka-server-start.sh config/server0.properties >> > > ./bin/kafka-server-start.sh config/server1.properties >> > > ./bin/kafka-server-start.sh config/server2.properties >> > >> > 3 create a topic: >> > > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t >> > --partitions 1 --replication-factor 3 >> > >> > 4. start a console consumer (this needs to happen before step 5 so we >> can >> > write __consumer_offsets metadata to zookeeper): >> > > ./bin/kafka-console-consumer.sh --broker-list >> > localhost:9090,localhost:9091,localhost:9092 --topic t >> > >> > 5. kill zookeeper >> > >> > 6. start a console producer and produce some messages: >> > > ./bin/kafka-console-producer.sh --broker-list >> > localhost:9090,localhost:9091,localhost:9092 --topic t >> > >> > 7. notice the size of the broker logs grow with each message you send: >> > > l /tmp/kafka-logs*/t-0 >> > >> > 8. notice the consumer consuming the messages being produced >> > >> > Basically, zookeeper can be completely offline and your brokers will >> append >> > to logs and process client requests just fine as long as it doesn't >> need to >> > interact with zookeeper. Today, the only way a broker knows to stop >> > accepting requests is when it receives instruction from the controller. >> > >> > I first realized this last July when debugging a small production data >> loss >> > scenario that was a result of this[1]. Maybe this is an attempt at >> leaning >> > towards availability over consistency. Personally I think that brokers >> > should stop accepting requests when it disconnects from zookeeper. >> > >> > [1] The small production data loss scenario happens when accepting >> requests >> > during the small window in between a broker's zookeeper session >> expiration >> > and when the controller instructs the broker to stop accepting requests. >> > During this time, the broker still thinks it leads partitions that are >> > currently being led by another broker, effectively resulting in a window >> > where the partition is led by two brokers. Clients can continue sending >> > requests to the old leader, and for producers with low acknowledgement >> > settings (like ack=1), their messages will be lost without the client >> > knowing, as the messages are being appended to the phantom leader's logs >> > instead
Re: Re: ZK and Kafka failover testing
*As Onur explained, if ZK is down, Kafka can still work, but won't be able to react to actual broker failures until ZK is up again. So if a broker is down in that window, some of the partitions may not be ready for read or write.* We had a production scenario where ZK had a long GC pause and Kafka lost connection temporarily. The brokers kept sending data just fine for existing topics. However, when ZK came back, the kafka cluster could not recover gracefully because of this issue: https://issues.apache.org/jira/browse/KAFKA-2729 IIRC, in our case, the cached data that was mismatched was the controller generations in zookeeper for the partition leaders did not match the generation id listed in the controller znode. Manually forcing a controller re-election solved this because it brought all generation IDs in sync. However, it would have been nice if the cluster had been able to automatically do the controller re-election without waiting for manual intervention. On Wed, Apr 19, 2017 at 1:30 PM, Jun Rao wrote: > Hi, Shri, > > As Onur explained, if ZK is down, Kafka can still work, but won't be able > to react to actual broker failures until ZK is up again. So if a broker is > down in that window, some of the partitions may not be ready for read or > write. > > As for the duplicates in the consumer, Hans had a good point. It would be > useful to see if the duplicates are introduced by the producer or the > consumer. Perhaps you can read the log again and see if duplicates are in > the log in the first place. Note that broker retries can introduce > duplicates. > > Hi, Onur, > > For the data loss issue that you mentioned, that should only happen with > acks=1. As we discussed offline, if acks=all is used and unclean leader > election is disabled, acked messages shouldn't be lost. > > Thanks, > > Jun > > > On Wed, Apr 19, 2017 at 10:19 AM, Onur Karaman < > onurkaraman.apa...@gmail.com > > wrote: > > > If this is what I think it is, it has nothing to do with acks, > > max.in.flight.requests.per.connection, or anything client-side and is > > purely about the kafka cluster. > > > > Here's a simple example involving a single zookeeper instance, 3 > brokers, a > > KafkaConsumer and KafkaProducer (neither of these clients interact with > > zookeeper). > > 1. start up zookeeper: > > > ./bin/zookeeper-server-start.sh config/zookeeper.properties > > > > 2. start up some brokers: > > > ./bin/kafka-server-start.sh config/server0.properties > > > ./bin/kafka-server-start.sh config/server1.properties > > > ./bin/kafka-server-start.sh config/server2.properties > > > > 3 create a topic: > > > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t > > --partitions 1 --replication-factor 3 > > > > 4. start a console consumer (this needs to happen before step 5 so we can > > write __consumer_offsets metadata to zookeeper): > > > ./bin/kafka-console-consumer.sh --broker-list > > localhost:9090,localhost:9091,localhost:9092 --topic t > > > > 5. kill zookeeper > > > > 6. start a console producer and produce some messages: > > > ./bin/kafka-console-producer.sh --broker-list > > localhost:9090,localhost:9091,localhost:9092 --topic t > > > > 7. notice the size of the broker logs grow with each message you send: > > > l /tmp/kafka-logs*/t-0 > > > > 8. notice the consumer consuming the messages being produced > > > > Basically, zookeeper can be completely offline and your brokers will > append > > to logs and process client requests just fine as long as it doesn't need > to > > interact with zookeeper. Today, the only way a broker knows to stop > > accepting requests is when it receives instruction from the controller. > > > > I first realized this last July when debugging a small production data > loss > > scenario that was a result of this[1]. Maybe this is an attempt at > leaning > > towards availability over consistency. Personally I think that brokers > > should stop accepting requests when it disconnects from zookeeper. > > > > [1] The small production data loss scenario happens when accepting > requests > > during the small window in between a broker's zookeeper session > expiration > > and when the controller instructs the broker to stop accepting requests. > > During this time, the broker still thinks it leads partitions that are > > currently being led by another broker, effectively resulting in a window > > where the partition is led by two brokers. Clients can continue sending > > requests to the old leader, and for producers with low acknowledgement > > settings (like ack=1), their messages will be lost without the client > > knowing, as the messages are being appended to the phantom leader's logs > > instead of the true leader's logs. > > > > On Wed, Apr 19, 2017 at 7:56 AM, Shrikant Patel > wrote: > > > > > While we were testing, our producer had following configuration > > > max.in.flight.requests.per.connection=1, acks= all and retries=3. > > > > > > The entire producer side set is below. The consumer h
Re: Re: ZK and Kafka failover testing
Hi, Shri, As Onur explained, if ZK is down, Kafka can still work, but won't be able to react to actual broker failures until ZK is up again. So if a broker is down in that window, some of the partitions may not be ready for read or write. As for the duplicates in the consumer, Hans had a good point. It would be useful to see if the duplicates are introduced by the producer or the consumer. Perhaps you can read the log again and see if duplicates are in the log in the first place. Note that broker retries can introduce duplicates. Hi, Onur, For the data loss issue that you mentioned, that should only happen with acks=1. As we discussed offline, if acks=all is used and unclean leader election is disabled, acked messages shouldn't be lost. Thanks, Jun On Wed, Apr 19, 2017 at 10:19 AM, Onur Karaman wrote: > If this is what I think it is, it has nothing to do with acks, > max.in.flight.requests.per.connection, or anything client-side and is > purely about the kafka cluster. > > Here's a simple example involving a single zookeeper instance, 3 brokers, a > KafkaConsumer and KafkaProducer (neither of these clients interact with > zookeeper). > 1. start up zookeeper: > > ./bin/zookeeper-server-start.sh config/zookeeper.properties > > 2. start up some brokers: > > ./bin/kafka-server-start.sh config/server0.properties > > ./bin/kafka-server-start.sh config/server1.properties > > ./bin/kafka-server-start.sh config/server2.properties > > 3 create a topic: > > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t > --partitions 1 --replication-factor 3 > > 4. start a console consumer (this needs to happen before step 5 so we can > write __consumer_offsets metadata to zookeeper): > > ./bin/kafka-console-consumer.sh --broker-list > localhost:9090,localhost:9091,localhost:9092 --topic t > > 5. kill zookeeper > > 6. start a console producer and produce some messages: > > ./bin/kafka-console-producer.sh --broker-list > localhost:9090,localhost:9091,localhost:9092 --topic t > > 7. notice the size of the broker logs grow with each message you send: > > l /tmp/kafka-logs*/t-0 > > 8. notice the consumer consuming the messages being produced > > Basically, zookeeper can be completely offline and your brokers will append > to logs and process client requests just fine as long as it doesn't need to > interact with zookeeper. Today, the only way a broker knows to stop > accepting requests is when it receives instruction from the controller. > > I first realized this last July when debugging a small production data loss > scenario that was a result of this[1]. Maybe this is an attempt at leaning > towards availability over consistency. Personally I think that brokers > should stop accepting requests when it disconnects from zookeeper. > > [1] The small production data loss scenario happens when accepting requests > during the small window in between a broker's zookeeper session expiration > and when the controller instructs the broker to stop accepting requests. > During this time, the broker still thinks it leads partitions that are > currently being led by another broker, effectively resulting in a window > where the partition is led by two brokers. Clients can continue sending > requests to the old leader, and for producers with low acknowledgement > settings (like ack=1), their messages will be lost without the client > knowing, as the messages are being appended to the phantom leader's logs > instead of the true leader's logs. > > On Wed, Apr 19, 2017 at 7:56 AM, Shrikant Patel wrote: > > > While we were testing, our producer had following configuration > > max.in.flight.requests.per.connection=1, acks= all and retries=3. > > > > The entire producer side set is below. The consumer has manual offset > > commit, it commit offset after it has successfully processed the message. > > > > Producer setting > > bootstrap.servers= {point the F5 VS fronting Kafka cluster} > > key.serializer= {appropriate value as per your cases} > > value.serializer= {appropriate value as per your case} > > acks= all > > retries=3 > > ssl.key.password= {appropriate value as per your case} > > ssl.keystore.location= {appropriate value as per your case} > > ssl.keystore.password= {appropriate value as per your case} > > ssl.truststore.location= {appropriate value as per your case} > > ssl.truststore.password= {appropriate value as per your case} > > batch.size=16384 > > client.id= {appropriate value as per your case, may help with debugging} > > max.block.ms=65000 > > request.timeout.ms=3 > > security.protocol= SSL > > ssl.enabled.protocols=TLSv1.2 > > ssl.keystore.type=JKS > > ssl.protocol=TLSv1.2 > > ssl.truststore.type=JKS > > max.in.flight.requests.per.connection=1 > > metadata.fetch.timeout.ms=6 > > reconnect.backoff.ms=1000 > > retry.backoff.ms=1000 > > max.request.size=1048576 > > linger.ms=0 > > > > Consumer setting > > bootstrap.servers= {point the F5 VS fronting Kafka cluster} > > key.deserializer= {appropri
Re: Re: ZK and Kafka failover testing
The OP was asking about duplicate messages, not lost messages, so I think we are discussing two different possible scenarios. When ever someone says they see duplicate messages it's always good practice to first double check ack mode, in flight messages, and retries. Also its important to check if the messages are really duplicates in the Kafka log, or if they are just seeing the same message reprocessed several times in the consumer due to some other issue with offset commits. -hans On Wed, Apr 19, 2017 at 10:19 AM, Onur Karaman wrote: > If this is what I think it is, it has nothing to do with acks, > max.in.flight.requests.per.connection, or anything client-side and is > purely about the kafka cluster. > > Here's a simple example involving a single zookeeper instance, 3 brokers, a > KafkaConsumer and KafkaProducer (neither of these clients interact with > zookeeper). > 1. start up zookeeper: > > ./bin/zookeeper-server-start.sh config/zookeeper.properties > > 2. start up some brokers: > > ./bin/kafka-server-start.sh config/server0.properties > > ./bin/kafka-server-start.sh config/server1.properties > > ./bin/kafka-server-start.sh config/server2.properties > > 3 create a topic: > > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t > --partitions 1 --replication-factor 3 > > 4. start a console consumer (this needs to happen before step 5 so we can > write __consumer_offsets metadata to zookeeper): > > ./bin/kafka-console-consumer.sh --broker-list > localhost:9090,localhost:9091,localhost:9092 --topic t > > 5. kill zookeeper > > 6. start a console producer and produce some messages: > > ./bin/kafka-console-producer.sh --broker-list > localhost:9090,localhost:9091,localhost:9092 --topic t > > 7. notice the size of the broker logs grow with each message you send: > > l /tmp/kafka-logs*/t-0 > > 8. notice the consumer consuming the messages being produced > > Basically, zookeeper can be completely offline and your brokers will append > to logs and process client requests just fine as long as it doesn't need to > interact with zookeeper. Today, the only way a broker knows to stop > accepting requests is when it receives instruction from the controller. > > I first realized this last July when debugging a small production data loss > scenario that was a result of this[1]. Maybe this is an attempt at leaning > towards availability over consistency. Personally I think that brokers > should stop accepting requests when it disconnects from zookeeper. > > [1] The small production data loss scenario happens when accepting requests > during the small window in between a broker's zookeeper session expiration > and when the controller instructs the broker to stop accepting requests. > During this time, the broker still thinks it leads partitions that are > currently being led by another broker, effectively resulting in a window > where the partition is led by two brokers. Clients can continue sending > requests to the old leader, and for producers with low acknowledgement > settings (like ack=1), their messages will be lost without the client > knowing, as the messages are being appended to the phantom leader's logs > instead of the true leader's logs. > > On Wed, Apr 19, 2017 at 7:56 AM, Shrikant Patel wrote: > > > While we were testing, our producer had following configuration > > max.in.flight.requests.per.connection=1, acks= all and retries=3. > > > > The entire producer side set is below. The consumer has manual offset > > commit, it commit offset after it has successfully processed the message. > > > > Producer setting > > bootstrap.servers= {point the F5 VS fronting Kafka cluster} > > key.serializer= {appropriate value as per your cases} > > value.serializer= {appropriate value as per your case} > > acks= all > > retries=3 > > ssl.key.password= {appropriate value as per your case} > > ssl.keystore.location= {appropriate value as per your case} > > ssl.keystore.password= {appropriate value as per your case} > > ssl.truststore.location= {appropriate value as per your case} > > ssl.truststore.password= {appropriate value as per your case} > > batch.size=16384 > > client.id= {appropriate value as per your case, may help with debugging} > > max.block.ms=65000 > > request.timeout.ms=3 > > security.protocol= SSL > > ssl.enabled.protocols=TLSv1.2 > > ssl.keystore.type=JKS > > ssl.protocol=TLSv1.2 > > ssl.truststore.type=JKS > > max.in.flight.requests.per.connection=1 > > metadata.fetch.timeout.ms=6 > > reconnect.backoff.ms=1000 > > retry.backoff.ms=1000 > > max.request.size=1048576 > > linger.ms=0 > > > > Consumer setting > > bootstrap.servers= {point the F5 VS fronting Kafka cluster} > > key.deserializer= {appropriate value as per your cases} > > value.deserializer= {appropriate value as per your case} > > group.id= {appropriate value as per your case} > > ssl.key.password= {appropriate value as per your case} > > ssl.keystore.location= {appropriate value as per your case} > > ssl.keys
Re: Re: ZK and Kafka failover testing
If this is what I think it is, it has nothing to do with acks, max.in.flight.requests.per.connection, or anything client-side and is purely about the kafka cluster. Here's a simple example involving a single zookeeper instance, 3 brokers, a KafkaConsumer and KafkaProducer (neither of these clients interact with zookeeper). 1. start up zookeeper: > ./bin/zookeeper-server-start.sh config/zookeeper.properties 2. start up some brokers: > ./bin/kafka-server-start.sh config/server0.properties > ./bin/kafka-server-start.sh config/server1.properties > ./bin/kafka-server-start.sh config/server2.properties 3 create a topic: > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t --partitions 1 --replication-factor 3 4. start a console consumer (this needs to happen before step 5 so we can write __consumer_offsets metadata to zookeeper): > ./bin/kafka-console-consumer.sh --broker-list localhost:9090,localhost:9091,localhost:9092 --topic t 5. kill zookeeper 6. start a console producer and produce some messages: > ./bin/kafka-console-producer.sh --broker-list localhost:9090,localhost:9091,localhost:9092 --topic t 7. notice the size of the broker logs grow with each message you send: > l /tmp/kafka-logs*/t-0 8. notice the consumer consuming the messages being produced Basically, zookeeper can be completely offline and your brokers will append to logs and process client requests just fine as long as it doesn't need to interact with zookeeper. Today, the only way a broker knows to stop accepting requests is when it receives instruction from the controller. I first realized this last July when debugging a small production data loss scenario that was a result of this[1]. Maybe this is an attempt at leaning towards availability over consistency. Personally I think that brokers should stop accepting requests when it disconnects from zookeeper. [1] The small production data loss scenario happens when accepting requests during the small window in between a broker's zookeeper session expiration and when the controller instructs the broker to stop accepting requests. During this time, the broker still thinks it leads partitions that are currently being led by another broker, effectively resulting in a window where the partition is led by two brokers. Clients can continue sending requests to the old leader, and for producers with low acknowledgement settings (like ack=1), their messages will be lost without the client knowing, as the messages are being appended to the phantom leader's logs instead of the true leader's logs. On Wed, Apr 19, 2017 at 7:56 AM, Shrikant Patel wrote: > While we were testing, our producer had following configuration > max.in.flight.requests.per.connection=1, acks= all and retries=3. > > The entire producer side set is below. The consumer has manual offset > commit, it commit offset after it has successfully processed the message. > > Producer setting > bootstrap.servers= {point the F5 VS fronting Kafka cluster} > key.serializer= {appropriate value as per your cases} > value.serializer= {appropriate value as per your case} > acks= all > retries=3 > ssl.key.password= {appropriate value as per your case} > ssl.keystore.location= {appropriate value as per your case} > ssl.keystore.password= {appropriate value as per your case} > ssl.truststore.location= {appropriate value as per your case} > ssl.truststore.password= {appropriate value as per your case} > batch.size=16384 > client.id= {appropriate value as per your case, may help with debugging} > max.block.ms=65000 > request.timeout.ms=3 > security.protocol= SSL > ssl.enabled.protocols=TLSv1.2 > ssl.keystore.type=JKS > ssl.protocol=TLSv1.2 > ssl.truststore.type=JKS > max.in.flight.requests.per.connection=1 > metadata.fetch.timeout.ms=6 > reconnect.backoff.ms=1000 > retry.backoff.ms=1000 > max.request.size=1048576 > linger.ms=0 > > Consumer setting > bootstrap.servers= {point the F5 VS fronting Kafka cluster} > key.deserializer= {appropriate value as per your cases} > value.deserializer= {appropriate value as per your case} > group.id= {appropriate value as per your case} > ssl.key.password= {appropriate value as per your case} > ssl.keystore.location= {appropriate value as per your case} > ssl.keystore.password= {appropriate value as per your case} > ssl.truststore.location= {appropriate value as per your case} > ssl.truststore.password= {appropriate value as per your case} > enable.auto.commit=false > security.protocol= SSL > ssl.enabled.protocols=TLSv1.2 > ssl.keystore.type=JKS > ssl.protocol=TLSv1.2 > ssl.truststore.type=JKS > client.id= {appropriate value as per your case, may help with debugging} > reconnect.backoff.ms=1000 > retry.backoff.ms=1000 > > Thanks, > Shri > > -Original Message- > From: Hans Jespersen [mailto:h...@confluent.io] > Sent: Tuesday, April 18, 2017 7:57 PM > To: users@kafka.apache.org > Subject: [EXTERNAL] Re: ZK and Kafka failover testing > > * Notice: This email was receive
Re: Subscribe to mailing list
Arun , send e-mail to users-subscr...@kafka.apache.org Thanks, Prahalad On Wed, Apr 19, 2017 at 8:24 PM, Arunkumar wrote: > > Hi There > I would like to subscribe to this mailing list and know more about kafka. > Please add me to the list. Thanks in advance > > Thanks > Arunkumar Pichaimuthu, PMP >
Subscribe to mailing list
Hi There I would like to subscribe to this mailing list and know more about kafka. Please add me to the list. Thanks in advance Thanks Arunkumar Pichaimuthu, PMP
RE: Re: ZK and Kafka failover testing
While we were testing, our producer had following configuration max.in.flight.requests.per.connection=1, acks= all and retries=3. The entire producer side set is below. The consumer has manual offset commit, it commit offset after it has successfully processed the message. Producer setting bootstrap.servers= {point the F5 VS fronting Kafka cluster} key.serializer= {appropriate value as per your cases} value.serializer= {appropriate value as per your case} acks= all retries=3 ssl.key.password= {appropriate value as per your case} ssl.keystore.location= {appropriate value as per your case} ssl.keystore.password= {appropriate value as per your case} ssl.truststore.location= {appropriate value as per your case} ssl.truststore.password= {appropriate value as per your case} batch.size=16384 client.id= {appropriate value as per your case, may help with debugging} max.block.ms=65000 request.timeout.ms=3 security.protocol= SSL ssl.enabled.protocols=TLSv1.2 ssl.keystore.type=JKS ssl.protocol=TLSv1.2 ssl.truststore.type=JKS max.in.flight.requests.per.connection=1 metadata.fetch.timeout.ms=6 reconnect.backoff.ms=1000 retry.backoff.ms=1000 max.request.size=1048576 linger.ms=0 Consumer setting bootstrap.servers= {point the F5 VS fronting Kafka cluster} key.deserializer= {appropriate value as per your cases} value.deserializer= {appropriate value as per your case} group.id= {appropriate value as per your case} ssl.key.password= {appropriate value as per your case} ssl.keystore.location= {appropriate value as per your case} ssl.keystore.password= {appropriate value as per your case} ssl.truststore.location= {appropriate value as per your case} ssl.truststore.password= {appropriate value as per your case} enable.auto.commit=false security.protocol= SSL ssl.enabled.protocols=TLSv1.2 ssl.keystore.type=JKS ssl.protocol=TLSv1.2 ssl.truststore.type=JKS client.id= {appropriate value as per your case, may help with debugging} reconnect.backoff.ms=1000 retry.backoff.ms=1000 Thanks, Shri -Original Message- From: Hans Jespersen [mailto:h...@confluent.io] Sent: Tuesday, April 18, 2017 7:57 PM To: users@kafka.apache.org Subject: [EXTERNAL] Re: ZK and Kafka failover testing * Notice: This email was received from an external source * When you publish, is acks=0,1 or all (-1)? What is max.in.flight.requests.per.connection (default is 5)? It sounds to me like your publishers are using acks=0 and so they are not actually succeeding in publishing (i.e. you are getting no acks) but they will retry over and over and will have up to 5 retries in flight, so when the broker comes back up, you are getting 4 or 5 copies of the same message. Try setting max.in.flight.requests.per.connection=1 to get rid of duplicates Try setting acks=all to ensure the messages are being persisted by the leader and all the available replicas in the kafka cluster. -hans /** * Hans Jespersen, Principal Systems Engineer, Confluent Inc. * h...@confluent.io (650)924-2670 */ On Tue, Apr 18, 2017 at 4:10 PM, Shrikant Patel wrote: > Hi All, > > I am seeing strange behavior between ZK and Kafka. We have 5 node in > ZK and Kafka cluster each. Kafka version - 2.11-0.10.1.1 > > The min.insync.replicas is 3, replication.factor is 5 for all topics, > unclean.leader.election.enable is false. We have 15 partitions for > each topic. > > The step we are following in our testing. > > > * My understanding is that ZK needs aleast 3 out of 5 server to be > functional. Kafka could not be functional without zookeeper. In out > testing, we bring down 3 ZK nodes and don't touch Kafka nodes. Kafka > is still functional, consumer\producer can still consume\publish from > Kafka cluster. We then bring down all ZK nodes, Kafka > consumer\producers are still functional. I am not able to understand > why Kafka cluster is not failing as soon as majority of ZK nodes are > down. I do see error in Kafka that it cannot connection to ZK cluster. > > > > * With all or majority of ZK node down, we bring down 1 Kafka > nodes (out of 5, so 4 are running). And at that point the consumer and > producer start failing. My guess is the new leadership election cannot > happen without ZK. > > > > * Then we bring up the majority of ZK node up. (1st Kafka is still > down) Now the Kafka cluster become functional, consumer and producer > now start working again. But Consumer sees big junk of message from > kafka, and many of them are duplicates. It's like these messages were > held up somewhere, Where\Why I don't know? And why the duplicates? I > can understand few duplicates for messages that consumer would not > commit before 1st node when down. But why so many duplicates and like > 4 copy for each message. I cannot understand this behavior. > > Appreciate some insight about our issues. Also if there are blogs that > describe the ZK and Kafka failover scenario behaviors, that would be > extremely helpful. > > Thanks, > Shri > > This e-
RE: Kafka Producer - Multiple broker - Data sent to buffer but not in Queue
Sorry about that. That was a typo. The exact configuration is as below: bootstrap.servers = , Thanks, Ranjith -Original Message- From: kamaltar...@gmail.com [mailto:kamaltar...@gmail.com] On Behalf Of Kamal C Sent: Wednesday, April 19, 2017 16:25 To: users@kafka.apache.org Subject: Re: Kafka Producer - Multiple broker - Data sent to buffer but not in Queue > bootstrap.servers = , Is your bootstrap.servers configuration is correct ? You have specified port `9091`, but running the GetOffsetShell command on `9094` On Wed, Apr 19, 2017 at 11:58 AM, Ranjith Anbazhakan < ranjith.anbazha...@aspiresys.com> wrote: > Unfortunately, there is no specific information (error/exception) in > the logs when the buffer to queue records data goes missing i.e) When > stopped broker (say broker 2) is started and followed by stopping > current running broker that received all producer sent records in buffer (say > broker 1). > > Thanks, > Ranjith > > -Original Message- > From: David Garcia [mailto:] > Sent: Wednesday, April 19, 2017 09:31 > To: users@kafka.apache.org > Subject: Re: Kafka Producer - Multiple broker - Data sent to buffer > but not in Queue > > What do broker logs say around the time you send your messages? > > On 4/18/17, 3:21 AM, "Ranjith Anbazhakan" com> wrote: > > Hi, > > I have been testing behavior of multiple broker instances of kafka > in same machine and facing inconsistent behavior of producer sent > records to buffer not being available in queue always. > > Tried kafka versions: > 0.10.2.0 > 0.10.1.0 > > Scenario: > > 1. Ran two broker instances in same machine. Say broker 1 as > initial leader, broker 2 as initial follower. > > 2. Stopped broker 1. Now broker 2 became leader. > > 3. Now producer sends records for a given topic TEST through > send() method, followed by flush(). Records have to go to Broker 2 > logically. No error/exception is thrown by code. (So it is assumed > data has been sent successfully to buffer) > > 4. When using command to check the records count for TEST topic > in Broker 2, the sent records are not added to existing records count > for that topic in queue. > > a. Used command - kafka-run-class.bat kafka.tools.GetOffsetShell > --broker-list localhost:9094 --topic TEST --time -1 (where TEST is the > used > topic) > > NOTE: **Step 4 is not happening always and is inconsistent**. In > the scenario when it does not work, if Broker 1 is made UP and then > made DOWN, records are always been available in queue in Broker 2 post doing > Step 3. > > Configurations: > Overall Producer configurations: (most are default values) > acks = all > batch.size = 16384 > block.on.buffer.full = false > bootstrap.servers = , > > buffer.memory = 33554432 > client.id = producer-1 > compression.type = none > connections.max.idle.ms = 54 > interceptor.classes = null > key.serializer = class org.apache.kafka.common. > serialization.StringSerializer > linger.ms = 1 > max.block.ms = 6 > max.in.flight.requests.per.connection = 5 > max.request.size = 1048576 > metadata.fetch.timeout.ms = 6 > metadata.max.age.ms = 30 > metric.reporters = [] > metrics.num.samples = 2 > metrics.sample.window.ms = 3 > partitioner.class = class org.apache.kafka.clients. > producer.internals.DefaultPartitioner > receive.buffer.bytes = 32768 > reconnect.backoff.ms = 50 > request.timeout.ms = 3 > retries = 0 > retry.backoff.ms = 100 > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 6 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = null > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null >
Re: Kafka Producer - Multiple broker - Data sent to buffer but not in Queue
> bootstrap.servers = , Is your bootstrap.servers configuration is correct ? You have specified port `9091`, but running the GetOffsetShell command on `9094` On Wed, Apr 19, 2017 at 11:58 AM, Ranjith Anbazhakan < ranjith.anbazha...@aspiresys.com> wrote: > Unfortunately, there is no specific information (error/exception) in the > logs when the buffer to queue records data goes missing i.e) When stopped > broker (say broker 2) is started and followed by stopping current running > broker that received all producer sent records in buffer (say broker 1). > > Thanks, > Ranjith > > -Original Message- > From: David Garcia [mailto:] > Sent: Wednesday, April 19, 2017 09:31 > To: users@kafka.apache.org > Subject: Re: Kafka Producer - Multiple broker - Data sent to buffer but > not in Queue > > What do broker logs say around the time you send your messages? > > On 4/18/17, 3:21 AM, "Ranjith Anbazhakan" com> wrote: > > Hi, > > I have been testing behavior of multiple broker instances of kafka in > same machine and facing inconsistent behavior of producer sent records to > buffer not being available in queue always. > > Tried kafka versions: > 0.10.2.0 > 0.10.1.0 > > Scenario: > > 1. Ran two broker instances in same machine. Say broker 1 as > initial leader, broker 2 as initial follower. > > 2. Stopped broker 1. Now broker 2 became leader. > > 3. Now producer sends records for a given topic TEST through > send() method, followed by flush(). Records have to go to Broker 2 > logically. No error/exception is thrown by code. (So it is assumed data has > been sent successfully to buffer) > > 4. When using command to check the records count for TEST topic > in Broker 2, the sent records are not added to existing records count for > that topic in queue. > > a. Used command - kafka-run-class.bat kafka.tools.GetOffsetShell > --broker-list localhost:9094 --topic TEST --time -1 (where TEST is the used > topic) > > NOTE: **Step 4 is not happening always and is inconsistent**. In the > scenario when it does not work, if Broker 1 is made UP and then made DOWN, > records are always been available in queue in Broker 2 post doing Step 3. > > Configurations: > Overall Producer configurations: (most are default values) > acks = all > batch.size = 16384 > block.on.buffer.full = false > bootstrap.servers = , > > buffer.memory = 33554432 > client.id = producer-1 > compression.type = none > connections.max.idle.ms = 54 > interceptor.classes = null > key.serializer = class org.apache.kafka.common. > serialization.StringSerializer > linger.ms = 1 > max.block.ms = 6 > max.in.flight.requests.per.connection = 5 > max.request.size = 1048576 > metadata.fetch.timeout.ms = 6 > metadata.max.age.ms = 30 > metric.reporters = [] > metrics.num.samples = 2 > metrics.sample.window.ms = 3 > partitioner.class = class org.apache.kafka.clients. > producer.internals.DefaultPartitioner > receive.buffer.bytes = 32768 > reconnect.backoff.ms = 50 > request.timeout.ms = 3 > retries = 0 > retry.backoff.ms = 100 > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 6 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = null > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > timeout.ms = 3 > value.serializer = class org.apache.kafka.common. > serializa
Re: possible kafka bug, maybe in console producer/consumer utils
Just to point to you all, I also get similar exception in my streams application when producer is trying to commit something to changelog topic. Error sending record to topic test-stream-key-table-changelog org.apache.kafka.common.errors.TimeoutException: Batch containing 2 record(s) expired due to timeout while requesting metadata from brokers for test-stream-key-table-changelog-0 Now here producer is what is within official kafka release, so no question of using some deprecated apis. Also this happens in both 0.10.2.0 and 0.10.1.1 clients. Again not sure what is the cause of this exception as till now have not been able to correlate the same with kafka server side logs. But this should be looked into it. Again here everything is hosted on linux and within the same lan. I doubt network connectivity between the machines is the issue. Hope this gives some more insight. Thanks Sachin On Wed, Apr 19, 2017 at 3:01 PM, jan wrote: > @Robert Quinlivan: the producer is just the kafka-console-producer > shell that comes in the kafka/bin directory (kafka/bin/windows in my > case). Nothing special. > I'll try messing with acks because this problem is somewhat incidental > to what I'm trying to do which is see how big the log directory grows. > > It's possible kafkacat or other producers would do a better job than > the console producer but I'll try that on linux as getting them > working on windows, meh. > > thanks all > > jan > > > On 18/04/2017, David Garcia wrote: > > The “NewShinyProducer” is also deprecated. > > > > On 4/18/17, 5:41 PM, "David Garcia" wrote: > > > > The console producer in the 0.10.0.0 release uses the old producer > which > > doesn’t have “backoff”…it’s really just for testing simple producing: > > > > object ConsoleProducer { > > > > def main(args: Array[String]) { > > > > try { > > val config = new ProducerConfig(args) > > val reader = > > Class.forName(config.readerClass).newInstance().asInstanceOf > [MessageReader] > > reader.init(System.in, getReaderProps(config)) > > > > val producer = > > if(config.useOldProducer) { > > new OldProducer(getOldProducerProps(config)) > > } else { > > new NewShinyProducer(getNewProducerProps(config)) > > } > > > > > > > > On 4/18/17, 5:31 PM, "Robert Quinlivan" > wrote: > > > > I am curious how your producer is configured. The producer > maintains > > an > > internal buffer of messages to be sent over to the broker. Is it > > possible > > you are terminating the producer code in your test before the > buffer > > is > > exhausted? > > > > On Tue, Apr 18, 2017 at 5:29 PM, jan > > wrote: > > > > > Thanks to both of you. Some quick points: > > > > > > I'd expect there to be backpressure from the producer if the > > broker is > > > busy ie. the broker would not respond to the console producer > if > > the > > > broker was too busy accept more messages, and the producer > would > > hang > > > on the socket. Alternatively I'd hope the console producer > would > > have > > > the sense to back off and retry but clearly(?) not. > > > This behaviour is actually relevant to my old job so I need to > > know more. > > > > > > Perhaps the timeout mentioned in the error msg can just be > upped? > > > > > > *Is* the claimed timeout relevant? > > > > Batch containing 8 record(s) expired due to timeout while > > requesting > > > metadata from brokers for big_ptns1_repl1_nozip-0 > > > > > > Why is the producer expiring records? > > > > > > But I'm surprised this happened because my setup is one machine > > with > > > everything running on it. No network. Also Kafka writes to the > > disk > > > without an fsync (or its equivalent on windows) which means it > > just > > > gets cached in ram before being lazily written to disk, and > I've > > got > > > plenty of ram - 16GB ram vs 5GB of input file. Kafka adds its > > overhead > > > so it grows to ~8GB but still, it need not hit disk at all (and > > the > > > file goes into the windows memory, not java's). > > > Maybe it is GC holding things up but I dunno, GC even for a > second > > or > > > two should not cause a socket failure, just delay the read, > though > > I'm > > > not an expert on this *at all*. > > > > > > I'll go over the answers tomorrow more carefully but thanks > > anyway! > > > > > > cheers > > > > > > jan > > > > > > On 18/04/2017, Serega Sheypak > wrote: > > > >> err, isn't it supposed to? Isn't the loss of data a very > > serious error? > > > > Kafka can't fix networking issues like latencies, blinking, > > > unava
Re: possible kafka bug, maybe in console producer/consumer utils
@Robert Quinlivan: the producer is just the kafka-console-producer shell that comes in the kafka/bin directory (kafka/bin/windows in my case). Nothing special. I'll try messing with acks because this problem is somewhat incidental to what I'm trying to do which is see how big the log directory grows. It's possible kafkacat or other producers would do a better job than the console producer but I'll try that on linux as getting them working on windows, meh. thanks all jan On 18/04/2017, David Garcia wrote: > The “NewShinyProducer” is also deprecated. > > On 4/18/17, 5:41 PM, "David Garcia" wrote: > > The console producer in the 0.10.0.0 release uses the old producer which > doesn’t have “backoff”…it’s really just for testing simple producing: > > object ConsoleProducer { > > def main(args: Array[String]) { > > try { > val config = new ProducerConfig(args) > val reader = > Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader] > reader.init(System.in, getReaderProps(config)) > > val producer = > if(config.useOldProducer) { > new OldProducer(getOldProducerProps(config)) > } else { > new NewShinyProducer(getNewProducerProps(config)) > } > > > > On 4/18/17, 5:31 PM, "Robert Quinlivan" wrote: > > I am curious how your producer is configured. The producer maintains > an > internal buffer of messages to be sent over to the broker. Is it > possible > you are terminating the producer code in your test before the buffer > is > exhausted? > > On Tue, Apr 18, 2017 at 5:29 PM, jan > wrote: > > > Thanks to both of you. Some quick points: > > > > I'd expect there to be backpressure from the producer if the > broker is > > busy ie. the broker would not respond to the console producer if > the > > broker was too busy accept more messages, and the producer would > hang > > on the socket. Alternatively I'd hope the console producer would > have > > the sense to back off and retry but clearly(?) not. > > This behaviour is actually relevant to my old job so I need to > know more. > > > > Perhaps the timeout mentioned in the error msg can just be upped? > > > > *Is* the claimed timeout relevant? > > > Batch containing 8 record(s) expired due to timeout while > requesting > > metadata from brokers for big_ptns1_repl1_nozip-0 > > > > Why is the producer expiring records? > > > > But I'm surprised this happened because my setup is one machine > with > > everything running on it. No network. Also Kafka writes to the > disk > > without an fsync (or its equivalent on windows) which means it > just > > gets cached in ram before being lazily written to disk, and I've > got > > plenty of ram - 16GB ram vs 5GB of input file. Kafka adds its > overhead > > so it grows to ~8GB but still, it need not hit disk at all (and > the > > file goes into the windows memory, not java's). > > Maybe it is GC holding things up but I dunno, GC even for a second > or > > two should not cause a socket failure, just delay the read, though > I'm > > not an expert on this *at all*. > > > > I'll go over the answers tomorrow more carefully but thanks > anyway! > > > > cheers > > > > jan > > > > On 18/04/2017, Serega Sheypak wrote: > > >> err, isn't it supposed to? Isn't the loss of data a very > serious error? > > > Kafka can't fix networking issues like latencies, blinking, > > unavailability > > > or any other weird stuff. Kafka promises you to persist data if > data > > > reaches Kafka. Data delivery responsibility to kafka is on your > side. You > > > fail to do it according to logs. > > > > > > 0.02% not 2% > > > You should check broker logs to figure out what went wrong. All > things > > > happen on one machine as far as I understand. Maybe your brokers > don't > > have > > > enough mem and they stuck because of GC and don't respond to > producer. > > > Async producer fails to send data. That is why you observe data > loss on > > > consumer side. > > > > > > > > > 2017-04-18 23:32 GMT+02:00 jan : > > > > > >> Hi Serega, > > >> > > >> > data didn't reach producer. So why should data appear in > consumer? > > >> > > >> err, isn't it supposed to? Isn't the loss of data a very > serious error? > > >> > > >> > loss rate is more or less similar [...] Not so bad. > > >> > > >> That made me laugh at least. Is kafka intended to be a > reliable > > >> messag