Adam Bellemare created KAFKA-9732:
-------------------------------------

             Summary: Kafka Foreign-Key Joiner has unexpected default value used when a table is created via a stream+groupByKey+reduce
                 Key: KAFKA-9732
                 URL: https://issues.apache.org/jira/browse/KAFKA-9732
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 2.4.1, 2.4.0
            Reporter: Adam Bellemare


I'm upgrading some internal business code that used a prototype version of the FKJoiner to the 2.4.1 Kafka release. I am running into an issue where the joiner uses the default Serde, even though I explicitly specify NOT to use the default serde (unless I am missing something!). Currently, this is how I generate the left KTable used in the _*leftTable.join(rightTable, ...)*_ FKJoin.

Let's call this process 1:
{code:scala}
val externalMyKeySerde = ... // Confluent Kafka S.R. key serde
val externalMyValueSerde = ... // Confluent Kafka S.R. value serde
val myConsumer = Consumed.`with`(externalMyKeySerde, externalMyValueSerde)

// For wrapping nulls in mapValues below
case class OptionalDeletable[T](elem: Option[T])

// Internal serdes that do NOT use the S.R.
// Same serde logic as externalMyKeySerde, but doesn't register schemas to the schema registry.
val internalMyKeySerde = ...
// Same serde logic as externalMyValueSerde, but doesn't register schemas to the schema registry.
val internalOptionalDeletableMyValueSerde: Serde[OptionalDeletable[MyValue]] = ...

val myLeftTable: KTable[MyKey, MyValue] =
      streamBuilder.stream[MyKey, MyValue]("inputTopic")(myConsumer)
        .mapValues(
          v => {
            // We need the nulls to propagate deletes.
            // Wrap this in a simple case class because we can't groupByKey+reduce null values; they would otherwise get filtered out.
            OptionalDeletable(Some(v))
          }
        )
        .groupByKey(Grouped.`with`(internalMyKeySerde, internalOptionalDeletableMyValueSerde))
        .reduce((_, x) => x)(
            Materialized.as("myLeftTable")(internalMyKeySerde, internalOptionalDeletableMyValueSerde))
        .mapValues(v => v.elem.get) // Unwrap the element
{code}

Next, we create the right table and specify the FK-join logic:
{code:scala}
// This is created in an identical way to Process 1... I won't show it here for brevity.
val rightTable: KTable[RightTableKey, RightTableValue] =
  streamBuilder.table(...)

// Not showing previous definitions because I don't think they're relevant to this issue...
val itemMaterialized =
    Materialized.as[MyKey, JoinedOutput, KeyValueStore[Bytes, Array[Byte]]]("materializedOutputTable")(
      internalMyKeySerde, internalJoinedOutputSerde)

val joinedTable = myLeftTable.join[JoinedOutput, RightTableKey, RightTableValue](
      rightTable, foreignKeyExtractor, joinerFunction, itemMaterialized)

//Force evaluation to output some data
joinedTable.toStream.to("outputStream")
{code}

When I execute this with the left table generated via Process 1, the left table's serde is somehow lost along the way and the join falls back to the default serde. This results in the following runtime exception:
{code:java}
<removed for brevity>
Caused by: java.lang.ClassCastException: com.bellemare.sample.MyValue cannot be cast to [B
        at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
        at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:94)
        at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:71)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
        ... 30 more
{code}
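For context on the exception: `[B` is the JVM's internal name for `Array[Byte]`, and the top frame is `ByteArraySerializer`, which is consistent with the join falling back to `Serdes.ByteArraySerde`, the out-of-the-box `default.value.serde` in 2.4 unless the application overrides it. The minimal sketch below is an illustration under that assumption, not code from this application: it needs only `kafka-clients`, and `MyValue` here is a hypothetical stand-in for `com.bellemare.sample.MyValue`. It reproduces the same failure by handing a non-byte-array value to that serializer.

{code:scala}
import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer}

// Hypothetical stand-in for com.bellemare.sample.MyValue from the report.
final case class MyValue(name: String)

object DefaultSerdeMismatch {
  // A ByteArraySerializer viewed with its element type erased, roughly as a
  // default serde is used when no explicit serde reaches the join.
  private val fallback: Serializer[Any] =
    new ByteArraySerializer().asInstanceOf[Serializer[Any]]

  // Returns the bytes on success, or the ClassCastException message when the
  // value is not already an Array[Byte].
  def trySerialize(value: Any): Either[String, Array[Byte]] =
    try Right(fallback.serialize("some-topic", value))
    catch { case e: ClassCastException => Left(e.getMessage) }

  def main(args: Array[String]): Unit = {
    // A real byte array passes straight through the serializer.
    println(trySerialize(Array[Byte](1, 2, 3)).isRight)
    // A domain object fails with "... cannot be cast to ... [B ...".
    println(trySerialize(MyValue("a")))
  }
}
{code}

The exact wording of the exception message differs between JDK versions, but it names `[B` as the target type in each case.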
Now, if I change process 1 to the following:

Process 2:
{code:scala}
val externalMyKeySerde = ... // Confluent Kafka S.R. key serde
val externalMyValueSerde = ... // Confluent Kafka S.R. value serde
val myConsumer = Consumed.`with`(externalMyKeySerde, externalMyValueSerde)

val myLeftTable: KTable[MyKey, MyValue] =
      streamBuilder.table[MyKey, MyValue]("inputTopic")(myConsumer)
// The downside of this approach is that we end up registering a bunch of internal topics to the schema registry (S.R.), significantly increasing the clutter in our lookup UI.
{code}
Everything then works as expected, and `_*externalMyValueSerde*_` is used to serialize the events (although I don't want this, because it registers schemas to the S.R. and clutters it up).

I don't think I'm missing any Serde inputs anywhere in the DSL, but I'm having a hard time figuring out *whether this is normal, existing behaviour for a KTable created via* *Process 1* or whether I'm stumbling upon a bug somewhere. When I debug my way through this, the FKJoiner appears to use `_*valSerde = null*_` (and therefore falls back to the default Serde) for the KTable created via Process 1. This is unexpected to me; I expected to see `_*valSerde = internalOptionalDeletableMyValueSerde*_` instead.

Is this a bug, or is this a problem with something that I am doing unwittingly?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
