Hi,
I was playing with the new producer in 0.8.2.1 using partition keys ("semantic
partitioning" I believe is the phrase?). I noticed that the default partitioner
in 0.8.2.1 does not partition items the same way as the old 0.8.1.1 default
partitioner was doing. For a test item, the old producer was sending it to
partition 0, whereas the new producer was sending it to partition 4.
Digging in the code, it appears that the partitioning logic is different
between the old and new producers. Both of them hash the key, but they use
different hashing algorithms.
Old partitioner:
./core/src/main/scala/kafka/producer/DefaultPartitioner.scala:
def partition(key: Any, numPartitions: Int): Int = {
Utils.abs(key.hashCode) % numPartitions
}
New partitioner:
./clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java:
} else {
// hash the key to choose a partition
return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
}
Where murmur2 is a custom hashing algorithm. (I'm assuming that murmur2 isn't
the same logic as hashCode, especially since hashCode is overrideable).
Was it intentional that the hashing algorithm would change between the old and
new producer? If so, was this documented? I don't know if anyone was relying on
the old default partitioner, as opposed to going round-robin or using their own
custom partitioner. Do you expect it to change in the future? I'm guessing that
one of the main reasons to have a custom hashing algorithm is so that you are
full control of the partitioning and can keep it stable (as opposed to being
reliant on hashCode()).
Thanks,
-James