[
https://issues.apache.org/jira/browse/KAFKA-4270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677990#comment-15677990
]
Damian Guy commented on KAFKA-4270:
-----------------------------------
Hi Mykola,
I'm not entirely sure I follow what you are saying. I think that all you need
to do is provide the serdes to the `groupBy` method, i.e.,
`table.groupBy((key, value) -> value.thing, keySerde, valueSerde)`
Thanks,
Damian
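The mismatch discussed in this thread can be reproduced without Kafka. The sketch below is only an illustration under stated assumptions: `Serializer`, `ByteArraySerializer`, `SkicardInfoSerializer`, `send`, and the simplified `SkicardInfo` are hypothetical stand-ins written for this example, not the Kafka Streams classes. It shows how a default byte-array serializer handed a custom object fails with a ClassCastException, and how supplying a matching serializer (the role the serdes passed to `groupBy` play) avoids it.

```java
import java.nio.charset.StandardCharsets;

public class SerdeMismatchSketch {

    // Hypothetical stand-in for a serializer interface; not the Kafka class.
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    // Plays the role of a default serializer that only understands byte[].
    static final class ByteArraySerializer implements Serializer<byte[]> {
        @Override
        public byte[] serialize(byte[] data) {
            return data;
        }
    }

    // Simplified, hypothetical value type named after the reporter's class.
    static final class SkicardInfo {
        final String ownerId;

        SkicardInfo(String ownerId) {
            this.ownerId = ownerId;
        }
    }

    // Plays the role of the custom serde the reporter wanted to be used.
    static final class SkicardInfoSerializer implements Serializer<SkicardInfo> {
        @Override
        public byte[] serialize(SkicardInfo data) {
            return data.ownerId.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Mimics a sink that gets its serializer untyped, so the compiler
    // cannot check that the serializer matches the value.
    @SuppressWarnings("unchecked")
    static byte[] send(Object value, Serializer<?> serializer) {
        return ((Serializer<Object>) serializer).serialize(value);
    }

    // Returns true when sending the value fails with a ClassCastException.
    static boolean failsWithClassCast(Object value, Serializer<?> serializer) {
        try {
            send(value, serializer);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        SkicardInfo info = new SkicardInfo("owner-1");
        // Default serializer: the custom class cannot be cast to byte[].
        System.out.println(failsWithClassCast(info, new ByteArraySerializer()));   // prints "true"
        // Matching serializer supplied explicitly: no exception.
        System.out.println(failsWithClassCast(info, new SkicardInfoSerializer())); // prints "false"
    }
}
```

The cast only fails at the point where the serializer is invoked, which is consistent with the exception surfacing inside the sink rather than where the topology is declared.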
> ClassCast for Agregation
> ------------------------
>
> Key: KAFKA-4270
> URL: https://issues.apache.org/jira/browse/KAFKA-4270
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Mykola Polonskyi
> Assignee: Damian Guy
> Priority: Critical
> Labels: architecture
>
> With serdes defined for the intermediate topic in an aggregation, I get a
> ClassCastException: from the custom class to ByteArray.
> In the debugger I saw that the defined serde isn't used when creating the sinkNode (inside
> `org.apache.kafka.streams.kstream.internals.KGroupedTableImpl#doAggregate`).
> Instead of the serde defined in the aggregation call, the default implementation is used,
> with empty stubs instead of real implementations.
> {code:kotlin}
> userTable.join(
>     skicardsTable.groupBy { key, value ->
>         KeyValue(value.skicardInfo.ownerId, value.skicardInfo) }
>     .aggregate(
>         { mutableSetOf<SkicardInfo>() },
>         { ownerId, skicardInfo, accumulator ->
>             accumulator.put(skicardInfo) },
>         { ownerId, skicardInfo, accumulator ->
>             accumulator },
>         skicardByOwnerIdSerde,
>         skicardByOwnerIdTopicName
>     ),
>     { userCreatedOrUpdated, skicardInfoSet ->
>         UserWithSkicardsCreatedOrUpdated(userCreatedOrUpdated.user, skicardInfoSet) }
> ).to(
>     userWithSkicardsTable
> )
> {code}
> I think the current behavior of `doAggregate` when setting up serdes and/or
> stores should be changed, because it is also incorrect in release 0.10.0.1-cp1.