[
https://issues.apache.org/jira/browse/KAFKA-4753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15860815#comment-15860815
]
Jason Gustafson commented on KAFKA-4753:
----------------------------------------
Could scenario 1 be mitigated with a new partition assignor which took into
account partition leadership?
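A leadership-aware assignor could look roughly like the sketch below. This is hypothetical and is not the actual PartitionAssignor API: partitions and consumers are plain strings for illustration, and the leader map would really come from cluster metadata. The idea is to bucket partitions by leader broker and round-robin each bucket across consumers, so no consumer ends up fetching a disproportionate share of any one broker's leaders.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of leadership-aware assignment, NOT the real
// PartitionAssignor interface: group partitions by their leader broker,
// then round-robin each bucket across consumers so each consumer's load
// per broker stays roughly even.
public class LeaderAwareAssignorSketch {
    static Map<String, List<String>> assign(List<String> consumers,
                                            Map<String, Integer> leaderByPartition) {
        // Bucket partitions by leader broker id (TreeMap for deterministic order).
        Map<Integer, List<String>> byLeader = new TreeMap<>();
        for (Map.Entry<String, Integer> e : leaderByPartition.entrySet()) {
            byLeader.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
        }
        Map<String, List<String>> assignment = new HashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        // Round-robin within each leader's bucket, carrying the index across
        // buckets so the total counts stay balanced as well.
        int i = 0;
        for (List<String> bucket : byLeader.values()) {
            for (String partition : bucket) {
                assignment.get(consumers.get(i++ % consumers.size())).add(partition);
            }
        }
        return assignment;
    }
}
```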
> KafkaConsumer susceptible to FetchResponse starvation
> -----------------------------------------------------
>
> Key: KAFKA-4753
> URL: https://issues.apache.org/jira/browse/KAFKA-4753
> Project: Kafka
> Issue Type: Bug
> Reporter: Onur Karaman
> Assignee: Onur Karaman
>
> FetchResponse starvation here means that the KafkaConsumer repeatedly fails
> to fully form FetchResponses within the request timeout from a subset of the
> brokers it's fetching from, while FetchResponses from the other brokers get
> fully formed and processed by the application.
> In other words, this ticket is concerned with scenarios where fetching from
> some brokers hurts the progress of fetching from other brokers to the point
> of repeatedly hitting a request timeout.
> Some FetchResponse starvation scenarios:
> 1. partition leadership of the consumer's assigned partitions is skewed
> across brokers, causing uneven FetchResponse sizes across brokers.
> 2. the consumer seeks back on partitions on some brokers but not others,
> causing uneven FetchResponse sizes across brokers.
> 3. the consumer's ability to keep up with various partitions across brokers
> is skewed, causing uneven FetchResponse sizes across brokers.
> I've personally seen scenario 1 happen this past week to one of our users in
> prod. They manually assigned partitions such that a few brokers led most of
> the partitions while other brokers only led a single partition. When
> NetworkClient sends out FetchRequests to different brokers in parallel with
> an uneven partition distribution, FetchResponses from brokers that lead more
> partitions will contain more data than FetchResponses from brokers that lead
> few partitions. This means the small FetchResponses will get fully formed
> quicker than the larger FetchResponses. When the application eventually
> consumes a smaller fully formed FetchResponse, the NetworkClient will send
> out a new FetchRequest to the lightly loaded broker. That broker's response
> will again come back quickly, while only marginal progress has been made on
> the larger FetchResponse. Repeat this process several times and your
> application will have processed many smaller FetchResponses while the larger
> FetchResponse made minimal progress and hit the request timeout, forcing the
> large FetchResponse to start all over again. This is starvation.
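The cycle above can be sketched with back-of-the-envelope arithmetic. All constants here are hypothetical, loosely tied to the repro parameters further down (64 KB receive buffer, 40 second request timeout); the simplifying assumption is that the consumer drains at most roughly one socket buffer's worth from a broker per poll cycle, so a response needing more cycles than the timeout allows never fully forms.

```java
// Hypothetical back-of-the-envelope model of starvation scenario 1.
// Assumption (not measured): the consumer drains at most drainedPerCycleBytes
// from a broker's socket per poll cycle, so a response that needs more cycles
// than the request timeout allows gets cancelled and must start over.
public class StarvationArithmetic {
    static long cyclesToForm(long responseBytes, long drainedPerCycleBytes) {
        // Ceiling division: poll cycles needed to fully form the response.
        return (responseBytes + drainedPerCycleBytes - 1) / drainedPerCycleBytes;
    }

    static boolean starved(long responseBytes, long drainedPerCycleBytes,
                           long cycleMs, long requestTimeoutMs) {
        // Starved if forming the response takes longer than the request timeout.
        return cyclesToForm(responseBytes, drainedPerCycleBytes) * cycleMs > requestTimeoutMs;
    }
}
```

With a 13-second cycle (the repro's 10 s poll timeout plus 3 s sleep), a 64 KB response completes in one cycle while a multi-megabyte response needs dozens of cycles, far past the 40 s timeout.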
> To mitigate the problem in the short term, I've suggested that our user
> either:
> 1. bump up their "receive.buffer.bytes" beyond the current default of 64 KB
> to something like 1 MB. This is the short-term solution I suggested they go
> with.
> 2. reduce "max.partition.fetch.bytes" from the current default of 1 MB down
> to something like 100 KB. This solution wasn't advised as it could impact
> broker performance.
> 3. ask our SREs to run a partition reassignment to balance out the partition
> leadership (the partitions were already being led by their preferred
> leaders).
> 4. bump up their request timeout. It was set to open-source's former default
> of 40 seconds.
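Mitigation 1 amounts to a one-line consumer config change. A minimal sketch (the 1 MB figure is the suggestion above, not a tuned value; the helper and class names are illustrative):

```java
import java.util.Properties;

// Sketch of mitigation 1: raise the consumer's socket receive buffer from the
// 64 KB default to something like 1 MB, so more of a large FetchResponse can
// be buffered between polls. The base props are whatever the consumer already
// uses; only "receive.buffer.bytes" changes.
public class ReceiveBufferMitigation {
    static Properties withLargerReceiveBuffer(Properties base, int bytes) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("receive.buffer.bytes", String.valueOf(bytes));
        return props;
    }

    public static void main(String[] args) {
        Properties props = withLargerReceiveBuffer(new Properties(), 1024 * 1024);
        System.out.println(props.getProperty("receive.buffer.bytes"));
    }
}
```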
> Contributing factors:
> 1. uneven FetchResponse sizes across brokers.
> 2. processing time of the polled ConsumerRecords.
> 3. "max.poll.records" increases the number of polls needed to consume a
> FetchResponse, magnifying the constant-time overhead per poll.
> 4. "max.poll.records" makes KafkaConsumer.poll bypass calls to
> ConsumerNetworkClient.poll.
> 5. java.nio.channels.Selector.select, Selector.poll, NetworkClient.poll, and
> ConsumerNetworkClient.poll can return before the poll timeout as soon as a
> single channel is selected.
> 6. NetworkClient.poll is solely driven by the user thread with manual
> partition assignment.
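Contributing factor 3 is easy to quantify. With the 100-byte records used in the repro below and the default 1 MB "max.partition.fetch.bytes", one partition's share of a FetchResponse holds roughly 10,000 records, so at the default "max.poll.records" of 500 it takes about 20 polls to drain (the figures come from the repro setup; the per-poll constant cost itself is not modeled here):

```java
// Sketch of contributing factor 3: how many KafkaConsumer.poll calls it takes
// to drain one FetchResponse when "max.poll.records" caps each poll's return.
// recordsInResponse is derived from the repro setup (100-byte records, 1 MB
// max.partition.fetch.bytes per partition); the constant overhead paid on
// each of those polls is what gets magnified.
public class MaxPollRecordsArithmetic {
    static long pollsToDrain(long recordsInResponse, long maxPollRecords) {
        // Ceiling division: each poll returns at most maxPollRecords records.
        return (recordsInResponse + maxPollRecords - 1) / maxPollRecords;
    }
}
```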
> So far I've only locally reproduced starvation scenario 1 and haven't even
> attempted the other scenarios. Preventing the bypass of
> ConsumerNetworkClient.poll (contributing factor 4) mitigates the issue, but
> it seems starvation would still be possible.
> How to reproduce starvation scenario 1:
> 1. start up zookeeper
> 2. start up two brokers
> 3. create a topic t0 with two partitions led by broker 0 and create a topic
> t1 with a single partition led by broker 1
> {code}
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t0 --replica-assignment 0,0
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t1 --replica-assignment 1
> {code}
> 4. Produce a lot of data into these topics
> {code}
> > ./bin/kafka-producer-perf-test.sh --topic t0 --num-records 20000000 --record-size 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9090,localhost:9091
> > ./bin/kafka-producer-perf-test.sh --topic t1 --num-records 10000000 --record-size 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9090,localhost:9091
> {code}
> 5. start up a consumer that consumes these 3 partitions with TRACE level
> NetworkClient logging
> {code}
> > ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.StarvedFetchResponseTest 10000 3000 65536
> {code}
> The config/tools-log4j.properties:
> {code}
> # Licensed to the Apache Software Foundation (ASF) under one or more
> # contributor license agreements. See the NOTICE file distributed with
> # this work for additional information regarding copyright ownership.
> # The ASF licenses this file to You under the Apache License, Version 2.0
> # (the "License"); you may not use this file except in compliance with
> # the License. You may obtain a copy of the License at
> #
> # http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> log4j.rootLogger=WARN, stderr
> log4j.appender.stderr=org.apache.log4j.ConsoleAppender
> log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
> log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
> log4j.appender.stderr.Target=System.err
> log4j.logger.org.apache.kafka.clients.NetworkClient=TRACE, stderr
> log4j.additivity.org.apache.kafka.clients.NetworkClient=false
> {code}
> The consumer code:
> {code}
> /**
>  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
>  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
>  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
>  * License. You may obtain a copy of the License at
>  *
>  * http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
>  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
>  * specific language governing permissions and limitations under the License.
>  */
> package org.apache.kafka.clients.consumer;
>
> import org.apache.kafka.common.TopicPartition;
>
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import java.util.Set;
>
> public class StarvedFetchResponseTest {
>     public static void main(String[] args) throws InterruptedException {
>         long pollTimeout = Long.valueOf(args[0]);
>         long sleepDuration = Long.valueOf(args[1]);
>         String receiveBufferSize = args[2];
>
>         Properties props = new Properties();
>         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090,localhost:9091");
>         props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "fetch-response-starvation");
>         props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
>         props.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
>         props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>         props.setProperty(ConsumerConfig.RECEIVE_BUFFER_CONFIG, receiveBufferSize);
>
>         KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);
>         List<TopicPartition> partitions = new ArrayList<>();
>         for (int i = 0; i < 2; i++) {
>             partitions.add(new TopicPartition("t0", i));
>         }
>         partitions.add(new TopicPartition("t1", 0));
>         kafkaConsumer.assign(partitions);
>         kafkaConsumer.seekToBeginning(partitions);
>         while (true) {
>             ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(pollTimeout);
>             System.out.println(recordsPerTopic(records));
>             Thread.sleep(sleepDuration);
>         }
>     }
>
>     private static Map<TopicPartition, Integer> recordsPerTopic(ConsumerRecords<byte[], byte[]> records) {
>         Map<TopicPartition, Integer> result = new HashMap<>();
>         Set<TopicPartition> partitions = records.partitions();
>         for (TopicPartition partition : partitions) {
>             if (!result.containsKey(partition)) {
>                 result.put(partition, 0);
>             }
>             result.put(partition, result.get(partition) + records.records(partition).size());
>         }
>         return result;
>     }
> }
> {code}
> After running it for 30 minutes, around 33 FetchResponses from broker 1 were
> served to the application, while many partially formed FetchResponses from
> broker 0 were cancelled due to a disconnect from the request timeout. It
> seems that there was only one successful FetchResponse from broker 0 served
> to the application during this time.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)