[ https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16605019#comment-16605019 ]
Matthias J. Sax commented on KAFKA-7365: ---------------------------------------- So how long do you expect that it take to process the messages returned from poll()? Did you try to set `max.poll.interval.ms` to a larger value than expected processing time? > max.poll.records setting in Kafka Consumer is not working > --------------------------------------------------------- > > Key: KAFKA-7365 > URL: https://issues.apache.org/jira/browse/KAFKA-7365 > Project: Kafka > Issue Type: Bug > Components: consumer > Reporter: Kashyap Ivaturi > Priority: Major > > Hi, > I have a requirement where I consume messages one by one, each message has > additional processing that I should do and then manually commit the offset. > Things work well most of the times until I get a big bunch of records which > takes longer time to process and I encounter CommitFailed exception for the > last set of records even though they were processed. While i'am able to > reconnect back its picking some messages that I had already processed. I > don't want this to happen as its creating duplicates in target systems that I > integrate with while processing the message. > > I decided that even though there are more messages in the queue , I would > like to have a control on how many records I can process when polled. > I tried to replicate a scenario where I have started the consumer by setting > 'max.poll.records' to '1' and then pushed 4 messages into the Topic the > consumer is listening. > I expected that the consumer will only process 1 message because of my > 'max.poll.records' setting but the consumer has processed all the 4 messages > in single poll. Any idea why did it not consider 'max.poll.records' setting > or is some other setting overriding this setting?. Appreciate your help or > guidance in troubleshooting this issue. > Here is the log of my Consumer config when it starts: > > 2018-08-28 08:29:47.873 INFO 91121 --- [ main] > o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: > [auto.commit.interval.ms|https://auto.commit.interval.ms/] = 5000 > auto.offset.reset = earliest > bootstrap.servers = [messaging-rtp3.cisco.com:9093] > check.crcs = true > [client.id|https://client.id/] = > [connections.max.idle.ms|https://connections.max.idle.ms/] = 540000 > enable.auto.commit = false > exclude.internal.topics = true > fetch.max.bytes = 52428800 > [fetch.max.wait.ms|https://fetch.max.wait.ms/] = 500 > fetch.min.bytes = 1 > [group.id|https://group.id/] = empestor > [heartbeat.interval.ms|https://heartbeat.interval.ms/] = 3000 > interceptor.classes = null > internal.leave.group.on.close = true > isolation.level = read_uncommitted > key.deserializer = class > org.apache.kafka.common.serialization.StringDeserializer > max.partition.fetch.bytes = 1048576 > [max.poll.interval.ms|https://max.poll.interval.ms/] = 300000 > max.poll.records = 1 > [metadata.max.age.ms|https://metadata.max.age.ms/] = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > [metrics.sample.window.ms|https://metrics.sample.window.ms/] = 30000 > partition.assignment.strategy = > [org.apache.kafka.clients.consumer.RangeAssignor] > receive.buffer.bytes = 65536 > [reconnect.backoff.max.ms|https://reconnect.backoff.max.ms/] = 1000 > [reconnect.backoff.ms|https://reconnect.backoff.ms/] = 50 > [request.timeout.ms|https://request.timeout.ms/] = 40000 > [retry.backoff.ms|https://retry.backoff.ms/] = 100 > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.mechanism = GSSAPI > security.protocol = SSL > send.buffer.bytes = 131072 > [session.timeout.ms|https://session.timeout.ms/] = 10000 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = null > ssl.key.password = [hidden] > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = > /kafka/certs/empestor/certificates/kafka.client.empestor.keystore.jks > ssl.keystore.password = [hidden] > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = > /kafka/certs/empestor/certificates/kafka.client.truststore.jks > ssl.truststore.password = [hidden] > ssl.truststore.type = JKS > value.deserializer = class > org.apache.kafka.common.serialization.StringDeserializer > > 2018-08-28 08:29:48.079 INFO 91121 --- [ main] > o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)