[ https://issues.apache.org/jira/browse/SAMZA-1866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617886#comment-16617886 ]

Dong Lin commented on SAMZA-1866:
---------------------------------

Hey [~voronenko-da], thanks for reporting this problem. The short-term solution
is to use the constructor `OutgoingMessageEnvelope(SystemStream systemStream,
Object partitionKey, Object key, Object message)` with `partitionKey=null`.
Then the `key` is passed to the KafkaProducer, and the partitioning algorithm is
determined by the KafkaProducer rather than by Samza.
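As a rough illustration of what happens on the producer side once `partitionKey=null`: for records with a non-null key, the Kafka client's default partitioner hashes the serialized key bytes with murmur2, clears the sign bit, and takes the result modulo the partition count. Equal key bytes therefore always map to the same partition, regardless of which array instance holds them. The sketch below is a hand port written for illustration only. `KeyBytesPartitioning` and `partitionFor` are hypothetical names, not Kafka or Samza APIs, and other Kafka client versions may differ in detail.

```scala
// Illustrative sketch (assumption): a hand port of the murmur2-based routing that
// Kafka's default partitioner applies to keyed records. Not the library's code.
object KeyBytesPartitioning {

  // 32-bit murmur2 over the key bytes, using the seed and constants of Kafka's murmur2.
  def murmur2(data: Array[Byte]): Int = {
    val length = data.length
    val seed = 0x9747b28cL.toInt
    val m = 0x5bd1e995
    val r = 24
    var h = seed ^ length
    val length4 = length / 4
    var i = 0
    // Process the input four bytes at a time (little-endian).
    while (i < length4) {
      val i4 = i * 4
      var k = (data(i4) & 0xff) + ((data(i4 + 1) & 0xff) << 8) +
        ((data(i4 + 2) & 0xff) << 16) + ((data(i4 + 3) & 0xff) << 24)
      k *= m
      k ^= k >>> r
      k *= m
      h *= m
      h ^= k
      i += 1
    }
    // Mix in the 1 to 3 trailing bytes (emulates the fall-through switch of the Java original).
    val tail = length & ~3
    val rem = length % 4
    if (rem >= 3) h ^= (data(tail + 2) & 0xff) << 16
    if (rem >= 2) h ^= (data(tail + 1) & 0xff) << 8
    if (rem >= 1) { h ^= data(tail) & 0xff; h *= m }
    // Final avalanche.
    h ^= h >>> 13
    h *= m
    h ^= h >>> 15
    h
  }

  // Clear the sign bit so the modulo result is always in [0, numPartitions).
  def partitionFor(keyBytes: Array[Byte], numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    (murmur2(keyBytes) & 0x7fffffff) % numPartitions
  }
}
```

For example, `KeyBytesPartitioning.partitionFor("user-42".getBytes("UTF-8"), 8)` returns the same partition every time it is called, even though each call allocates a fresh byte array.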

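Note that this may not work if you are producing to a compacted Kafka topic and
the key currently differs from the partitionKey in OutgoingMessageEnvelope:
switching from partitionKey-based to key-based partitioning can place new
records in a different partition than older records with the same key, and
compaction only deduplicates within a partition. This is probably a rare case.
Can you confirm that this addresses your use case for now?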

This short-term solution is not very nice, and the current API is confusing. We
will come up with a long-term solution as a Samza Improvement Proposal and in
SAMZA-839.

> Invalid partition calculation in KafkaSystemProducer
> ----------------------------------------------------
>
>                 Key: SAMZA-1866
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1866
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.14.1
>            Reporter: Voronenko Dmitriy
>            Priority: Major
>
> If you use a byte array as the key, key.hashCode() is different for every
> array instance, even when the contents are identical. I propose removing the
> preliminary calculation of the partition.
> https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
> val partitionKey = if (envelope.getPartitionKey != null) KafkaUtil.getIntegerPartitionKey(envelope, partitions) else null
> https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
> def getIntegerPartitionKey(envelope: OutgoingMessageEnvelope, partitions: java.util.List[PartitionInfo]): Integer = {
>   val numPartitions = partitions.size
>   abs(envelope.getPartitionKey.hashCode()) % numPartitions
> }
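A minimal sketch of the problem reported above, assuming only the logic quoted from KafkaUtil.getIntegerPartitionKey, namely `abs(partitionKey.hashCode()) % numPartitions`. JVM arrays do not override `hashCode()`, so for an `Array[Byte]` partition key the result depends on object identity rather than on the bytes. `PartitionKeyHashing`, `samzaStylePartition`, and `contentBasedPartition` are hypothetical names, not Samza APIs, and the content-based variant is only one possible fix, not the one Samza adopted. It uses `java.util.Arrays.hashCode`, which hashes array contents, and `Math.floorMod`, which also avoids the negative result `math.abs` yields for `Int.MinValue`. Wrapping the key in `java.nio.ByteBuffer` would similarly give content-based `hashCode`/`equals`.

```scala
import java.util.Arrays

object PartitionKeyHashing {

  // Mirrors the quoted Samza logic: for an Array[Byte] key, hashCode() is the
  // identity hash, so two arrays with identical bytes can be routed differently.
  def samzaStylePartition(partitionKey: AnyRef, numPartitions: Int): Int =
    math.abs(partitionKey.hashCode()) % numPartitions

  // Hypothetical content-based alternative: hash the bytes themselves, and use
  // floorMod so the result is never negative (math.abs(Int.MinValue) stays negative).
  def contentBasedPartition(key: Array[Byte], numPartitions: Int): Int =
    Math.floorMod(Arrays.hashCode(key), numPartitions)
}
```

With `val a = Array[Byte](1, 2, 3)` and `val b = Array[Byte](1, 2, 3)`, `contentBasedPartition(a, 8)` and `contentBasedPartition(b, 8)` always agree, whereas `samzaStylePartition(a, 8)` and `samzaStylePartition(b, 8)` agree only by chance.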



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
