Hi François,

I agree with you and Svante, so I think my logic is correct. But I really can't
find why my problem happens, and I have been stuck on this for weeks. I think
posting some of my code might help; could you please give it a quick check?

Here is the producer code. I didn't use a custom partitioner.class:
  val props = new Properties()

  val codec = if (compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec

  props.put("compression.codec", codec.toString)
  props.put("producer.type", if(synchronously) "sync" else "async")
  props.put("metadata.broker.list", brokerList)
  props.put("batch.num.messages", batchSize.toString)
  props.put("message.send.max.retries", messageSendMaxRetries.toString)
  props.put("request.required.acks",requestRequiredAcks.toString)
  props.put("client.id",clientId.toString)

  val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
  
  // Build a KeyedMessage, attaching the partition key only when one is given
  def kafkaMessage(message: Array[Byte], partition: Array[Byte]): KeyedMessage[AnyRef, AnyRef] = {
    if (partition == null) {
      new KeyedMessage(topic, message)
    } else {
      new KeyedMessage(topic, partition, message)
    }
  }

  // Convenience overload: encode the message and the optional key as UTF-8 bytes
  def send(message: String, partition: String = null): Unit =
    send(message.getBytes("UTF8"), if (partition == null) null else partition.getBytes("UTF8"))

  def send(message: Array[Byte], partition: Array[Byte]): Unit = {
    try {
      producer.send(kafkaMessage(message, partition))
    } catch {
      case e: Exception =>
        e.printStackTrace()
        System.exit(1)
    }
  }
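A quick, self-contained check related to the `send(message: Array[Byte], partition: Array[Byte])` path above: the key handed to `KeyedMessage` is a raw `Array[Byte]`. If the default partitioner hashes the key object's `hashCode` (an assumption here, in line with Svante's hash(key) mod n description further down), it matters that JVM arrays inherit an identity-based `hashCode` from `Object`, so two arrays holding the same bytes usually hash differently, while `String` keys hash by content. The sketch only compares hash codes and does not talk to Kafka; `KeyHashCheck` and `intKey` are hypothetical names.

```scala
import java.nio.ByteBuffer
import java.util.Arrays

object KeyHashCheck {
  // Hypothetical helper: builds the same 4-byte big-endian key as the test loop.
  def intKey(a: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(a).array()

  def main(args: Array[String]): Unit = {
    val k1 = intKey(1)
    val k2 = intKey(1)

    // The two arrays hold exactly the same bytes...
    println(s"same contents:        ${Arrays.equals(k1, k2)}")
    // ...but arrays inherit Object's identity-based hashCode, so this is
    // usually false for two separately allocated arrays.
    println(s"same array hashCode:  ${k1.hashCode == k2.hashCode}")
    // Arrays.hashCode hashes the contents, so this is always true.
    println(s"same content hash:    ${Arrays.hashCode(k1) == Arrays.hashCode(k2)}")

    // Strings hash by content, so equal strings always agree.
    val s1 = "1"
    val s2 = new String("1")
    println(s"same String hashCode: ${s1.hashCode == s2.hashCode}")
  }
}
```

If the partitioner does hash the key object directly (again, an assumption to verify against the Kafka version in use), the "same array hashCode" line is the one that would matter for where a byte-array key ends up.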

And here is how I use the producer: I create a producer instance, then use that
instance to send three messages. Currently I create each partition key as an
Integer, then convert it to a byte array:
  val testMessage = UUID.randomUUID().toString
  val testTopic = "sample1"
  val groupId_1 = "testGroup"

  var testStatus = false

  print("starting sample broker testing")
  val producer = new KafkaProducer(testTopic, "localhost:9092")

  val numList = List(0, 1, 2)
  for (a <- numList) {
    // Create the partition key as a 4-byte array holding the Int
    val key = java.nio.ByteBuffer.allocate(4).putInt(a).array()
    producer.send(testMessage.getBytes("UTF8"), key)
  }
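To illustrate Svante's point that a key's numeric value is not a partition number, here is a sketch that runs the same three keys as the loop above through a hypothetical content-based partitioner, partition = (Arrays.hashCode(key) & 0x7fffffff) % 3. The formula and the names `LoopPartitionSketch` and `partitionFor` are assumptions for illustration, not Kafka's actual partitioner.

```scala
import java.nio.ByteBuffer
import java.util.Arrays

object LoopPartitionSketch {
  // Hypothetical content-based partitioner: hash the key bytes, clear the sign
  // bit so the result is non-negative, then take it modulo the partition count.
  def partitionFor(key: Array[Byte], numPartitions: Int): Int =
    (Arrays.hashCode(key) & 0x7fffffff) % numPartitions

  def main(args: Array[String]): Unit = {
    val numPartitions = 3
    for (a <- List(0, 1, 2)) {
      // Same key encoding as the test loop: a 4-byte big-endian Int.
      val key = ByteBuffer.allocate(4).putInt(a).array()
      println(s"key $a -> partition ${partitionFor(key, numPartitions)}")
    }
  }
}
```

With this formula, keys 0, 1 and 2 land in partitions 1, 2 and 0 respectively: each key always maps to the same partition, but key n does not map to partition n, because the partition depends on the hash rather than on the number itself.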

I appreciate all the suggestions and help!

Thanks,
Haoming

> From: [email protected]
> Date: Wed, 26 Nov 2014 19:57:13 +0000
> Subject: Re: Partition key not working properly
> To: [email protected]
> 
> Hi Haoming,
>
> As far as I know, Svante is right.
>
> Maybe you modified your default partitioner?
>
> Or are you sure the same key goes to different partitions? Maybe it's just
> two keys that are going to the same partition?
>
> It's possible that you have something like this:
> - key "1" -> partition 3
> - key "2" -> partition 2
> - key "3" -> partition 3
> 
> and nothing in partition 1
> 
> On Wed Nov 26 2014 at 02:35:33 Haoming Zhang <[email protected]> wrote:
> 
> > Hi Svante,
> >
> > Thanks for your reply!
> >
> > As you said, my goal is that "all messages with the same key go to the
> > same partition", but what actually happens is that even when I hard-code
> > the same partition key (let's say "1") for three messages, the messages
> > still go to different partitions.
> >
> > Regards,
> > Haoming
> >
> > > Date: Wed, 26 Nov 2014 08:03:04 +0100
> > > Subject: Re: Partition key not working properly
> > > From: [email protected]
> > > To: [email protected]
> > >
> > > By default, the partition key is hashed, and the message is placed in the
> > > partition that the hash maps to.
> > >
> > > If you have three physical partitions and give the partition key "5", it
> > > has nothing to do with physical partition 5 (which does not exist);
> > > roughly: partition = hash("5") mod 3.
> > >
> > > The only guarantee is that all messages with the same key go to the same
> > > partition. This is useful to make sure that, for example, all logs from
> > > the same IP go to the same partition, which means that they can be read by
> > > the same consumer.
> > >
> > > /svante
> > >
> > >
> > >
> > > 2014-11-26 2:42 GMT+01:00 Haoming Zhang <[email protected]>:
> > >
> > > >
> > > >
> > > >
> > > > Hi all,
> > > >
> > > > I'm struggling with how to use the partition key mechanism properly. My
> > > > logic is: set the number of partitions to 3, then create three partition
> > > > keys "0", "1", "2", then use the partition keys to create three
> > > > KeyedMessages such as
> > > > KeyedMessage(topic, "0", message),
> > > > KeyedMessage(topic, "1", message),
> > > > KeyedMessage(topic, "2", message)
> > > >
> > > > After this, I create a producer instance to send out all the
> > > > KeyedMessages.
> > > >
> > > > I expect each KeyedMessage to go to a different partition according to
> > > > its partition key, which means
> > > > KeyedMessage(topic, "0", message) goes to Partition 0,
> > > > KeyedMessage(topic, "1", message) goes to Partition 1,
> > > > KeyedMessage(topic, "2", message) goes to Partition 2
> > > >
> > > > I'm using Kafka-web-console to watch the topic status, but the result is
> > > > not what I expect. KeyedMessages still go to partitions seemingly at
> > > > random; sometimes two KeyedMessages end up in the same partition even
> > > > though they have different partition keys.
> > > >
> > > > I'm not sure whether my logic is incorrect or I didn't understand the
> > > > partition key mechanism correctly. Any sample code or explanation would
> > > > be great!
> > > >
> > > > Thanks,
> > > > Haoming
> > > >
> > > >