[ 
https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16691134#comment-16691134
 ] 

Mark Tranter commented on KAFKA-7653:
-------------------------------------

Hi [~guozhang], thanks for your reply.

 

The issue is that Kafka Serializers/Deserializers can be configured either as 
key or value serializers, via the `isKey` argument to `configure`: 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java#L40]

Kafka Streams Serdes mirror this abstraction: `Serde.configure` takes the same `isKey` flag.

Unfortunately, this distinction is not represented in the type system at all, 
which makes it impossible to provide separate "key" and "value" serdes of the 
same type in the same implicit scope.

Consider the following example using the Kafka Streams Scala API:

 
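{code:java}
import scala.collection.JavaConverters._
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._

val builder = new StreamsBuilder()

// Two String serdes: one configured for keys, one for values
val keySerde: Serde[String] = Serdes.String()
val valueSerde: Serde[String] = Serdes.String()

keySerde.configure(Map.empty[String, AnyRef].asJava, true)    // isKey = true
valueSerde.configure(Map.empty[String, AnyRef].asJava, false) // isKey = false

implicit val keyS: Serde[String] = keySerde
implicit val valS: Serde[String] = valueSerde

// Fails to compile: both implicits match Serde[String]
builder.stream[String, String]("my-topic"){code}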
 

This fails to compile because implicit resolution of type `Serde[String]` is ambiguous.

We could solve this problem with two new types, something like KeySerde[T] and 
ValueSerde[T], and update the Kafka Streams Scala code to accept these as 
implicit dependencies in place of Serde[T].
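A minimal, dependency-free sketch of that idea follows. It assumes a simplified stand-in for Kafka's `Serde` (`SimpleSerde`) plus hypothetical `KeySerde`/`ValueSerde` wrappers and a hypothetical `stream` function; none of these are the real Kafka Streams Scala API. Because the key and value roles are distinct types, one `String` serde of each role can sit in the same implicit scope without ambiguity.

```scala
object KeyValueSerdeSketch {
  // Hypothetical, dependency-free stand-in for Kafka's Serde[T]:
  // it only remembers which role it was configured for.
  final class SimpleSerde[T](val name: String) {
    private var keyFlag: Option[Boolean] = None
    def configure(isKey: Boolean): Unit = keyFlag = Some(isKey)
    def isKey: Option[Boolean] = keyFlag
  }

  // Hypothetical wrappers that carry the key/value role in the type.
  final case class KeySerde[T](underlying: SimpleSerde[T])
  final case class ValueSerde[T](underlying: SimpleSerde[T])

  // Smart constructors configure the wrapped serde for its role.
  def keySerde[T](s: SimpleSerde[T]): KeySerde[T] = { s.configure(isKey = true); KeySerde(s) }
  def valueSerde[T](s: SimpleSerde[T]): ValueSerde[T] = { s.configure(isKey = false); ValueSerde(s) }

  // Hypothetical stand-in for builder.stream[K, V]: it asks for one implicit
  // of each role, so a String key serde and a String value serde do not clash.
  def stream[K, V](topic: String)(implicit k: KeySerde[K], v: ValueSerde[V]): (String, String, String) =
    (topic, k.underlying.name, v.underlying.name)

  implicit val stringKeys: KeySerde[String] = keySerde(new SimpleSerde[String]("string-key"))
  implicit val stringValues: ValueSerde[String] = valueSerde(new SimpleSerde[String]("string-value"))

  def main(args: Array[String]): Unit = {
    // Both implicits resolve unambiguously because their types differ.
    val (topic, k, v) = stream[String, String]("my-topic")
    println(s"$topic: key=$k value=$v")
  }
}
```

In a real version the wrappers would hold an `org.apache.kafka.common.serialization.Serde[T]`, and the smart constructors would call its `configure(configs, isKey)` with the matching flag.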

I have put together a quick proof of concept here: 
[https://github.com/mtranter/kafka/compare/trunk...key-value-serde-types]

To provide some context: we are using Confluent Schema Registry. Our 
serializers need to know whether they are serializing keys or values so that 
they can register Avro schemas with the schema registry under the right 
subject. Schemas are registered in the format `\{topic}-[key|value]`.
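The naming rule above can be sketched as a small pure function, to show why the `isKey` flag matters. The `SubjectNaming` object and `subjectName` helper are hypothetical, not Confluent's API: a `Long` serde configured with `isKey = true` but reused for values (as with the `mouse-clicks-by-user` topic in the issue description) would register its schema under the `-key` subject instead of `-value`.

```scala
object SubjectNaming {
  // Hypothetical helper following the "{topic}-[key|value]" rule:
  // the same topic maps to two different subjects depending on isKey.
  def subjectName(topic: String, isKey: Boolean): String =
    if (isKey) s"$topic-key" else s"$topic-value"

  def main(args: Array[String]): Unit = {
    // Subject for a correctly configured value serde
    println(subjectName("mouse-clicks-by-user", isKey = false))
    // Subject a key-configured serde would use if reused for values
    println(subjectName("mouse-clicks-by-user", isKey = true))
  }
}
```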

 

Hope this makes sense.


> Streams-Scala: Add type level differentiation for Key and Value serdes.
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-7653
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7653
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Mark Tranter
>            Priority: Minor
>
> Implicit resolution/conversion of Serdes/Consumed etc. is a big improvement 
> for the Scala Streams API. However, in cases where a user needs to 
> differentiate between Key and Value serializer functionality (e.g. when using 
> the Schema Registry), implicit resolution doesn't help and could cause issues. 
> For example:
> {code:java}
> case class MouseClickEvent(pageId: Long, userId: String)
> builder
>   // Long serde taken from implicit scope configured with
>   // `isKey` = true
>   .stream[Long, MouseClickEvent]("mouse-clicks")
>   .selectKey((_,v) => v.userId)
>   .groupByKey
>   .aggregate(0L)((_, _, count) => count + 1)
>   .toStream
>   // Same Long serde taken from implicit scope configured with
>   // `isKey` = true, even though the `Long` value in this case
>   // will be the Value
>   .to("mouse-clicks-by-user")
> {code}
> It would be ideal if Key and Value Serde/SerdeWrapper types/type classes 
> could be introduced to overcome this limitation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
