mjsax commented on a change in pull request #8496: URL: https://github.com/apache/kafka/pull/8496#discussion_r418209285
##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -1064,14 +1071,16 @@ private void addAllKeys(final Set<Long> allKeys, final List<KeyValue<Long, Long>
     // must be public to allow KafkaProducer to instantiate it
     public static class KeyPartitioner implements Partitioner {
+        private final static LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+
         @Override
         public int partition(final String topic, final Object key, final byte[] keyBytes, final Object value, final byte[] valueBytes, final Cluster cluster) {
-            return ((Long) key).intValue() % NUM_TOPIC_PARTITIONS;
+            return LONG_DESERIALIZER.deserialize(topic, keyBytes).intValue() % NUM_TOPIC_PARTITIONS;

Review comment:
I don't think so. The original implementation (used only by the upstream producer that writes into the input topics) was:
```
return ((Long) key).intValue() % NUM_TOPIC_PARTITIONS;
```
However, this assumes that `key` is of type `Long`, which is not true when the partitioner is used within Streams: Streams serializes all data upfront, so `key` and `value` are of type `byte[]`. Thus, we need to deserialize `keyBytes` to get the original key object.
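To illustrate the point, here is a minimal standalone sketch that avoids the Kafka dependency. The class `KeyPartitionerSketch`, its helper methods, and the partition count of 4 are hypothetical names and values chosen for illustration, not the test's actual code. It assumes the key bytes are an 8-byte big-endian long, which is the layout `LongSerializer` produces.

```java
import java.nio.ByteBuffer;

public class KeyPartitionerSketch {
    // Hypothetical value for illustration; the real test defines its own constant.
    public static final int NUM_TOPIC_PARTITIONS = 4;

    // Stand-in for LongSerializer: 8 bytes, big-endian.
    public static byte[] serializeLong(final long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Stand-in for LongDeserializer: reads the 8 bytes back into a long.
    public static Long deserializeLong(final byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    // Original approach: only works if the producer hands over a typed Long key.
    public static int partitionByCast(final Object key) {
        return ((Long) key).intValue() % NUM_TOPIC_PARTITIONS;
    }

    // Revised approach: relies only on the serialized key bytes.
    public static int partitionByBytes(final byte[] keyBytes) {
        return deserializeLong(keyBytes).intValue() % NUM_TOPIC_PARTITIONS;
    }

    public static void main(final String[] args) {
        final long key = 7L;
        final byte[] keyBytes = serializeLong(key);

        // Upstream producer case: key is a Long, so both approaches agree.
        System.out.println(partitionByCast(key));
        System.out.println(partitionByBytes(keyBytes));

        // Streams case: the "key" object is already a byte[], so the cast fails.
        try {
            partitionByCast(keyBytes);
        } catch (final ClassCastException e) {
            System.out.println("cast failed: key is a byte[], not a Long");
        }
    }
}
```

Because the byte-based variant never inspects the `key` object, the same partitioner can serve both the plain upstream producer and the producer that Streams uses internally.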