[ https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16603760#comment-16603760 ]

Matthias J. Sax commented on KAFKA-7365:
----------------------------------------

This might help to understand the session.timeout.ms and max.poll.interval.ms 
configs: 
[https://stackoverflow.com/questions/39730126/difference-between-session-timeout-ms-and-max-poll-interval-ms-for-kafka-0-10-0/]
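
In short: session.timeout.ms bounds how long the broker waits for heartbeats 
from the consumer's background heartbeat thread, while max.poll.interval.ms 
bounds how much time may pass between two poll() calls before the consumer is 
removed from the group and a rebalance is triggered. As a rough sketch (reusing 
the values from your own config below), slow per-record processing is usually 
handled by raising max.poll.interval.ms and/or lowering max.poll.records:

    java.util.Properties props = new java.util.Properties();
    // heartbeat liveness: the group coordinator rebalances if no heartbeat
    // arrives within this window
    props.put("session.timeout.ms", "10000");
    // processing liveness: the consumer drops out of the group if poll() is
    // not called again within this window
    props.put("max.poll.interval.ms", "300000");
    // fewer records per poll() -> less processing time between poll() calls
    props.put("max.poll.records", "1");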

Also, I'm not sure what you mean by "has processed all the 4 messages in single 
poll"? Note that fetching from Kafka and poll() are independent. A single fetch 
returns data based on `fetch.max.bytes` – this can be more than 
`max.poll.records` – however, a single poll() call should return at most 
`max.poll.records` records in the iterator, while all others are buffered within 
the consumer. If you call poll() again, the next records are returned from the 
buffer without sending another fetch request to the broker.
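
For illustration, a minimal sketch of the expected behavior (broker address and 
topic name are placeholders, everything else mirrors your config): even if one 
fetch brings back all 4 messages, each poll() should hand at most 
max.poll.records (= 1) of them to the application, and the rest stay buffered 
for the next poll():

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class MaxPollRecordsCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.put("group.id", "empestor");
            props.put("enable.auto.commit", "false");
            props.put("max.poll.records", "1");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
                while (true) {
                    // poll(long) is the 1.0.0 API; newer clients use poll(Duration)
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    // count() should never exceed max.poll.records (= 1); any other
                    // records already fetched stay buffered inside the consumer
                    System.out.println("records in this poll: " + records.count());
                    for (ConsumerRecord<String, String> record : records) {
                        // per-record processing goes here, then a manual commit
                        System.out.println("processing offset " + record.offset());
                        consumer.commitSync();
                    }
                }
            }
        }
    }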

Also, what version do you use?

> max.poll.records setting in Kafka Consumer is not working
> ---------------------------------------------------------
>
>                 Key: KAFKA-7365
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7365
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: Kashyap Ivaturi
>            Priority: Major
>
> Hi,
> I have a requirement where I consume messages one by one; each message needs 
> additional processing, after which I manually commit the offset.
> Things work well most of the time, until I get a big batch of records that 
> takes longer to process, and I encounter a CommitFailed exception for the last 
> set of records even though they were processed. While I'm able to reconnect, 
> the consumer picks up some messages that I had already processed. I don't want 
> this to happen, as it creates duplicates in the target systems that I 
> integrate with while processing the messages.
>  
> I decided that even though there are more messages in the queue, I would like 
> to have control over how many records I process per poll.
> I tried to replicate a scenario where I started the consumer with 
> 'max.poll.records' set to '1' and then pushed 4 messages into the topic the 
> consumer is listening to.
> I expected that the consumer would only process 1 message because of my 
> 'max.poll.records' setting, but the consumer processed all 4 messages in a 
> single poll. Any idea why it did not honor the 'max.poll.records' setting, or 
> is some other setting overriding it? I appreciate your help or guidance in 
> troubleshooting this issue.
> Here is the log of my Consumer config when it starts:
>  
> 2018-08-28 08:29:47.873  INFO 91121 --- [           main] 
> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [messaging-rtp3.cisco.com:9093]
> check.crcs = true
> client.id = 
> connections.max.idle.ms = 540000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = empestor
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> internal.leave.group.on.close = true
> isolation.level = read_uncommitted
> key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 1
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 40000
> retry.backoff.ms = 100
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = SSL
> send.buffer.bytes = 131072
> session.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = [hidden]
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = 
> /kafka/certs/empestor/certificates/kafka.client.empestor.keystore.jks
> ssl.keystore.password = [hidden]
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = 
> /kafka/certs/empestor/certificates/kafka.client.truststore.jks
> ssl.truststore.password = [hidden]
> ssl.truststore.type = JKS
> value.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
>  
> 2018-08-28 08:29:48.079  INFO 91121 --- [           main] 
> o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
