Hi team,

After reading the source code of AbstractFetcherManager, I found that the
way num.consumer.fetchers is used may not match what the Kafka documentation
describes. My reading of the documentation is that the number of fetcher
threads is controlled by the num.consumer.fetchers property: if I set
num.consumer.fetchers=4, a total of 4 fetcher threads are created once the
consumer is initialized.

But the source code tells a different story. The code below is copied from
AbstractFetcherManager:

private def getFetcherId(topic: String, partitionId: Int) : Int = {
  Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
}


def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
  mapLock synchronized {
    val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicAndPartition, brokerAndInitialOffset) =>
      BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition)) }
    for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
      var fetcherThread: AbstractFetcherThread = null
      fetcherThreadMap.get(brokerAndFetcherId) match {
        case Some(f) => fetcherThread = f
        case None =>
          fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
          fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
          fetcherThread.start
      }

      fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) =>
        topicAndPartition -> brokerAndInitOffset.initOffset
      })
    }
  }

If I have one topic with one partition and num.consumer.fetchers set to 4,
only one fetcher thread is actually created, not 4. num.consumer.fetchers
essentially sets the maximum number of fetcher threads (per broker, since
fetcherThreadMap is keyed by BrokerAndFetcherId), not the actual number. The
actual number of fetcher threads is determined by how many distinct fetcher
ids this line of code produces for the assigned partitions:

    Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
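
To make this concrete, here is a minimal standalone sketch of the same
grouping logic. It is not Kafka code: FetcherCountSketch, TopicPartition,
BrokerAndFetcher, nonNegative and fetcherThreadCount are hypothetical names
of my own, and nonNegative is only assumed to behave like Utils.abs (a
non-negative absolute value).

```scala
// Standalone sketch, not Kafka code: mimics the fetcher-id hashing and the
// groupBy on (broker, fetcherId) to count how many threads would be created.
object FetcherCountSketch {
  // Hypothetical stand-ins for TopicAndPartition and BrokerAndFetcherId.
  final case class TopicPartition(topic: String, partition: Int)
  final case class BrokerAndFetcher(brokerId: Int, fetcherId: Int)

  // Assumption: Utils.abs returns a non-negative value for every Int.
  def nonNegative(n: Int): Int =
    if (n == Int.MinValue) 0 else math.abs(n)

  // Same formula as getFetcherId, with numFetchers passed in explicitly.
  def fetcherId(topic: String, partition: Int, numFetchers: Int): Int =
    nonNegative(31 * topic.hashCode + partition) % numFetchers

  // One thread per distinct (broker, fetcherId) pair.
  // `leaders` maps each partition to the id of its leader broker.
  def fetcherThreadCount(leaders: Map[TopicPartition, Int], numFetchers: Int): Int =
    leaders.toSeq.map { case (tp, brokerId) =>
      BrokerAndFetcher(brokerId, fetcherId(tp.topic, tp.partition, numFetchers))
    }.distinct.size

  def main(args: Array[String]): Unit = {
    // One topic, one partition, num.consumer.fetchers = 4: a single thread.
    val single = Map(TopicPartition("my-topic", 0) -> 1)
    println(fetcherThreadCount(single, numFetchers = 4)) // prints 1

    // Eight partitions led by two brokers: bounded by numFetchers * brokers = 8.
    val spread = (0 until 8).map(p => TopicPartition("my-topic", p) -> (1 + p % 2)).toMap
    println(fetcherThreadCount(spread, numFetchers = 4))
  }
}
```

With a single partition the count is 1 whatever numFetchers is, which is the
behaviour I described above.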

Is my assumption correct?

-- 
Regards,
Tao
