Kashyap Ivaturi created KAFKA-7365:
--------------------------------------
Summary: max.poll.records setting in Kafka Consumer is not working
Key: KAFKA-7365
URL: https://issues.apache.org/jira/browse/KAFKA-7365
Project: Kafka
Issue Type: Bug
Components: consumer
Reporter: Kashyap Ivaturi
Hi,
I have a requirement to consume messages one at a time: each message needs
additional processing, after which I manually commit the offset.
This works well most of the time, until I receive a large batch of records that
takes longer to process. I then get a CommitFailedException for the last set of
records, even though they were processed. Although I am able to reconnect, the
consumer picks up some messages that I had already processed. I don't want this
to happen, as it creates duplicates in the target systems that I integrate with
while processing the messages.
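One common way to tolerate redelivery after a failed commit is to make the processing step idempotent. The sketch below is only an illustration under stated assumptions: the class `ProcessedOffsets`, its methods, and the topic name `demo-topic` are hypothetical and not part of the Kafka client, and the state is kept in memory, whereas a real integration would probably need to store it alongside the target system.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not a Kafka API): remembers the highest offset already
// applied per topic-partition so that redelivered records can be skipped.
public class ProcessedOffsets {
    private final Map<String, Long> lastApplied = new HashMap<>();

    // Returns true if this offset has not been applied yet for the partition.
    public boolean shouldProcess(String topic, int partition, long offset) {
        Long last = lastApplied.get(topic + "-" + partition);
        return last == null || offset > last;
    }

    // Records that the given offset has been applied to the target system.
    public void markProcessed(String topic, int partition, long offset) {
        lastApplied.merge(topic + "-" + partition, offset, Long::max);
    }

    public static void main(String[] args) {
        ProcessedOffsets seen = new ProcessedOffsets();
        // Offsets 1 and 2 arrive twice, as they might after a failed commit.
        long[] delivered = {0, 1, 2, 1, 2, 3};
        for (long offset : delivered) {
            if (seen.shouldProcess("demo-topic", 0, offset)) {
                System.out.println("processing offset " + offset);
                seen.markProcessed("demo-topic", 0, offset);
            } else {
                System.out.println("skipping duplicate offset " + offset);
            }
        }
    }
}
```

This relies on offsets within a partition increasing monotonically, so a record at or below the last applied offset can be treated as a repeat.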
I decided that, even if there are more messages waiting in the topic, I would
like to control how many records I process per poll.
I tried to reproduce the scenario: I started the consumer with
'max.poll.records' set to '1' and then pushed 4 messages into the topic the
consumer is listening to.
I expected the consumer to process only 1 message because of my
'max.poll.records' setting, but it processed all 4 messages in a single poll.
Any idea why it did not honor 'max.poll.records', or is some other setting
overriding it? I would appreciate any help or guidance in troubleshooting this
issue.
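For reference, 'max.poll.records' is documented as the maximum number of records returned by a single call to poll(); it does not limit how many times a consumer loop calls poll(). The following is a toy model of that documented contract, not the real KafkaConsumer implementation, and the class `MaxPollRecordsModel` and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model only: imitates the documented contract of max.poll.records,
// not the internals of the real Kafka consumer.
public class MaxPollRecordsModel {
    private final Deque<String> buffered = new ArrayDeque<>();
    private final int maxPollRecords;

    public MaxPollRecordsModel(int maxPollRecords) {
        this.maxPollRecords = maxPollRecords;
    }

    // Stands in for records that are available to the consumer.
    public void fetched(List<String> records) {
        buffered.addAll(records);
    }

    // Returns at most maxPollRecords records per call.
    public List<String> poll() {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxPollRecords && !buffered.isEmpty()) {
            batch.add(buffered.poll());
        }
        return batch;
    }

    public static void main(String[] args) {
        MaxPollRecordsModel consumer = new MaxPollRecordsModel(1);
        consumer.fetched(Arrays.asList("m1", "m2", "m3", "m4"));
        int polls = 0;
        List<String> batch;
        // A typical loop keeps polling, so all four messages are still
        // handled, but one per poll() call.
        while (!(batch = consumer.poll()).isEmpty()) {
            polls++;
            System.out.println("poll " + polls + " returned " + batch.size() + " record(s)");
        }
    }
}
```

In a real consumer, logging the size of the batch returned by each poll() call would be one way to check whether the four messages arrived in one call or in four consecutive calls.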
Here is the log of my Consumer config when it starts:
2018-08-28 08:29:47.873 INFO 91121 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [messaging-rtp3.cisco.com:9093]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = empestor
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 40000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = /kafka/certs/empestor/certificates/kafka.client.empestor.keystore.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = /kafka/certs/empestor/certificates/kafka.client.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2018-08-28 08:29:48.079 INFO 91121 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)