Mykola Polonskyi created KAFKA-4270:
---------------------------------------

             Summary: ClassCast for Agregation
                 Key: KAFKA-4270
                 URL: https://issues.apache.org/jira/browse/KAFKA-4270
             Project: Kafka
          Issue Type: Bug
            Reporter: Mykola Polonskyi


With defined serdes for intermediate topic in aggregation catch the 
ClassCastException: from custom class to the ByteArray.

In debug I saw that defined serde isn't used for creation sinkNode (incide 
`org.apache.kafka.streams.kstream.internals.KGroupedTableImpl#doAggregate`) 

Instead defined serde inside aggregation call is used default Impl with empty 
plugs instead of implementations 

{code:koltin} 
userTable.join(
            skicardsTable.groupBy { key, value -> 
KeyValue(value.skicardInfo.ownerId, value.skicardInfo) }
                    .aggregate(
                            { mutableSetOf<SkicardInfo>() }, 
                            { ownerId, skicardInfo, accumulator -> 
accumulator.put(skicardInfo) },
                            { ownerId, skicardInfo, accumulator -> accumulator 
}, 
                            skicardByOwnerIdSerde,
                            skicardByOwnerIdTopicName
                    ),
            { userCreatedOrUpdated, skicardInfoSet -> 
UserWithSkicardsCreatedOrUpdated(userCreatedOrUpdated.user, skicardInfoSet) }
    ).to(
            userWithSkicardsTable
    )
{code}

I think current behavior of `doAggregate` with serdes and/or stores setting up 
should be changed because that is incorrect in release 0.10.0.1-cp1 to.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to