The OP was asking about duplicate messages, not lost messages, so I think
we are discussing two different possible scenarios. When ever someone says
they see duplicate messages it's always good practice to first double check
ack mode, in flight messages, and retries. Also its important to check if
the messages are really duplicates in the Kafka log, or if they are just
seeing the same message reprocessed several times in the consumer due to
some other issue with offset commits.

-hans

On Wed, Apr 19, 2017 at 10:19 AM, Onur Karaman <onurkaraman.apa...@gmail.com
> wrote:

> If this is what I think it is, it has nothing to do with acks,
> max.in.flight.requests.per.connection, or anything client-side and is
> purely about the kafka cluster.
>
> Here's a simple example involving a single zookeeper instance, 3 brokers, a
> KafkaConsumer and KafkaProducer (neither of these clients interact with
> zookeeper).
> 1. start up zookeeper:
> > ./bin/zookeeper-server-start.sh config/zookeeper.properties
>
> 2. start up some brokers:
> > ./bin/kafka-server-start.sh config/server0.properties
> > ./bin/kafka-server-start.sh config/server1.properties
> > ./bin/kafka-server-start.sh config/server2.properties
>
> 3 create a topic:
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t
> --partitions 1 --replication-factor 3
>
> 4. start a console consumer (this needs to happen before step 5 so we can
> write __consumer_offsets metadata to zookeeper):
> > ./bin/kafka-console-consumer.sh --broker-list
> localhost:9090,localhost:9091,localhost:9092 --topic t
>
> 5. kill zookeeper
>
> 6. start a console producer and produce some messages:
> > ./bin/kafka-console-producer.sh --broker-list
> localhost:9090,localhost:9091,localhost:9092 --topic t
>
> 7. notice the size of the broker logs grow with each message you send:
> > l /tmp/kafka-logs*/t-0
>
> 8. notice the consumer consuming the messages being produced
>
> Basically, zookeeper can be completely offline and your brokers will append
> to logs and process client requests just fine as long as it doesn't need to
> interact with zookeeper. Today, the only way a broker knows to stop
> accepting requests is when it receives instruction from the controller.
>
> I first realized this last July when debugging a small production data loss
> scenario that was a result of this[1]. Maybe this is an attempt at leaning
> towards availability over consistency. Personally I think that brokers
> should stop accepting requests when it disconnects from zookeeper.
>
> [1] The small production data loss scenario happens when accepting requests
> during the small window in between a broker's zookeeper session expiration
> and when the controller instructs the broker to stop accepting requests.
> During this time, the broker still thinks it leads partitions that are
> currently being led by another broker, effectively resulting in a window
> where the partition is led by two brokers. Clients can continue sending
> requests to the old leader, and for producers with low acknowledgement
> settings (like ack=1), their messages will be lost without the client
> knowing, as the messages are being appended to the phantom leader's logs
> instead of the true leader's logs.
>
> On Wed, Apr 19, 2017 at 7:56 AM, Shrikant Patel <spa...@pdxinc.com> wrote:
>
> > While we were testing, our producer had following configuration
> > max.in.flight.requests.per.connection=1, acks= all and retries=3.
> >
> > The entire producer side set is below. The consumer has manual offset
> > commit, it commit offset after it has successfully processed the message.
> >
> > Producer setting
> > bootstrap.servers​= {point the F5 VS fronting Kafka cluster}
> > key.serializer= {appropriate value as per your cases}
> > value.serializer= {appropriate value as per your case}
> > acks= all
> > retries=3
> > ssl.key.password= {appropriate value as per your case}
> > ssl.keystore.location= {appropriate value as per your case}
> > ssl.keystore.password= {appropriate value as per your case}
> > ssl.truststore.location= {appropriate value as per your case}
> > ssl.truststore.password= {appropriate value as per your case}
> > batch.size=16384​
> > client.id= {appropriate value as per your case, may help with debugging}
> > max.block.ms​=65000
> > request.timeout.ms=30000
> > security.protocol= SSL
> > ssl.enabled.protocols=TLSv1.2
> > ssl.keystore.type=JKS
> > ssl.protocol=TLSv1.2
> > ssl.truststore.type=JKS
> > max.in.flight.requests.per.connection=1
> > metadata.fetch.timeout.ms=60000
> > reconnect.backoff.ms=1000
> > retry.backoff.ms​=1000
> > max.request.size=1048576​​
> > linger.ms=0
> >
> > Consumer setting
> > bootstrap.servers​= {point the F5 VS fronting Kafka cluster}
> > key.deserializer= {appropriate value as per your cases}
> > value.deserializer= {appropriate value as per your case}
> > group.id= {appropriate value as per your case}
> > ssl.key.password= {appropriate value as per your case}
> > ssl.keystore.location= {appropriate value as per your case}
> > ssl.keystore.password= {appropriate value as per your case}
> > ssl.truststore.location= {appropriate value as per your case}
> > ssl.truststore.password= {appropriate value as per your case}
> > enable.auto.commit=false
> > security.protocol= SSL
> > ssl.enabled.protocols=TLSv1.2
> > ssl.keystore.type=JKS
> > ssl.protocol=TLSv1.2
> > ssl.truststore.type=JKS
> > client.id= {appropriate value as per your case, may help with
> debugging}​
> > reconnect.backoff.ms=1000
> > retry.backoff.ms​=1000​
> >
> > Thanks,
> > Shri
> >
> > -----Original Message-----
> > From: Hans Jespersen [mailto:h...@confluent.io]
> > Sent: Tuesday, April 18, 2017 7:57 PM
> > To: users@kafka.apache.org
> > Subject: [EXTERNAL] Re: ZK and Kafka failover testing
> >
> > ***** Notice: This email was received from an external source *****
> >
> > When you publish, is acks=0,1 or all (-1)?
> > What is max.in.flight.requests.per.connection (default is 5)?
> >
> > It sounds to me like your publishers are using acks=0 and so they are not
> > actually succeeding in publishing (i.e. you are getting no acks) but they
> > will retry over and over and will have up to 5 retries in flight, so when
> > the broker comes back up, you are getting 4 or 5 copies of the same
> message.
> >
> > Try setting max.in.flight.requests.per.connection=1 to get rid of
> > duplicates Try setting acks=all to ensure the messages are being
> persisted
> > by the leader and all the available replicas in the kafka cluster.
> >
> > -hans
> >
> > /**
> >  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> >  * h...@confluent.io (650)924-2670
> >  */
> >
> > On Tue, Apr 18, 2017 at 4:10 PM, Shrikant Patel <spa...@pdxinc.com>
> wrote:
> >
> > > Hi All,
> > >
> > > I am seeing strange behavior between ZK and Kafka. We have 5 node in
> > > ZK and Kafka cluster each. Kafka version - 2.11-0.10.1.1
> > >
> > > The min.insync.replicas is 3, replication.factor is 5 for all topics,
> > > unclean.leader.election.enable is false. We have 15 partitions for
> > > each topic.
> > >
> > > The step we are following in our testing.
> > >
> > >
> > > *         My understanding is that ZK needs aleast 3 out of 5 server to
> > be
> > > functional. Kafka could not be functional without zookeeper. In out
> > > testing, we bring down 3 ZK nodes and don't touch Kafka nodes. Kafka
> > > is still functional, consumer\producer can still consume\publish from
> > > Kafka cluster. We then bring down all ZK nodes, Kafka
> > > consumer\producers are still functional. I am not able to understand
> > > why Kafka cluster is not failing as soon as majority of ZK nodes are
> > > down. I do see error in Kafka that it cannot connection to ZK cluster.
> > >
> > >
> > >
> > > *         With all or majority of ZK node down, we bring down 1 Kafka
> > > nodes (out of 5, so 4 are running). And at that point the consumer and
> > > producer start failing. My guess is the new leadership election cannot
> > > happen without ZK.
> > >
> > >
> > >
> > > *         Then we bring up the majority of ZK node up. (1st Kafka is
> > still
> > > down) Now the Kafka cluster become functional, consumer and producer
> > > now start working again. But Consumer sees big junk of message from
> > > kafka, and many of them are duplicates. It's like these messages were
> > > held up somewhere, Where\Why I don't know?  And why the duplicates? I
> > > can understand few duplicates for messages that consumer would not
> > > commit before 1st node when down. But why so many duplicates and like
> > > 4 copy for each message. I cannot understand this behavior.
> > >
> > > Appreciate some insight about our issues. Also if there are blogs that
> > > describe the ZK and Kafka failover scenario behaviors, that would be
> > > extremely helpful.
> > >
> > > Thanks,
> > > Shri
> > >
> > > This e-mail and its contents (to include attachments) are the property
> > > of National Health Systems, Inc., its subsidiaries and affiliates,
> > > including but not limited to Rx.com Community Healthcare Network, Inc.
> > > and its subsidiaries, and may contain confidential and proprietary or
> > > privileged information. If you are not the intended recipient of this
> > > e-mail, you are hereby notified that any unauthorized disclosure,
> > > copying, or distribution of this e-mail or of its attachments, or the
> > > taking of any unauthorized action based on information contained herein
> > is strictly prohibited.
> > > Unauthorized use of information contained herein may subject you to
> > > civil and criminal prosecution and penalties. If you are not the
> > > intended recipient, please immediately notify the sender by telephone
> > > at
> > > 800-433-5719 or return e-mail and permanently delete the original
> e-mail.
> > >
> > This e-mail and its contents (to include attachments) are the property of
> > National Health Systems, Inc., its subsidiaries and affiliates, including
> > but not limited to Rx.com Community Healthcare Network, Inc. and its
> > subsidiaries, and may contain confidential and proprietary or privileged
> > information. If you are not the intended recipient of this e-mail, you
> are
> > hereby notified that any unauthorized disclosure, copying, or
> distribution
> > of this e-mail or of its attachments, or the taking of any unauthorized
> > action based on information contained herein is strictly prohibited.
> > Unauthorized use of information contained herein may subject you to civil
> > and criminal prosecution and penalties. If you are not the intended
> > recipient, please immediately notify the sender by telephone at
> > 800-433-5719 or return e-mail and permanently delete the original e-mail.
> >
>

Reply via email to