[ 
https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598591#comment-16598591
 ] 

M. Manna edited comment on KAFKA-7365 at 8/31/18 11:55 AM:
-----------------------------------------------------------

[~kivaturi] I am not sure how your heartbeat interval (and session timeout) 
have been tuned to account for the long processing delay. Also, max.poll.records 
is covered by tests in the PlainTextConsumerTest.scala file, where it works as 
expected.

Do you know roughly the maximum time you need to process a message, e.g. 20s? 
If so, could you set heartbeat.interval.ms=20000 (and adjust session.timeout.ms 
to be ~3x that) and see whether you still encounter the issue? I might have 
overlooked something, but from the code I don't see any reason why this setting 
would be overridden (I will check this anyway).
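
A minimal sketch of the settings suggested above, using only java.util.Properties so it runs without the Kafka client on the classpath. The class and method names (ConsumerTimeouts, forHeartbeatInterval) are hypothetical, and the 3x multiplier is just the rule of thumb from this comment. Note that the broker bounds session.timeout.ms via group.min.session.timeout.ms and group.max.session.timeout.ms, and that the log in the issue also shows max.poll.interval.ms = 300000, which limits the time allowed between poll() calls.

```java
import java.util.Properties;

/** Hypothetical helper: derives consumer timeout settings from a heartbeat interval. */
final class ConsumerTimeouts {
    // Rule of thumb from the comment above (~3x), not a limit enforced by Kafka.
    static final int SESSION_TIMEOUT_MULTIPLIER = 3;

    static Properties forHeartbeatInterval(int heartbeatIntervalMs) {
        if (heartbeatIntervalMs <= 0) {
            throw new IllegalArgumentException("heartbeatIntervalMs must be positive");
        }
        // multiplyExact throws ArithmeticException instead of silently overflowing.
        int sessionTimeoutMs = Math.multiplyExact(heartbeatIntervalMs, SESSION_TIMEOUT_MULTIPLIER);

        Properties props = new Properties();
        props.setProperty("heartbeat.interval.ms", Integer.toString(heartbeatIntervalMs));
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        // Values taken from the reporter's setup: manual commits, one record per poll.
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("max.poll.records", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = forHeartbeatInterval(20_000);
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting Properties object could then be passed to a KafkaConsumer constructor together with the usual bootstrap.servers, group.id, and deserializer settings.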

If you can, take a look at this part of the code (Fetcher.java); it might be 
what you are experiencing:

    // this case shouldn't usually happen because we only send one fetch at a time per partition,
    // but it might conceivably happen in some rare cases (such as partition leader changes).
    // we have to copy to a new list because the old one may be immutable
    List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
    newRecords.addAll(currentRecords);
    newRecords.addAll(records);
    fetched.put(partition, newRecords);
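
To make the excerpt above easier to try in isolation, here is a self-contained sketch of the same merge step. It is not the Fetcher code itself: the class name FetchedRecordsMerge, the append method, and the use of plain strings in place of ConsumerRecord are assumptions made for illustration. It shows why the copy is needed when the existing list is unmodifiable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative stand-in for the merge step quoted from Fetcher.java. */
final class FetchedRecordsMerge {

    /** Adds records for a partition, merging with any records already stored for it. */
    static <P, R> void append(Map<P, List<R>> fetched, P partition, List<R> records) {
        List<R> currentRecords = fetched.get(partition);
        if (currentRecords == null) {
            // First batch for this partition: store it as-is.
            fetched.put(partition, records);
            return;
        }
        // Copy into a new list because the existing one may be immutable.
        List<R> newRecords = new ArrayList<>(records.size() + currentRecords.size());
        newRecords.addAll(currentRecords);
        newRecords.addAll(records);
        fetched.put(partition, newRecords);
    }

    public static void main(String[] args) {
        Map<String, List<String>> fetched = new HashMap<>();
        // The first batch is unmodifiable, so calling addAll on it directly would throw.
        append(fetched, "topic-0", Collections.unmodifiableList(Arrays.asList("m1", "m2")));
        append(fetched, "topic-0", Collections.singletonList("m3"));
        System.out.println(fetched.get("topic-0")); // prints [m1, m2, m3]
    }
}
```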

> max.poll.records setting in Kafka Consumer is not working
> ---------------------------------------------------------
>
>                 Key: KAFKA-7365
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7365
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: Kashyap Ivaturi
>            Priority: Major
>
> Hi,
> I have a requirement where I consume messages one by one; each message needs 
> additional processing, after which I manually commit the offset.
> Things work well most of the time until I get a big batch of records that 
> takes longer to process, and I encounter a CommitFailed exception for the 
> last set of records even though they were processed. While I am able to 
> reconnect, the consumer picks up some messages that I had already processed. I 
> don't want this to happen, as it creates duplicates in the target systems that 
> I integrate with while processing the messages.
>  
> I decided that even though there are more messages in the queue, I would 
> like to control how many records I process per poll.
> I tried to replicate the scenario by starting the consumer with 
> 'max.poll.records' set to '1' and then pushing 4 messages into the topic the 
> consumer is listening to.
> I expected the consumer to process only 1 message because of my 
> 'max.poll.records' setting, but the consumer processed all 4 messages 
> in a single poll. Any idea why it did not honor the 'max.poll.records' 
> setting, or whether some other setting is overriding it? I would appreciate 
> your help or guidance in troubleshooting this issue.
> Here is the log of my Consumer config when it starts:
>  
> 2018-08-28 08:29:47.873  INFO 91121 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [messaging-rtp3.cisco.com:9093]
> check.crcs = true
> client.id = 
> connections.max.idle.ms = 540000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = empestor
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> internal.leave.group.on.close = true
> isolation.level = read_uncommitted
> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 1
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 40000
> retry.backoff.ms = 100
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = SSL
> send.buffer.bytes = 131072
> session.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = [hidden]
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = /kafka/certs/empestor/certificates/kafka.client.empestor.keystore.jks
> ssl.keystore.password = [hidden]
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = /kafka/certs/empestor/certificates/kafka.client.truststore.jks
> ssl.truststore.password = [hidden]
> ssl.truststore.type = JKS
> value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>  
> 2018-08-28 08:29:48.079  INFO 91121 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
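
On the scenario quoted above (max.poll.records = 1, four messages published): as far as I understand the consumer, max.poll.records caps how many records each poll() call returns and does not limit how many records a fetch brings back from the broker. Under that assumption, four messages would be handed out over four consecutive poll() calls, which can look like one poll in application logs. The toy model below illustrates that behaviour only; BufferedPollModel and its methods are hypothetical and are not the real client internals.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Toy model: fetched records are buffered, and poll() returns at most maxPollRecords. */
final class BufferedPollModel {
    private final Deque<String> buffer = new ArrayDeque<>();
    private final int maxPollRecords;

    BufferedPollModel(int maxPollRecords) {
        if (maxPollRecords < 1) {
            throw new IllegalArgumentException("maxPollRecords must be at least 1");
        }
        this.maxPollRecords = maxPollRecords;
    }

    /** Simulates a fetch response: every returned record goes into the buffer. */
    void onFetchResponse(List<String> records) {
        buffer.addAll(records);
    }

    /** Hands out up to maxPollRecords buffered records, oldest first. */
    List<String> poll() {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxPollRecords && !buffer.isEmpty()) {
            batch.add(buffer.poll());
        }
        return batch;
    }

    public static void main(String[] args) {
        BufferedPollModel consumer = new BufferedPollModel(1);
        consumer.onFetchResponse(Arrays.asList("m1", "m2", "m3", "m4"));
        // Four separate polls, one record each: prints [m1], [m2], [m3], [m4] on separate lines.
        List<String> batch;
        while (!(batch = consumer.poll()).isEmpty()) {
            System.out.println(batch);
        }
    }
}
```

Logging the size of the collection returned by each poll() call in the real application would show whether the limit is being honored.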



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
