Hi team,
After reading the source code of AbstractFetcherManager, I found that the
behavior of num.consumer.fetchers may not match what is described in the
Kafka documentation. My interpretation of the documentation is that the
number of fetcher threads is controlled by the value of the
num.consumer.fetchers property: if I set num.consumer.fetchers=4, a total
of 4 fetcher threads are created once the consumer is initialized.

But the source code tells a different story. The code below is copied
from AbstractFetcherManager:

    private def getFetcherId(topic: String, partitionId: Int) : Int = {
      Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
    }

    def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
      mapLock synchronized {
        val partitionsPerFetcher = partitionAndOffsets.groupBy {
          case (topicAndPartition, brokerAndInitialOffset) =>
            BrokerAndFetcherId(brokerAndInitialOffset.broker,
              getFetcherId(topicAndPartition.topic, topicAndPartition.partition))
        }
        for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
          var fetcherThread: AbstractFetcherThread = null
          fetcherThreadMap.get(brokerAndFetcherId) match {
            case Some(f) => fetcherThread = f
            case None =>
              fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
              fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
              fetcherThread.start
          }
          fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map {
            case (topicAndPartition, brokerAndInitOffset) =>
              topicAndPartition -> brokerAndInitOffset.initOffset
          })
        }
      }
    }

If I have one topic with one partition and num.consumer.fetchers set to 4,
only one fetcher thread is actually created, not 4.
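
To illustrate the single-partition case, here is a minimal standalone sketch
that reuses the getFetcherId formula outside of Kafka. The names
SinglePartitionDemo, safeAbs and fetcherId are my own, and safeAbs is only a
stand-in for Utils.abs (I am assuming it maps Int.MinValue to 0 and otherwise
behaves like math.abs). The topic name "my-topic" is made up.

```scala
object SinglePartitionDemo {
  // Stand-in for kafka.utils.Utils.abs (assumption: Int.MinValue maps to 0).
  def safeAbs(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  // Same formula as getFetcherId in the quoted code, with numFetchers
  // passed in as a parameter instead of being read from the config.
  def fetcherId(topic: String, partitionId: Int, numFetchers: Int): Int =
    safeAbs(31 * topic.hashCode + partitionId) % numFetchers

  def main(args: Array[String]): Unit = {
    // One topic with one partition.
    val partitions = Seq(("my-topic", 0))
    for (n <- Seq(1, 4, 16)) {
      val ids = partitions.map { case (t, p) => fetcherId(t, p, n) }.toSet
      // A single partition can only ever map to a single fetcher id.
      println(s"numFetchers=$n -> ${ids.size} distinct fetcher id(s)")
    }
  }
}
```

Whatever value numFetchers takes, a single partition yields exactly one
fetcher id, so the quoted code would only ever reach createFetcherThread once.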

num.consumer.fetchers essentially sets the maximum number of fetcher
threads (per broker, as far as I can tell, since fetcherThreadMap is keyed
by BrokerAndFetcherId), not the actual number. The actual number of
fetcher threads depends on how many distinct fetcher ids this line of code
produces for the assigned partitions:

    Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers

Is my understanding correct?
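
To make the question concrete, here is a rough simulation of how many entries
would end up in fetcherThreadMap, assuming the thread count equals the number
of distinct (broker, fetcher id) keys produced by the groupBy in the quoted
code. Everything here is a simplification of mine rather than Kafka code: the
Partition case class, threadCount, safeAbs (a stand-in for Utils.abs, assumed
to map Int.MinValue to 0), the topic name "t" and the broker ids are all
hypothetical.

```scala
object FetcherThreadCountDemo {
  // Stand-in for kafka.utils.Utils.abs (assumption: Int.MinValue maps to 0).
  def safeAbs(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  // Same formula as getFetcherId in the quoted code.
  def fetcherId(topic: String, partitionId: Int, numFetchers: Int): Int =
    safeAbs(31 * topic.hashCode + partitionId) % numFetchers

  // Hypothetical simplification: a partition plus the id of its leader broker.
  final case class Partition(topic: String, id: Int, leaderBroker: Int)

  // Counts distinct (broker, fetcherId) keys, mirroring the groupBy on
  // BrokerAndFetcherId in addFetcherForPartitions.
  def threadCount(partitions: Seq[Partition], numFetchers: Int): Int =
    partitions
      .map(p => (p.leaderBroker, fetcherId(p.topic, p.id, numFetchers)))
      .toSet
      .size

  def main(args: Array[String]): Unit = {
    val numFetchers = 4
    // Eight partitions of topic "t", all led by the same broker.
    val oneBroker = (0 until 8).map(i => Partition("t", i, leaderBroker = 1))
    // Same partitions, split evenly across two brokers (0-3 and 4-7).
    val twoBrokers = (0 until 8).map(i => Partition("t", i, leaderBroker = i / 4))
    println(threadCount(oneBroker, numFetchers))  // 4
    println(threadCount(twoBrokers, numFetchers)) // 8
  }
}
```

Under these assumptions, eight partitions on one broker give 4 threads, and
the same partitions spread over two brokers give 8, which is why I read the
setting as an upper bound per broker rather than a fixed total.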
--
Regards,
Tao