Hi team,

After reading the source code of AbstractFetcherManager, I found that the way num.consumer.fetchers is used may not match what is described in the Kafka documentation. My interpretation of the documentation is that the number of fetcher threads is controlled by the value of the num.consumer.fetchers property: if I set num.consumer.fetchers=4, a total of 4 fetcher threads are created after the consumer is initialized.
But what I found in the source code tells me something different. The code below is copied from AbstractFetcherManager:

    private def getFetcherId(topic: String, partitionId: Int) : Int = {
      Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
    }

    def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
      mapLock synchronized {
        val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) =>
          BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
        for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
          var fetcherThread: AbstractFetcherThread = null
          fetcherThreadMap.get(brokerAndFetcherId) match {
            case Some(f) => fetcherThread = f
            case None =>
              fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
              fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
              fetcherThread.start
          }

          fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) =>
            topicAndPartition -> brokerAndInitOffset.initOffset
          })
        }
      }

If I have one topic with one partition and num.consumer.fetchers set to 4, only one fetcher thread is actually created, not 4. A thread is created only for each distinct BrokerAndFetcherId key that the assigned partitions map to, so num.consumer.fetchers essentially sets the maximum number of fetcher threads, not the actual number. The actual number of fetcher threads is controlled by this line of code:

    Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers

Is my assumption correct?

--
Regards,
Tao
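
P.S. To double-check my reading, here is a minimal standalone sketch of the mapping described above. It is not Kafka code: the object name FetcherIdSketch, the helper fetcherThreadCount, the topic name "my-topic", and the broker id 1 are all made up for illustration, and I am assuming that Utils.abs simply masks off the sign bit. It counts the distinct (broker, fetcherId) keys, which is how many fetcher threads addFetcherForPartitions would appear to create.

```scala
// Standalone sketch; names mirror the quoted snippet but this is NOT Kafka code.
object FetcherIdSketch {
  // Assumption: Utils.abs masks off the sign bit so the result is never negative.
  def abs(n: Int): Int = n & 0x7fffffff

  // Same arithmetic as the quoted getFetcherId, with numFetchers passed explicitly.
  def getFetcherId(topic: String, partitionId: Int, numFetchers: Int): Int =
    abs(31 * topic.hashCode + partitionId) % numFetchers

  // Hypothetical helper: each partition is (topic, partitionId, brokerId).
  // The number of distinct (brokerId, fetcherId) keys stands in for the
  // number of entries that would end up in fetcherThreadMap.
  def fetcherThreadCount(partitions: Seq[(String, Int, Int)], numFetchers: Int): Int =
    partitions.map { case (topic, partitionId, brokerId) =>
      (brokerId, getFetcherId(topic, partitionId, numFetchers))
    }.distinct.size

  def main(args: Array[String]): Unit = {
    val numFetchers = 4

    // One topic, one partition, one broker: a single key, so a single thread.
    val onePartition = Seq(("my-topic", 0, 1))
    println(fetcherThreadCount(onePartition, numFetchers)) // prints 1

    // Four consecutive partitions on the same broker map to four distinct ids.
    val fourPartitions = (0 until 4).map(p => ("my-topic", p, 1))
    println(fetcherThreadCount(fourPartitions, numFetchers)) // prints 4
  }
}
```

Under these assumptions, a single partition always yields one thread regardless of num.consumer.fetchers, which matches what I see in the quoted code.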