[ 
https://issues.apache.org/jira/browse/KAFKA-4753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15860815#comment-15860815
 ] 

Jason Gustafson commented on KAFKA-4753:
----------------------------------------

Could scenario 1 be mitigated with a new partition assignor which took into 
account partition leadership?

> KafkaConsumer susceptible to FetchResponse starvation
> -----------------------------------------------------
>
>                 Key: KAFKA-4753
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4753
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Onur Karaman
>            Assignee: Onur Karaman
>
> FetchResponse starvation here means that the KafkaConsumer repeatedly fails 
> to fully form FetchResponses within the request timeout from a subset of the 
> brokers its fetching from while FetchResponses from the other brokers can get 
> fully formed and processed by the application.
> In other words, this ticket is concerned with scenarios where fetching from 
> some brokers hurts the progress of fetching from other brokers to the point 
> of repeatedly hitting a request timeout.
> Some FetchResponse starvation scenarios:
> 1. partition leadership of the consumer's assigned partitions is skewed 
> across brokers, causing uneven FetchResponse sizes across brokers.
> 2. the consumer seeks back on partitions on some brokers but not others, 
> causing uneven FetchResponse sizes across brokers.
> 3. the consumer's ability to keep up with various partitions across brokers 
> is skewed, causing uneven FetchResponse sizes across brokers.
> I've personally seen scenario 1 happen this past week to one of our users in 
> prod. They manually assigned partitions such that a few brokers led most of 
> the partitions while other brokers only led a single partition. When 
> NetworkClient sends out FetchRequests to different brokers in parallel with 
> an uneven partition distribution, FetchResponses from brokers who lead more 
> partitions will contain more data than FetchResponses from brokers who lead 
> few partitions. This means the small FetchResponses will get fully formed 
> quicker than larger FetchResponses. When the application eventually consumes 
> a smaller fully formed FetchResponses, the NetworkClient will send out a new 
> FetchRequest to the lightly-loaded broker. Their response will again come 
> back quickly while only marginal progress has been made on the larger 
> FetchResponse. Repeat this process several times and your application will 
> have potentially processed many smaller FetchResponses while the larger 
> FetchResponse made minimal progress and is forced to timeout, causing the 
> large FetchResponse to start all over again, which causes starvation.
> To mitigate the problem for the short term, I've suggested to our user that 
> they either:
> 1. bump up their "receive.buffer.bytes" beyond the current default of 64 KB 
> to something like 1 MB. This is the solution I short-term solution I 
> suggested they go with.
> 2. reduce the "max.partition.fetch.bytes" down from the current default of 1 
> MB to something like 100 KB. This solution wasn't advised as it could impact 
> broker performance.
> 3. ask our SRE's to run a partition reassignment to balance out the partition 
> leadership (partitions were already being led by their preferred leaders).
> 4. bump up their request timeout. It was set to open-source's former default 
> of 40 seconds.
> Contributing factors:
> 1. uneven FetchResponse sizes across brokers.
> 2. processing time of the polled ConsumerRecords.
> 3. "max.poll.records" increases the number of polls needed to consume a 
> FetchResponse, making constant-time overhead per poll magnified.
> 4. "max.poll.records" makes KafkaConsumer.poll bypass calls to 
> ConsumerNetworkClient.poll.
> 5. java.nio.channels.Selector.select, Selector.poll, NetworkClient.poll, and 
> ConsumerNetworkClient.poll can return before the poll timeout as soon as a 
> single channel is selected.
> 6. NetworkClient.poll is solely driven by the user thread with manual 
> partition assignment.
> So far I've only locally reproduced starvation scenario 1 and haven't even 
> attempted the other scenarios. Preventing the bypass of 
> ConsumerNetworkClient.poll (contributing factor 3) mitigates the issue, but 
> it seems starvation would still be possible.
> How to reproduce starvation scenario 1:
> 1. startup zookeeper
> 2. startup two brokers
> 3. create a topic t0 with two partitions led by broker 0 and create a topic 
> t1 with a single partition led by broker 1
> {code}
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t0 
> > --replica-assignment 0,0
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t1 
> > --replica-assignment 1
> {code}
> 4. Produce a lot of data into these topics
> {code}
> > ./bin/kafka-producer-perf-test.sh --topic t0 --num-records 20000000 
> > --record-size 100 --throughput 100000 --producer-props 
> > bootstrap.servers=localhost:9090,localhost:9091
> > ./bin/kafka-producer-perf-test.sh --topic t1 --num-records 10000000 
> > --record-size 100 --throughput 100000 --producer-props 
> > bootstrap.servers=localhost:9090,localhost:9091
> {code}
> 5. startup a consumer that consumes these 3 partitions with TRACE level 
> NetworkClient logging
> {code}
> > ./bin/kafka-run-class.sh 
> > org.apache.kafka.clients.consumer.StarvedFetchResponseTest 10000 3000 65536
> {code}
> The config/tools-log4j.properties:
> {code}
> # Licensed to the Apache Software Foundation (ASF) under one or more
> # contributor license agreements.  See the NOTICE file distributed with
> # this work for additional information regarding copyright ownership.
> # The ASF licenses this file to You under the Apache License, Version 2.0
> # (the "License"); you may not use this file except in compliance with
> # the License.  You may obtain a copy of the License at
> #
> #    http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> log4j.rootLogger=WARN, stderr
> log4j.appender.stderr=org.apache.log4j.ConsoleAppender
> log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
> log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
> log4j.appender.stderr.Target=System.err
> log4j.logger.org.apache.kafka.clients.NetworkClient=TRACE, stderr
> log4j.additivity.org.apache.kafka.clients.NetworkClient=false
> {code}
> The consumer code:
> {code}
> /**
>  * Licensed to the Apache Software Foundation (ASF) under one or more 
> contributor license agreements. See the NOTICE
>  * file distributed with this work for additional information regarding 
> copyright ownership. The ASF licenses this file
>  * to You under the Apache License, Version 2.0 (the "License"); you may not 
> use this file except in compliance with the
>  * License. You may obtain a copy of the License at
>  *
>  * http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software 
> distributed under the License is distributed on
>  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
> express or implied. See the License for the
>  * specific language governing permissions and limitations under the License.
>  */
> package org.apache.kafka.clients.consumer;
> import org.apache.kafka.common.TopicPartition;
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import java.util.Set;
> public class StarvedFetchResponseTest {
>     public static void main(String[] args) throws InterruptedException {
>         long pollTimeout = Long.valueOf(args[0]);
>         long sleepDuration = Long.valueOf(args[1]);
>         String receiveBufferSize = args[2];
>         Properties props = new Properties();
>         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:9090,localhost:9091");
>         props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
> "fetch-response-starvation");
>         props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
>         props.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
>         props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>         props.setProperty(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 
> receiveBufferSize);
>         KafkaConsumer<byte[], byte[]> kafkaConsumer = new 
> KafkaConsumer<>(props);
>         List<TopicPartition> partitions = new ArrayList<>();
>         for (int i = 0; i < 2; i++) {
>             partitions.add(new TopicPartition("t0", i));
>         }
>         partitions.add(new TopicPartition("t1", 0));
>         kafkaConsumer.assign(partitions);
>         kafkaConsumer.seekToBeginning(partitions);
>         while (true) {
>             ConsumerRecords<byte[], byte[]> records = 
> kafkaConsumer.poll(pollTimeout);
>             System.out.println(recordsPerTopic(records));
>             Thread.sleep(sleepDuration);
>         }
>     }
>     private static Map<TopicPartition, Integer> 
> recordsPerTopic(ConsumerRecords<byte[], byte[]> records) {
>         Map<TopicPartition, Integer> result = new HashMap<>();
>         Set<TopicPartition> partitions = records.partitions();
>         for (TopicPartition partition : partitions) {
>             if (!result.containsKey(partition)) {
>                 result.put(partition, 0);
>             }
>             result.put(partition, result.get(partition) + 
> records.records(partition).size());
>         }
>         return result;
>     }
> }
> {code}
> After running it for 30 minutes, around 33 FetchResponses from broker 1 were 
> served to the application while the many partially formed FetchResponses from 
> broker 0 were cancelled due to a disconnect from request timeout. It seems 
> that were was only one successful FetchResponse from broker 0 served to the 
> application during this time.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to