Thanks. This is indeed the reason.
On Mar 2, 2015 4:38 PM, "Christian Csar" <[email protected]> wrote:

> I believe you are seeing the behavior where the random partitioner is
> sticky.
>
> http://mail-archives.apache.org/mod_mbox/kafka-users/201309.mbox/%3ccahwhrrxax5ynimqnacsk7jcggnhjc340y4qbqoqcismm43u...@mail.gmail.com%3E
> has details. So with the default 10-minute refresh, if your test runs for
> only an hour or two with a single producer, you would not expect to see
> every partition hit.
>
> Christian
>
> On Mon, Mar 2, 2015 at 4:23 PM, Yang <[email protected]> wrote:
>
> > Thanks. I just checked the code below. The line that calls
> > Random.nextInt() seems to run only *a few times*; in all the other cases
> > where getPartition() is called, the value cached in
> > sendPartitionPerTopicCache.get(topic) is returned instead, so apparently
> > you won't get an even partition distribution?
> >
> > the code I got is from commit 7847e9c703f3a0b70519666cdb8a6e4c8e37c3a7
> >
> >
> > ./core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
> >
> >
> >   private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
> >     val numPartitions = topicPartitionList.size
> >     if(numPartitions <= 0)
> >       throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
> >     val partition =
> >       if(key == null) {
> >         // If the key is null, we don't really need a partitioner
> >         // So we look up in the send partition cache for the topic to decide the target partition
> >         val id = sendPartitionPerTopicCache.get(topic)
> >         id match {
> >           case Some(partitionId) =>
> >             // directly return the partitionId without checking availability of the leader,
> >             // since we want to postpone the failure until the send operation anyways
> >             partitionId
> >           case None =>
> >             val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
> >             if (availablePartitions.isEmpty)
> >               throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
> >             val index = Utils.abs(Random.nextInt) % availablePartitions.size
> >             val partitionId = availablePartitions(index).partitionId
> >             sendPartitionPerTopicCache.put(topic, partitionId)
> >             partitionId
> >         }
> >       } else
> >         partitioner.partition(key, numPartitions)
> >     if(partition < 0 || partition >= numPartitions)
> >       throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
> >         "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
> >     trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
> >     partition
> >   }
> >
> >
> > On Mon, Mar 2, 2015 at 3:58 PM, Mayuresh Gharat <
> > [email protected]>
> > wrote:
> >
> > > Probably your keys are getting hashed to only those partitions. I don't
> > > think anything is wrong here.
> > > You can check how the default hash partitioner is used in the code,
> > > apply the same function to your keys before you send them, and see
> > > which partitions they map to.
> > >
> > > The default hash partitioner does something like this:
> > >
> > > hash(key) % numPartitions.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Mon, Mar 2, 2015 at 3:52 PM, Yang <[email protected]> wrote:
> > >
> > > > We have 10 partitions for a topic and omit the explicit partition
> > > > parameter when creating the message:
> > > >
> > > > KeyedMessage<String, String> data =
> > > >     new KeyedMessage<String, String>(mytopic, myMessageContent);  // partition key needs to be polished
> > > > producer.send(data);
> > > >
> > > >
> > > >
> > > > but on average 3--5 of the partitions are empty.
> > > >
> > > >
> > > >
> > > > what went wrong?
> > > >
> > > > thanks
> > > > Yang
> > > >
> > >
> > >
> > >
> > > --
> > > -Regards,
> > > Mayuresh R. Gharat
> > > (862) 250-7125
> > >
> >
>
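
For anyone who finds this thread later, here is a minimal sketch of the sticky behavior Christian describes. It is a toy model, not Kafka code: the class StickyPartitionSketch and its methods are made up for illustration, and it assumes that the periodic metadata refresh is what empties the per-topic cache. It only mirrors the null-key branch of getPartition() quoted above: reuse the cached partition if there is one, otherwise pick one at random and cache it.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

/** Toy model of the sticky partition choice for null-key messages (hypothetical, not Kafka code). */
public class StickyPartitionSketch {
    private final Map<String, Integer> sendPartitionPerTopicCache = new HashMap<>();
    private final Random random;
    private final int numPartitions;

    public StickyPartitionSketch(int numPartitions, long seed) {
        this.numPartitions = numPartitions;
        this.random = new Random(seed);
    }

    /** Reuse the cached partition for the topic, or pick one at random and cache it. */
    public int getPartition(String topic) {
        return sendPartitionPerTopicCache.computeIfAbsent(topic, t -> random.nextInt(numPartitions));
    }

    /** Stand-in for the periodic metadata refresh; assumed here to clear the cache. */
    public void refresh() {
        sendPartitionPerTopicCache.clear();
    }

    /** Sends messagesPerWindow messages in each refresh window and returns the set of partitions hit. */
    public static Set<Integer> partitionsHit(int numPartitions, int windows, int messagesPerWindow, long seed) {
        StickyPartitionSketch producer = new StickyPartitionSketch(numPartitions, seed);
        Set<Integer> hit = new HashSet<>();
        for (int w = 0; w < windows; w++) {
            for (int m = 0; m < messagesPerWindow; m++) {
                hit.add(producer.getPartition("mytopic"));
            }
            producer.refresh();
        }
        return hit;
    }

    public static void main(String[] args) {
        // A one-hour test with a 10-minute refresh gives 6 windows,
        // so at most 6 of the 10 partitions can receive data.
        Set<Integer> hit = partitionsHit(10, 6, 1000, 42L);
        System.out.println("partitions hit: " + hit.size() + " of 10");
    }
}
```

However many messages are sent inside one window, they all land on the same partition, so the number of partitions that receive data is bounded by the number of refresh windows in the test rather than by the number of messages.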

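Mayuresh's suggestion of checking where keys would land applies once a real partition key is set. A rough sketch of that check follows, assuming the partitioner is essentially hash(key) % numPartitions as he describes. The class HashPartitionSketch and the method partitionFor are hypothetical names, and masking the sign bit is my own choice to keep the result non-negative; the actual default partitioner may differ in detail.

```java
/** Rough check of where keys would land under hash(key) % numPartitions (hypothetical helper). */
public class HashPartitionSketch {

    /** hash(key) % numPartitions, with the sign bit masked so the result is never negative. */
    public static int partitionFor(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 10;
        int[] counts = new int[numPartitions];
        // Made-up sample keys; substitute the keys you actually plan to send.
        for (int i = 0; i < 1000; i++) {
            counts[partitionFor("key-" + i, numPartitions)]++;
        }
        for (int p = 0; p < numPartitions; p++) {
            System.out.println("partition " + p + ": " + counts[p] + " keys");
        }
    }
}
```

If a few partitions stay at zero for your real keys, the skew comes from the keys themselves rather than from the producer.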