[ https://issues.apache.org/jira/browse/KAFKA-6449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16328505#comment-16328505 ]
huxihx commented on KAFKA-6449: ------------------------------- Try to increase `session.timeout.ms` > KafkaConsumer happen 40s timeOut when poll data after pollThread sleep more > than request.timeout.ms > --------------------------------------------------------------------------------------------------- > > Key: KAFKA-6449 > URL: https://issues.apache.org/jira/browse/KAFKA-6449 > Project: Kafka > Issue Type: Bug > Affects Versions: 0.10.0.1 > Reporter: zhaoshijie > Priority: Major > > I use code as as follow consumer a partition of kafka topic, I got 40s > latency every poll > {code:java} > @Test > public void testTimeOut() throws Exception { > String test_topic = "timeOut_test"; > int test_partition = 1; > Map<String, Object> kafkaParams = new HashMap<String, Object>(); > kafkaParams.put("auto.offset.reset", "earliest"); > kafkaParams.put("enable.auto.commit", false); > kafkaParams.put("bootstrap.servers", "*"); > kafkaParams.put("key.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > kafkaParams.put("value.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > kafkaParams.put("group.id", "test-consumer-" + > System.currentTimeMillis()); > // kafkaParams.put("reconnect.backoff.ms", "0"); > // kafkaParams.put("max.poll.records", "500"); > KafkaConsumer<String, String> consumer = new KafkaConsumer<String, > String>(kafkaParams); > consumer.assign(Arrays.asList(new TopicPartition(test_topic, > test_partition))); > Long offset = 0L; > while (true) { > Long startPollTime = System.currentTimeMillis(); > consumer.seek(new TopicPartition(test_topic, test_partition), > offset); > ConsumerRecords<String, String> records = consumer.poll(120000); > logger.info("poll take " + (System.currentTimeMillis() - > startPollTime) + "ms, MSGCount is " + records.count()); > Thread.sleep(41000); > Iterator<ConsumerRecord<String, String>> consumerRecordIterable = > records.records(test_topic).iterator(); > while (consumerRecordIterable.hasNext()) { > offset = consumerRecordIterable.next().offset(); > } > } > } > {code} > log as follow: > {code:java} > 2018-01-16 18:53:33,033 |INFO | main | ConsumerConfig values: > metric.reporters = [] > metadata.max.age.ms = 300000 > partition.assignment.strategy = > [org.apache.kafka.clients.consumer.RangeAssignor] > reconnect.backoff.ms = 50 > sasl.kerberos.ticket.renew.window.factor = 0.8 > max.partition.fetch.bytes = 1048576 > bootstrap.servers = [10.0.52.24:9092, 10.0.52.25:9092, 10.0.52.26:9092] > ssl.keystore.type = JKS > enable.auto.commit = false > sasl.mechanism = GSSAPI > interceptor.classes = null > exclude.internal.topics = true > ssl.truststore.password = null > client.id = > ssl.endpoint.identification.algorithm = null > max.poll.records = 2147483647 > check.crcs = true > request.timeout.ms = 40000 > heartbeat.interval.ms = 3000 > auto.commit.interval.ms = 5000 > receive.buffer.bytes = 65536 > ssl.truststore.type = JKS > ssl.truststore.location = null > ssl.keystore.password = null > fetch.min.bytes = 1 > send.buffer.bytes = 131072 > value.deserializer = class > org.apache.kafka.common.serialization.StringDeserializer > group.id = test-consumer-1516100013868 > retry.backoff.ms = 100 > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > ssl.trustmanager.algorithm = PKIX > ssl.key.password = null > fetch.max.wait.ms = 500 > sasl.kerberos.min.time.before.relogin = 60000 > connections.max.idle.ms = 540000 > session.timeout.ms = 30000 > metrics.num.samples = 2 > key.deserializer = class > org.apache.kafka.common.serialization.StringDeserializer > ssl.protocol = TLS > ssl.provider = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.keystore.location = null > ssl.cipher.suites = null > security.protocol = PLAINTEXT > ssl.keymanager.algorithm = SunX509 > metrics.sample.window.ms = 30000 > auto.offset.reset = earliest > | > org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:178) > 2018-01-16 18:53:34,034 |INFO | main | ConsumerConfig values: > metric.reporters = [] > metadata.max.age.ms = 300000 > partition.assignment.strategy = > [org.apache.kafka.clients.consumer.RangeAssignor] > reconnect.backoff.ms = 50 > sasl.kerberos.ticket.renew.window.factor = 0.8 > max.partition.fetch.bytes = 1048576 > bootstrap.servers = [10.0.52.24:9092, 10.0.52.25:9092, 10.0.52.26:9092] > ssl.keystore.type = JKS > enable.auto.commit = false > sasl.mechanism = GSSAPI > interceptor.classes = null > exclude.internal.topics = true > ssl.truststore.password = null > client.id = consumer-1 > ssl.endpoint.identification.algorithm = null > max.poll.records = 2147483647 > check.crcs = true > request.timeout.ms = 40000 > heartbeat.interval.ms = 3000 > auto.commit.interval.ms = 5000 > receive.buffer.bytes = 65536 > ssl.truststore.type = JKS > ssl.truststore.location = null > ssl.keystore.password = null > fetch.min.bytes = 1 > send.buffer.bytes = 131072 > value.deserializer = class > org.apache.kafka.common.serialization.StringDeserializer > group.id = test-consumer-1516100013868 > retry.backoff.ms = 100 > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > ssl.trustmanager.algorithm = PKIX > ssl.key.password = null > fetch.max.wait.ms = 500 > sasl.kerberos.min.time.before.relogin = 60000 > connections.max.idle.ms = 540000 > session.timeout.ms = 30000 > metrics.num.samples = 2 > key.deserializer = class > org.apache.kafka.common.serialization.StringDeserializer > ssl.protocol = TLS > ssl.provider = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.keystore.location = null > ssl.cipher.suites = null > security.protocol = PLAINTEXT > ssl.keymanager.algorithm = SunX509 > metrics.sample.window.ms = 30000 > auto.offset.reset = earliest > | > org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:178) > 2018-01-16 18:53:34,034 |INFO | main | Kafka version : 0.10.0.1 | > org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:83) > 2018-01-16 18:53:34,034 |INFO | main | Kafka commitId : a7a17cdec9eaa6c5 | > org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:84) > 2018-01-16 18:53:34,034 |INFO | main | Discovered coordinator polaris-1:9092 > (id: 2147483647 rack: null) for group test-consumer-1516100013868. | > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.handleGroupMetadataResponse(AbstractCoordinator.java:505) > 2018-01-16 18:53:34,034 |INFO | main | Fetch offset 0 is out of range for > partition timeOut_test-1, resetting offset | > org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:585) > 2018-01-16 18:53:35,035 |INFO | main | poll take 780ms, MSGCount is 15943 | > com.zsj.tools.KafkaConsumerTest.testTimeOut(KafkaConsumerTest.java:43) > 2018-01-16 18:54:56,056 |INFO | main | poll take 40186ms, MSGCount is 15887 | > com.zsj.tools.KafkaConsumerTest.testTimeOut(KafkaConsumerTest.java:43) > 2018-01-16 18:56:17,017 |INFO | main | poll take 40121ms, MSGCount is 15672 | > com.zsj.tools.KafkaConsumerTest.testTimeOut(KafkaConsumerTest.java:43) > 2018-01-16 18:57:38,038 |INFO | main | poll take 40110ms, MSGCount is 15650 | > com.zsj.tools.KafkaConsumerTest.testTimeOut(KafkaConsumerTest.java:43) > 2018-01-16 18:58:59,059 |INFO | main | poll take 40269ms, MSGCount is 15650 | > com.zsj.tools.KafkaConsumerTest.testTimeOut(KafkaConsumerTest.java:43) > 2018-01-16 19:00:20,020 |INFO | main | poll take 40105ms, MSGCount is 15650 | > com.zsj.tools.KafkaConsumerTest.testTimeOut(KafkaConsumerTest.java:43){code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)