[ https://issues.apache.org/jira/browse/KAFKA-9732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Adam Bellemare resolved KAFKA-9732.
-----------------------------------
    Resolution: Not A Problem

Issue was with reporter's usage of the API.

> Kafka Foreign-Key Joiner has unexpected default value used when a table is created via a stream+groupByKey+reduce
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9732
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9732
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.1
>            Reporter: Adam Bellemare
>            Priority: Major
>
> I'm upgrading some internal business code that used to use a prototype version of the FKJoiner, migrating to the 2.4.1 Kafka release. I am running into an issue where the joiner is using the default Serde, despite me clearly specifying NOT to use the default serde (unless I am missing something!).
> Currently, this is how I generate the left KTable, used in the _*leftTable.join(rightTable, ...)*_ FKJoin.
> Let's call this process 1:
> {code:scala}
> val externalMyKeySerde = ... //Confluent Kafka S.R. serde.
> val externalMyValueSerde = ... //Confluent Kafka S.R. value serde
> val myConsumer = Consumed.`with`(externalMyKeySerde, externalMyValueSerde)
> //For wrapping nulls in mapValues below
> case class OptionalDeletable[T](elem: Option[T])
> //Internal Serdes that do NOT use the SR
> //Same serde logic as externalMyKeySerde, but doesn't register schemas to schema registry.
> val internalMyKeySerde = ...
> //Same serde logic as externalMyValueSerde, but doesn't register schemas to schema registry.
> val internalOptionalDeletableMyValueSerde: Serde[OptionalDeletable[MyValue]] = ...
> val myLeftTable: KTable[MyKey, MyValue] =
>   streamBuilder.stream[MyKey, MyValue]("inputTopic")(myConsumer)
>     .mapValues(
>       v => {
>         //We need the nulls to propagate deletes.
>         //Wrap this in a simple case-class because we can't groupByKey+reduce null values as they otherwise get filtered out.
>         OptionalDeletable(Some(v))
>       }
>     )
>     .groupByKey(Grouped.`with`(internalMyKeySerde, internalOptionalDeletableMyValueSerde))
>     .reduce((_,x) => x)(
>       Materialized.as("myLeftTable")(internalMyKeySerde, internalOptionalDeletableMyValueSerde))
>     .mapValues(v => v.elem.get) //Unwrap the element
> {code}
> Next, we create the right table and specify the FK joining logic:
> {code:scala}
> //This is created in an identical way to Process 1... I won't show it here for brevity.
> val rightTable: KTable[RightTableKey, RightTableValue] = streamBuilder.table(...)
> //Not showing previous definitions because I don't think they're relevant to this issue...
> val itemMaterialized =
>   Materialized.as[MyKey, JoinedOutput, KeyValueStore[Bytes, Array[Byte]]]("materializedOutputTable")(
>     internalMyKeySerde, internalJoinedOutputSerde)
> val joinedTable = myLeftTable.join[JoinedOutput, RightTableKey, RightTableValue](
>   rightTable, foreignKeyExtractor, joinerFunction, itemMaterialized)
> //Force evaluation to output some data
> joinedTable.toStream.to("outputStream")
> {code}
> When I execute this with leftTable generated via process 1, I end up somehow losing the leftTable serde along the way and end up falling back onto the default serde.
> This results in a runtime exception as follows:
> {code:java}
> <removed for brevity>
> Caused by: java.lang.ClassCastException: com.bellemare.sample.MyValue cannot be cast to [B
>     at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
>     at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:94)
>     at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:71)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
>     ... 30 more
> {code}
> Now, if I change process 1 to the following:
> Process 2:
> {code:scala}
> val externalMyKeySerde = ... //Confluent Kafka S.R. serde.
> val externalMyValueSerde = ... //Confluent Kafka S.R. value serde
> val myConsumer = Consumed.`with`(externalMyKeySerde, externalMyValueSerde)
> val myLeftTable: KTable[MyKey, MyValue] =
>   streamBuilder.table[MyKey, MyValue]("inputTopic")(myConsumer)
> //The downside of this approach is that we end up registering a bunch of internal topics to the schema registry (S.R.), significantly increasing the clutter in our lookup UI.
> {code}
> Everything works as expected, and the expected `_*externalMyValueSerde*_` is used to serialize the events (though I don't want this, as it registers to the SR and clutters it up).
> I don't think I'm missing any Serdes inputs anywhere in the DSL, but I'm having a hard time figuring out *if this is normal existing behaviour for how a KTable is created via* *Process 1* or if I'm stumbling upon a bug somewhere. When I try to debug my way through this, the FKJoiner appears to use `_*valSerde = null*_` (and therefore fall back to the default Serde) for the KTable created via process 1.
> This is unexpected to me; I was expecting to see `_*valSerde = internalOptionalDeletableMyValueSerde*_` instead.
> Is this a bug, or is this a problem with something that I am doing unwittingly?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)