Voronenko Dmitriy created SAMZA-1866:
----------------------------------------

             Summary: Invalid partition calculation in KafkaSystemProducer
                 Key: SAMZA-1866
                 URL: https://issues.apache.org/jira/browse/SAMZA-1866
             Project: Samza
          Issue Type: Bug
          Components: kafka
    Affects Versions: 0.14.1
            Reporter: Voronenko Dmitriy


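If a byte array is used as the partition key, key.hashCode() returns the
identity hash of the array object, not a hash of its contents. Two arrays
holding the same bytes will therefore usually produce different hash codes,
so messages with the same logical key can be sent to different partitions.
I propose removing the preliminary partition calculation shown below.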
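
To illustrate the behaviour described above, here is a minimal sketch that
compares the identity-based hashCode() of two equal byte arrays with the
content-based java.util.Arrays.hashCode. The object name and the sample key
"user-42" are assumptions made for this example and do not come from Samza.

```scala
import java.nio.charset.StandardCharsets
import java.util.Arrays

// Hypothetical demo object; not part of Samza.
object ByteArrayKeyHash {
  // Two distinct array objects that hold exactly the same bytes.
  val first: Array[Byte] = "user-42".getBytes(StandardCharsets.UTF_8)
  val second: Array[Byte] = "user-42".getBytes(StandardCharsets.UTF_8)

  // Arrays inherit Object.hashCode, which depends on object identity,
  // so this comparison is almost always false.
  val identityHashesMatch: Boolean = first.hashCode() == second.hashCode()

  // Arrays.hashCode is computed from the elements, so equal contents
  // always give equal hashes.
  val contentHashesMatch: Boolean = Arrays.hashCode(first) == Arrays.hashCode(second)

  def main(args: Array[String]): Unit = {
    println(s"identity hashes equal: $identityHashesMatch")
    println(s"content hashes equal:  $contentHashesMatch")
  }
}
```

Because the identity hash is not tied to the bytes, the partition derived from
it is not stable for a given logical key.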

https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala

val partitionKey =
  if (envelope.getPartitionKey != null) KafkaUtil.getIntegerPartitionKey(envelope, partitions)
  else null

https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala

def getIntegerPartitionKey(envelope: OutgoingMessageEnvelope, partitions: java.util.List[PartitionInfo]): Integer = {
  val numPartitions = partitions.size
  abs(envelope.getPartitionKey.hashCode()) % numPartitions
}
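
For comparison, here is a rough sketch of how a partition could be derived
from the contents of a byte-array key. This is not the change proposed in this
issue, which is to drop the precomputation altogether. It only shows what a
content-based calculation might look like. ContentAwarePartitionKey and
partitionFor are hypothetical names, not Samza or Kafka APIs.

```scala
import java.util.Arrays

// Hypothetical helper for illustration only; not part of Samza.
object ContentAwarePartitionKey {
  // Maps a non-null key to a partition index in [0, numPartitions).
  def partitionFor(key: Any, numPartitions: Int): Int = {
    require(key != null, "key must not be null")
    require(numPartitions > 0, "numPartitions must be positive")
    val hash = key match {
      case bytes: Array[Byte] => Arrays.hashCode(bytes) // hash of the contents
      case other              => other.hashCode()       // regular hashCode for other keys
    }
    // floorMod keeps the result non-negative even when the hash is negative.
    Math.floorMod(hash, numPartitions)
  }
}
```

With this sketch, two separately allocated arrays holding the same bytes map
to the same partition for a fixed partition count.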



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
