mjsax commented on a change in pull request #8496:
URL: https://github.com/apache/kafka/pull/8496#discussion_r418209285



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -1064,14 +1071,16 @@ private void addAllKeys(final Set<Long> allKeys, final List<KeyValue<Long, Long>
 
     // must be public to allow KafkaProducer to instantiate it
     public static class KeyPartitioner implements Partitioner {
+        private final static LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+
         @Override
         public int partition(final String topic,
                              final Object key,
                              final byte[] keyBytes,
                              final Object value,
                              final byte[] valueBytes,
                              final Cluster cluster) {
-            return ((Long) key).intValue() % NUM_TOPIC_PARTITIONS;
+            return LONG_DESERIALIZER.deserialize(topic, keyBytes).intValue() % NUM_TOPIC_PARTITIONS;

Review comment:
       I don't think so. The original implementation (used only by the upstream producer writing into the input topics) was:
   ```java
   return ((Long) key).intValue() % NUM_TOPIC_PARTITIONS;
   ```
   
   However, this assumes that `key` is of type `Long`, which is not true when the partitioner is used within Streams: Streams serializes all data upfront, so both the `key` and `value` passed to the partitioner are of type `byte[]` -- thus, we need to deserialize `keyBytes` to recover the original key object.
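   To illustrate the point, here is a minimal, dependency-free sketch of what the fix does. It mimics `LongDeserializer`, which decodes an 8-byte big-endian long, using only `java.nio.ByteBuffer`; the class name, helper names, and the partition count of 3 are hypothetical stand-ins, not part of the actual test:
   ```java
   import java.nio.ByteBuffer;

   public class KeyPartitionSketch {
       // Hypothetical stand-in for NUM_TOPIC_PARTITIONS in the test.
       static final int NUM_TOPIC_PARTITIONS = 3;

       // Mirrors LongDeserializer: decode an 8-byte big-endian long.
       static long deserializeLong(final byte[] keyBytes) {
           return ByteBuffer.wrap(keyBytes).getLong();
       }

       // The fixed logic: deserialize the raw key bytes first, then mod.
       // The old code cast `key` to Long directly, which throws a
       // ClassCastException inside Streams where `key` is a byte[].
       static int partition(final byte[] keyBytes) {
           return (int) (deserializeLong(keyBytes) % NUM_TOPIC_PARTITIONS);
       }

       public static void main(final String[] args) {
           // Serialize the key 7L the same way LongSerializer would.
           final byte[] serialized = ByteBuffer.allocate(8).putLong(7L).array();
           System.out.println(partition(serialized)); // 7 % 3 = 1
       }
   }
   ```
   The same modulo arithmetic runs either way; the only difference is whether the key arrives as a boxed `Long` (producer side) or as serialized bytes (inside Streams).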




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

