Thanks. I just checked the code below. When the key is null, the line that
calls Random.nextInt() seems to run only *a few times*. On all the other
calls to getPartition(), the cached value from
sendPartitionPerTopicCache.get(topic) is returned instead. So apparently
you won't get an even partition distribution?
The code is from commit 7847e9c703f3a0b70519666cdb8a6e4c8e37c3a7, file
./core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala:
private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
  val numPartitions = topicPartitionList.size
  if(numPartitions <= 0)
    throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
  val partition =
    if(key == null) {
      // If the key is null, we don't really need a partitioner
      // So we look up in the send partition cache for the topic to decide the target partition
      val id = sendPartitionPerTopicCache.get(topic)
      id match {
        case Some(partitionId) =>
          // directly return the partitionId without checking availability of the leader,
          // since we want to postpone the failure until the send operation anyways
          partitionId
        case None =>
          val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
          if (availablePartitions.isEmpty)
            throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
          val index = Utils.abs(Random.nextInt) % availablePartitions.size
          val partitionId = availablePartitions(index).partitionId
          sendPartitionPerTopicCache.put(topic, partitionId)
          partitionId
      }
    } else
      partitioner.partition(key, numPartitions)
  if(partition < 0 || partition >= numPartitions)
    throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
      "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
  trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
  partition
}
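A minimal, self-contained Scala sketch of just the null-key branch above makes the caching effect concrete. StickyPartitionSim, pick and clearCache are hypothetical names. The sketch makes these substitutions and assumptions:

* It uses rng.nextInt(numPartitions) in place of Utils.abs(Random.nextInt) % availablePartitions.size.
* It assumes every partition has a leader.
* It assumes the cache is emptied only by an explicit clear. The excerpt does not show when, or whether, sendPartitionPerTopicCache gets cleared.

```scala
import scala.collection.mutable
import scala.util.Random

// Hypothetical model of the null-key branch of getPartition: the first lookup
// for a topic picks a random partition and caches it; later lookups reuse it.
class StickyPartitionSim(numPartitions: Int, rng: Random) {
  private val cache = mutable.Map.empty[String, Int]
  private var calls = 0

  // Number of times the random branch actually ran
  def randomCalls: Int = calls

  def pick(topic: String): Int = cache.get(topic) match {
    case Some(partitionId) =>
      partitionId // cache hit: no new random choice
    case None =>
      calls += 1
      val partitionId = rng.nextInt(numPartitions) // uniform in [0, numPartitions)
      cache.put(topic, partitionId)
      partitionId
  }

  // Assumption: something outside getPartition (not shown in the excerpt) empties the cache
  def clearCache(): Unit = cache.clear()
}

object StickyPartitionDemo {
  def main(args: Array[String]): Unit = {
    val sim = new StickyPartitionSim(numPartitions = 10, rng = new Random(42))
    val counts = Array.fill(10)(0)

    // 1000 null-key sends with no cache clear in between
    for (_ <- 1 to 1000) {
      val p = sim.pick("mytopic")
      counts(p) += 1
    }
    println(s"random calls: ${sim.randomCalls}")             // random calls: 1
    println(s"non-empty partitions: ${counts.count(_ > 0)}") // non-empty partitions: 1

    // 20 clears, each followed by 10 sends: every clear allows exactly one new random pick
    for (_ <- 1 to 20) {
      sim.clearCache()
      for (_ <- 1 to 10) sim.pick("mytopic")
    }
    println(s"random calls: ${sim.randomCalls}")             // random calls: 21
  }
}
```

With no clear between sends, all 1000 sends land on one partition. Only a cache clear lets the random branch run again.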
On Mon, Mar 2, 2015 at 3:58 PM, Mayuresh Gharat <[email protected]> wrote:
> Probably your keys are getting hashed to only those partitions. I don't
> think anything is wrong here.
> You can check how the default hashPartitioner is used in the code, apply
> the same computation to your keys before you send them, and check which
> partitions they go to.
>
> The default hashPartitioner does something like this:
>
> hash(key) % numPartitions.
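A minimal Scala sketch of the hash(key) % numPartitions idea lets you check ahead of time where a set of keys would land. It assumes the hash is key.hashCode with the sign bit masked off, so the modulo is never negative. HashPartitionCheck, partitionFor and histogram are hypothetical names, and this is not claimed to be the exact code of Kafka's default partitioner.

```scala
object HashPartitionCheck {
  // hash(key) % numPartitions, with the sign bit cleared so the result is in [0, numPartitions)
  def partitionFor(key: Any, numPartitions: Int): Int =
    (key.hashCode & 0x7fffffff) % numPartitions

  // How many of the given keys map to each partition
  def histogram(keys: Seq[Any], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(partitionFor(_, numPartitions)).map { case (p, ks) => p -> ks.size }

  def main(args: Array[String]): Unit = {
    val numPartitions = 10
    val keys = (1 to 1000).map(i => s"user-$i") // hypothetical example keys
    val hist = histogram(keys, numPartitions)
    for (p <- 0 until numPartitions)
      println(s"partition $p: ${hist.getOrElse(p, 0)} keys")
  }
}
```

The sketch masks with 0x7fffffff rather than calling math.abs, because math.abs(Int.MinValue) is still negative. This check only matters when a key is supplied. In the getPartition excerpt, a null key bypasses the partitioner entirely.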
>
> Thanks,
>
> Mayuresh
>
> On Mon, Mar 2, 2015 at 3:52 PM, Yang <[email protected]> wrote:
>
> > We have 10 partitions for a topic, and we omit the explicit partition
> > param in the message creation:
> >
> > // partition key needs to be polished
> > KeyedMessage<String, String> data = new KeyedMessage<String, String>(mytopic, myMessageContent);
> > producer.send(data);
> >
> > But on average, 3 to 5 of the partitions are empty.
> >
> > What went wrong?
> >
> > Thanks,
> > Yang
> >
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>
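Now the original question of 3 to 5 empty partitions out of 10. The getPartition() excerpt suggests a null-key producer makes a new random choice only when its per-topic cache is empty. Suppose a run makes k such choices among n partitions. Each partition is then never chosen with probability (1 - 1/n)^k, so the expected number of empty partitions is n(1 - 1/n)^k.

Below is a small Scala sketch of that arithmetic. It assumes the choices are independent and uniform over all n partitions, which means every partition has a leader. The thread does not say how many choices were made, so k is a free parameter here. EmptyPartitions and expectedEmpty are hypothetical names.

```scala
object EmptyPartitions {
  // Expected number of partitions never picked after k independent,
  // uniform random picks among n partitions: n * (1 - 1/n)^k
  def expectedEmpty(n: Int, k: Int): Double =
    n * math.pow(1.0 - 1.0 / n, k)

  def main(args: Array[String]): Unit =
    for (k <- Seq(1, 5, 10, 20))
      println(f"k = $k%2d -> expected empty partitions: ${expectedEmpty(10, k)}%.2f")
}
```

For example, expectedEmpty(10, 10) is about 3.49. So even ten sticky random choices over a run would, on average, leave several of the 10 partitions empty.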