Mykola Polonskyi created KAFKA-4270:
---------------------------------------

             Summary: ClassCast for Agregation
                 Key: KAFKA-4270
                 URL: https://issues.apache.org/jira/browse/KAFKA-4270
             Project: Kafka
          Issue Type: Bug
            Reporter: Mykola Polonskyi

With serdes defined for the intermediate topic in an aggregation, I get a ClassCastException: from the custom class to ByteArray.

While debugging I saw that the defined serde isn't used when the sinkNode is created (inside `org.apache.kafka.streams.kstream.internals.KGroupedTableImpl#doAggregate`).
Instead of the serde defined in the aggregation call, a default implementation is used, with empty stubs in place of the actual implementations.

{code:kotlin}
userTable.join(
    skicardsTable.groupBy { key, value -> KeyValue(value.skicardInfo.ownerId, value.skicardInfo) }
        .aggregate(
            { mutableSetOf<SkicardInfo>() },
            { ownerId, skicardInfo, accumulator -> accumulator.put(skicardInfo) },
            { ownerId, skicardInfo, accumulator -> accumulator },
            skicardByOwnerIdSerde,
            skicardByOwnerIdTopicName
        ),
    { userCreatedOrUpdated, skicardInfoSet -> UserWithSkicardsCreatedOrUpdated(userCreatedOrUpdated.user, skicardInfoSet) }
).to(
    userWithSkicardsTable
)
{code}

I think the current behavior of `doAggregate` when setting up serdes and/or stores should be changed, because it is incorrect in release 0.10.0.1-cp1 too.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)