Cristian Manoliu created KAFKA-9619:
---------------------------------------
Summary: Receiving duplicates when application is configured for exactly once
Key: KAFKA-9619
URL: https://issues.apache.org/jira/browse/KAFKA-9619
Project: Kafka
Issue Type: Bug
Components: consumer, producer
Affects Versions: 2.0.1
Environment: Red Hat Enterprise Linux Server release 6.10 (Santiago)
Reporter: Cristian Manoliu
Attachments: log
Hi. There are cases (very rare, but they do happen) when I receive duplicates, even though everything is configured for high durability and we use an exactly-once configuration.
Please find below the application context and the test scenario that reproduces this issue.
h2. Kafka Cluster Setup
3 x Kafka Brokers (broker 1 on *host1*, broker 2 on *host2*, broker 3 on *host3*)
3 x ZooKeeper instances (server 1 on *host1*, server 2 on *host2*, server 3 on *host3*)
h3. Kafka configuration
broker.id=1,2,3
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/kafka/logs/kafka
min.insync.replicas=3
transaction.state.log.min.isr=3
default.replication.factor=3
log.retention.minutes=600
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=host1:2181,host2:2181,host3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=1000
log.message.timestamp.type=LogAppendTime
delete.topic.enable=true
auto.create.topics.enable=false
unclean.leader.election.enable=false
h3. ZooKeeper configuration
tickTime=2000
dataDir=/home/kafka/logs/zk
clientPort=2181
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=host1:2888:3888
server.2=host2:2888:3888
server.3=host3:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
h2. Kafka internal topics description
Topic:__transaction_state PartitionCount:50 ReplicationFactor:3 Configs:segment.bytes=104857600,unclean.leader.election.enable=false,compression.type=uncompressed,cleanup.policy=compact,min.insync.replicas=3
Topic: __transaction_state Partition: 0 Leader: 1 Replicas: 3,2,1 Isr: 1,2,3
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3 Configs:segment.bytes=104857600,unclean.leader.election.enable=false,min.insync.replicas=3,cleanup.policy=compact,compression.type=producer
Topic: __consumer_offsets Partition: 0 Leader: 1 Replicas: 3,2,1 Isr: 1,2,3
h2. Application topics
h3. Topic input-event
Topic:input-event PartitionCount:3 ReplicationFactor:3 Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
Topic: input-event Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: input-event Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 1,2,3
Topic: input-event Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 1,2,3
h3. Topic output-event
Topic:output-event PartitionCount:3 ReplicationFactor:3 Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
Topic: output-event Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 1,2,3
Topic: output-event Partition: 1 Leader: 3 Replicas: 3,1,2 Isr: 1,2,3
Topic: output-event Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
h2. Application consumer properties
o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [host1:9092, host2:9092, host3:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 134217728
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = groupId
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_committed
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 134217728
max.poll.interval.ms = 300000
max.poll.records = 1
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 1000
request.timeout.ms = 30000
retry.backoff.ms = 1000
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
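For reference, the settings above that drive the exactly-once behaviour on the consumer side are {{enable.auto.commit = false}}, {{isolation.level = read_committed}} and {{max.poll.records = 1}}. Below is a minimal sketch (assumed code, not the actual application) of a consumer built with just those settings:
{code:java}
// Minimal sketch, not the actual application code: a consumer using the
// exactly-once-relevant settings from the dump above. Offsets are never
// auto-committed; they are committed through the producer transaction.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class InputEventConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host1:9092,host2:9092,host3:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");        // offsets go through the transaction
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");  // ignore records from aborted transactions
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");              // process one message per poll
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("input-event"));
            while (true) {
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(500))) {
                    process(record); // hand the record to the transactional producer side (see the later sketch)
                }
            }
        }
    }

    private static void process(ConsumerRecord<byte[], byte[]> record) {
        // placeholder for the transactional forward described under "Application handling commits"
    }
}
{code}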
h2. Application producer properties
bootstrapServers = "host1, host2, host3"
transactionIdPrefix = "my-producer-"${instance}"
"enable.idempotence" = "true"
"acks" = "all"
"retries" = "2147483647"
"transaction.timeout.ms" = "10000"
"max.in.flight.requests.per.connection" = "1"
"reconnect.backoff.max.ms" = "1000"
"reconnect.backoff.ms" = "1000"
"retry.backoff.ms" = "1000"
h2. Application handling commits
Using {{KafkaTransactionManager}}, we start a transaction, write the message to the output topic using {{KafkaTemplate}}, and also send the consumer offsets to the same transaction (spring-kafka 2.2.8.RELEASE).
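For illustration, one way to express that consume-transform-produce step is sketched below. The names are assumptions; the actual application drives the transaction through {{KafkaTransactionManager}} (Spring-managed transactions) rather than {{executeInTransaction}}, but the producer-side operations are the same: write to the output topic and send the consumer offsets within one Kafka transaction.
{code:java}
// Sketch only, assumed names: forward one record and commit its offset atomically.
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.KafkaTemplate;

public class TransactionalForwarderSketch {

    private final KafkaTemplate<byte[], byte[]> template;

    public TransactionalForwarderSketch(KafkaTemplate<byte[], byte[]> template) {
        this.template = template;
    }

    public void forward(ConsumerRecord<byte[], byte[]> record) {
        // Begin a producer transaction, produce to the output topic, and commit
        // the consumer offset and the output record atomically.
        template.executeInTransaction(ops -> {
            ops.send("output-event", record.value());
            Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
            // The consumer group id must match the consuming application's group.id.
            ops.sendOffsetsToTransaction(offsets, "groupId");
            return true;
        });
    }
}
{code}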
h2. Test expected/actual
h3. Test description
# Write 32,000 messages to the input topic
# Start 3 application instances
# Start processing the messages one by one (max.poll.records = 1)
# During processing, send *SIGKILL* (kill -9) in parallel to the *host1* and *host2* Kafka Brokers, 50 times
# Wait 60 seconds
# During processing, send *SIGKILL* (kill -9) in parallel to the *host1* and *host3* Kafka Brokers, 50 times
# Wait 60 seconds
# During processing, send *SIGKILL* (kill -9) in parallel to the *host2* and *host3* Kafka Brokers, 50 times
The expectation is to end up with exactly 32,000 messages in the output topic; however, sometimes we actually end up with at least one duplicate.
There are also runs where we end up with exactly 32,000 messages and everything is right.
Every time the issue occurs, the sequence of events is the same.
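For reference, the output can be checked by counting the committed records in the output topic after a run; below is a minimal sketch with assumed names (not the tooling actually used). {{read_committed}} ensures that records from aborted transactions are not counted.
{code:java}
// Verification sketch: count committed records in output-event and compare with 32,000.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OutputTopicCountSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host1:9092,host2:9092,host3:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "verification-" + System.currentTimeMillis());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // count only committed records
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        long count = 0;
        int emptyPolls = 0;
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("output-event"));
            // Stop after several consecutive empty polls, i.e. the topic has been read to the end.
            while (emptyPolls < 10) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                emptyPolls = records.isEmpty() ? emptyPolls + 1 : 0;
                count += records.count();
            }
        }
        System.out.println("committed records in output-event: " + count + " (expected 32000)");
    }
}
{code}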
We see {{Attempt to heartbeat failed since group is rebalancing}} right before the commit.
The application log file is attached.