[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.
[ https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701345#comment-16701345 ]

Guozhang Wang commented on KAFKA-7653:
--------------------------------------

I think [~mark.tranter] is trying to provide a workaround in case we cannot auto-configure instances of the same serde class with the key flag turned on / off. If auto-configuration is possible, then I feel that providing two instances of a single serde class, configured differently for key and value, would be ideal; if this cannot be achieved, then we need to consider using two classes and passing in an instance of each class for key and value respectively.

Note that https://issues.apache.org/jira/browse/KAFKA-3729 was filed before we had KIP-182; now, with key / value serdes wrapped in Produced / Consumed / Grouped etc., we know exactly which serde is used for key / value, and hence should be able to auto-configure them accordingly inside the constructor of those control objects. With that, this ticket should be easy to resolve. Does that make sense?

> Streams-Scala: Add type level differentiation for Key and Value serdes.
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-7653
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7653
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Mark Tranter
>            Assignee: Mark Tranter
>            Priority: Minor
>              Labels: scala
>
> Implicit resolution/conversion of Serdes/Consumed etc is a big improvement
> for the Scala Streams API. However, in cases where a user needs to
> differentiate between Key and Value serializer functionality (e.g. when using
> the Schema Registry), implicit resolution doesn't help and could cause issues.
> e.g.
> {code:java}
> case class MouseClickEvent(pageId: Long, userId: String)
> builder
>   // Long serde taken from implicit scope configured with
>   // `isKey` = true
>   .stream[Long, MouseClickEvent]("mouse-clicks")
>   .selectKey((_,v) => v.userId)
>   .groupByKey
>   .aggregate(() => 0L, (_: String, mce: MouseClickEvent, count: Long) =>
>     count + 1)
>   .toStream
>   // Same Long serde taken from implicit scope configured with
>   // `isKey` = true, even though the `Long` value in this case
>   // will be the Value
>   .to("mouse-clicks-by-user")
> {code}
> It would be ideal if Key and Value Serde/SerdeWrapper types/type classes
> could be introduced to overcome this limitation.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
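As a rough illustration of the auto-configuration idea in the comment above, the following sketch shows a control object that configures its key and value serdes inside its constructor. Everything here is a hypothetical stand-in written for this example (`SimpleSerde`, `RecordingSerde`, `ConsumedSketch`, and the config entry); it is not the Kafka Streams API and not necessarily how the fix would be implemented.

```scala
// Hypothetical stand-ins for illustration only; these are NOT the Kafka classes.
object AutoConfigSketch {
  // Minimal serde-like trait with a Kafka-style configure(configs, isKey) hook.
  trait SimpleSerde[T] {
    def configure(configs: Map[String, Any], isKey: Boolean): Unit
  }

  // Records how it was configured so the effect can be observed.
  final class RecordingSerde[T] extends SimpleSerde[T] {
    var configuredAsKey: Option[Boolean] = None
    override def configure(configs: Map[String, Any], isKey: Boolean): Unit =
      configuredAsKey = Some(isKey)
  }

  // Control object in the spirit of Consumed / Produced / Grouped: its
  // constructor knows which serde is for the key and which is for the value.
  final class ConsumedSketch[K, V](
      val keySerde: SimpleSerde[K],
      val valueSerde: SimpleSerde[V],
      configs: Map[String, Any]
  ) {
    // Assumption: a shared instance cannot be configured both ways, so reject it.
    require(!(keySerde eq valueSerde), "key and value serdes must be distinct instances")
    keySerde.configure(configs, isKey = true)
    valueSerde.configure(configs, isKey = false)
  }

  val keySerde = new RecordingSerde[Long]
  val valueSerde = new RecordingSerde[Long]
  // "example.setting" is a placeholder config entry, not a real Kafka setting.
  val consumed = new ConsumedSketch(keySerde, valueSerde, Map("example.setting" -> "value"))
}
```

The sketch only works because two distinct instances are passed in, which is the same precondition raised elsewhere in this thread.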
[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.
[ https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699695#comment-16699695 ]

Matthias J. Sax commented on KAFKA-7653:
----------------------------------------

I'm not sure what the use case for this would be. Why would you want to use two different Serdes for key and value if it's the same data type?

Also, even with different key/value types (for the same actual data type) to distinguish the serdes, it would still not allow you to do

{code:java}
implicit val keySerde1: Serde[String] = new SerdeImplA[String](... )
implicit val keySerde2: Serde[String] = new SerdeImplB[String](...)

builder.stream[String, Integer]("my-topic") // use SerdeA here
builder.stream[String, Integer]("my-topic") // use SerdeB here
{code}

If there are two Serdes for the same data type, it's ambiguous, and users need to pass in Serdes explicitly. This seems to be OK.

Why do you think that the key/value case is different from the example above? To me, it seems to be the same, and thus I don't see a reason why we would need to distinguish between key and value in your example.

Thoughts?
[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.
[ https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699614#comment-16699614 ]

Mark Tranter commented on KAFKA-7653:
-------------------------------------

Thanks [~vvcephei],

Different serde instances for keys/values is exactly what we want. The problem is that with the current Scala API, there is no way for the compiler to guarantee different implicit instances for the same K/V types, i.e.

{code}
class KStreamBuilder {
  def stream[K,V](topic: String)(implicit keySerde: Serde[K], valueSerde: Serde[V]) ...
}
{code}

If `K` and `V` are the same type, the compiler will use the same implicit definition. We can't guarantee that this definition will return different instances, which makes auto-config problematic.

I can also imagine a scenario like this...

{code}
implicit val keySerde: Serde[String] = new SerdeImplA[String](... )
implicit val valueSerde: Serde[String] = new SerdeImplB[String](...)

// Compilation error. Ambiguous implicit definition.
builder.stream[String, String]("my-topic")
{code}

Currently there is no way of doing this with the existing Scala API, apart from passing the serdes explicitly, which negates the need for the Scala API.

The only way around this that I can see is to provide K/V serde differentiation at the type level.
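For reference, the explicit-passing workaround mentioned in the comment above can be sketched as follows. `SimpleSerde`, `NamedSerde`, and this `stream` method are hypothetical stand-ins that only mimic the shape of the signature quoted in the comment; they are not the Kafka Streams Scala API.

```scala
// Hypothetical stand-ins for illustration only; these are NOT the Kafka classes.
object ExplicitSerdeSketch {
  trait SimpleSerde[T] { def name: String }
  final case class NamedSerde[T](name: String) extends SimpleSerde[T]

  // Same shape as the quoted signature: both serdes are implicit parameters.
  def stream[K, V](topic: String)(implicit keySerde: SimpleSerde[K], valueSerde: SimpleSerde[V]): String =
    s"$topic: key=${keySerde.name}, value=${valueSerde.name}"

  // Deliberately NOT implicit: two implicit SimpleSerde[String] values in scope
  // would make stream[String, String] fail with an ambiguity error.
  val binaryKeys: SimpleSerde[String] = NamedSerde[String]("binary")
  val jsonValues: SimpleSerde[String] = NamedSerde[String]("json")

  // Workaround: fill the implicit parameter list explicitly.
  val described: String = stream[String, String]("my-topic")(binaryKeys, jsonValues)
}
```

This compiles, but every call site has to name both serdes, which is the loss of convenience the comment objects to.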
[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.
[ https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699383#comment-16699383 ]

John Roesler commented on KAFKA-7653:
-------------------------------------

Hi [~mark.tranter],

I haven't looked at the PR yet, but just one comment from the discussion here...

It's a sharp edge, but AFAIK users *must* provide different serde instances for keys/values. I recently submitted a bugfix for the Scala API to make the provided implicit serdes generate a new instance each time, as this was overlooked initially.

I'm wondering if, in addition to what you proposed above, we should consider verifying that the serdes are properly configured, possibly in the Scala API, but possibly more usefully in the Java API.

Thoughts?
[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.
[ https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698519#comment-16698519 ]

Matthias J. Sax commented on KAFKA-7653:
----------------------------------------

Btw: I just rediscovered: https://issues.apache.org/jira/browse/KAFKA-3729
[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.
[ https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698503#comment-16698503 ]

Matthias J. Sax commented on KAFKA-7653:
----------------------------------------

If an implicit can be written in such a way that it returns a new object each time, it might be fair to document this and make it a requirement.

For the second part: the compiler does not need to distinguish both cases IMHO. At runtime, we get two Serde objects, one for the key and one for the value, and we can call `configure()` correctly for each. Or am I missing something?
[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.
[ https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698355#comment-16698355 ]

Mark Tranter commented on KAFKA-7653:
-------------------------------------

{quote}Are you saying that implicits reuse the same object and don't create a new object each time they are used
{quote}
Yes. Well... it depends on how the implicits are defined. However, there is no compile-time way of enforcing that clients define their implicits such that new instances are returned each time.
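The "it depends on how the implicits are defined" point can be made concrete with a small sketch. `DemoSerde` and `serdes` are hypothetical names invented for this example; the only thing being shown is standard Scala behaviour: an `implicit val` is evaluated once and shared, while an `implicit def` is evaluated at every resolution.

```scala
// Hypothetical stand-ins for illustration only; DemoSerde is NOT a Kafka class.
object ImplicitInstanceSketch {
  final class DemoSerde[T]

  // Mirrors a stream[K, V]-style method: one implicit serde per type parameter.
  def serdes[K, V](implicit keySerde: DemoSerde[K], valueSerde: DemoSerde[V]): (DemoSerde[K], DemoSerde[V]) =
    (keySerde, valueSerde)

  object SharedInstance {
    // implicit val: evaluated once, so key and value resolve to the same object.
    implicit val stringSerde: DemoSerde[String] = new DemoSerde[String]
    val pair = serdes[String, String]
  }

  object FreshInstance {
    // implicit def: evaluated on each resolution, so key and value get new objects.
    implicit def stringSerde: DemoSerde[String] = new DemoSerde[String]
    val pair = serdes[String, String]
  }

  val sharedIsSame: Boolean = SharedInstance.pair._1 eq SharedInstance.pair._2
  val freshIsSame: Boolean = FreshInstance.pair._1 eq FreshInstance.pair._2
}
```

Both variants have the same static type, `DemoSerde[String]`, which is why the compiler cannot enforce one style over the other.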
[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.
[ https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16696405#comment-16696405 ]

Matthias J. Sax commented on KAFKA-7653:
----------------------------------------

{quote}AFAIK there is no way of providing two implicit instances of the same type.
{quote}
I am not a Scala person. Are you saying that implicits reuse the same object and don't create a new object each time they are used? This would indeed break the code (and make my idea unsuitable).
{quote}Also how would we know what config values to pass to configure() as the Map in the auto-config?
{quote}
We would pass in the parameters provided to the `KafkaStreams` instance.
{quote}I could also imagine use cases where a user may want different serialisation formats for keys and values i.e. Binary Keys and Json Values.
{quote}
Wouldn't these be different Serdes?
[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.
[ https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16695566#comment-16695566 ]

Mark Tranter commented on KAFKA-7653:
-------------------------------------

Some type of auto-config would be ideal, but I'm not sure how this would work.

{code}
val streams = builder.stream[String, String]("some-topic")
{code}

In this example we need two implicit serde instances (one to be 'auto configured' with `isKey = true` and one with `isKey = false`). AFAIK there is no way of providing two implicit instances of the same type. Also, how would we know what config values to pass to configure() as the Map in the auto-config?

I could also imagine use cases where a user may want different serialisation formats for keys and values, e.g. binary keys and JSON values.

Have I misunderstood?
[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.
[ https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16695446#comment-16695446 ]

Matthias J. Sax commented on KAFKA-7653:
----------------------------------------

I want to share another thought about this (brought up by [~miguno] in a discussion we had).

The issue that user-provided serdes on operators need to be configured manually, by calling configure() before they are handed to the operator, also exists for the Java API. It's less problematic in Java because there are no implicits. However, IMHO there is actually no reason why Streams could not auto-configure all provided serdes (not just the default ones from the config).

If we did this, it would implicitly fix this ticket (as it would apply to the shared runtime) and we wouldn't need any API change.

Thoughts?
[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.
[ https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16695375#comment-16695375 ]

ASF GitHub Bot commented on KAFKA-7653:
---------------------------------------

mtranter opened a new pull request #5938: KAFKA-7653 [WIP] Add KeySerde and ValueSerde types
URL: https://github.com/apache/kafka/pull/5938

## KAFKA-7653

https://issues.apache.org/jira/browse/KAFKA-7653

Added two new types, `KeySerde[T]` and `ValueSerde[T]`, and changed the signatures of the relevant methods to accept `KeySerde` and `ValueSerde` instead of `Serde` as implicit params.

Also added a `KeyValueAgnostic` object to `org.apache.kafka.streams.scala.Serdes` to provide implicit conversions from `Key/ValueSerdes` to plain old `Serde`, to help minimise the impact on existing users.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [X] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

@guozhangwang @miguno @vvcephei

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
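To show why distinct `KeySerde[T]` and `ValueSerde[T]` types remove the ambiguity, here is a minimal sketch of type-level differentiation. The type names follow the PR description above, but their definitions here, along with `SimpleSerde`, `LongSerde`, and `stream`, are assumptions made for this example; the actual PR may define them quite differently.

```scala
// Hypothetical shapes for illustration only; the real PR's definitions may differ.
object KeyValueSerdeSketch {
  trait SimpleSerde[T] { def describe: String }

  // Wrapper types that make "key serde" and "value serde" different static types.
  final case class KeySerde[T](underlying: SimpleSerde[T])
  final case class ValueSerde[T](underlying: SimpleSerde[T])

  final class LongSerde(isKey: Boolean) extends SimpleSerde[Long] {
    val describe: String = if (isKey) "long(isKey=true)" else "long(isKey=false)"
  }

  // Two implicits for Long no longer clash because their types differ.
  implicit val longKeySerde: KeySerde[Long] = KeySerde(new LongSerde(isKey = true))
  implicit val longValueSerde: ValueSerde[Long] = ValueSerde(new LongSerde(isKey = false))

  // Each implicit parameter can only be satisfied by the matching wrapper type.
  def stream[K, V](topic: String)(implicit k: KeySerde[K], v: ValueSerde[V]): (String, String) =
    (k.underlying.describe, v.underlying.describe)

  val resolved: (String, String) = stream[Long, Long]("mouse-clicks-by-user")
}
```

With both type parameters set to `Long`, the key position still resolves to the key-configured serde and the value position to the value-configured one, which is the case the issue description says plain `Serde` implicits cannot express.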
[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.
[ https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16695312#comment-16695312 ]

Mark Tranter commented on KAFKA-7653:
-------------------------------------

That's great, guys, thanks. I'll submit a WIP PR shortly to give us something to talk about.

I agree with [~vvcephei] that it will be hard to achieve full backwards compatibility, but let's see what we can do!
[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.
[ https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694382#comment-16694382 ]

Michael Noll commented on KAFKA-7653:
-------------------------------------

For [~mark.tranter]: If this change does require a KIP (Kafka Improvement Proposal), we can help shepherd the KIP logistics. Just want to make sure that you know that a KIP is nothing daunting. :-)
[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.
[ https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694085#comment-16694085 ]

John Roesler commented on KAFKA-7653:
-------------------------------------

Hey all,

I'll jump in, too, if you don't mind...

Yeah, I think this change will need a KIP for sure. However, given our recent experience with API changes to the Scala interface, it's a little hard to discuss changes like this in the abstract, so I think we should start with a PR, agree on the approach, then proceed with a KIP to formalize/ratify it, and then finalize the PR.

This way, everyone can download the changeset and play around with the different possibilities.

Of course, we should seek a solution that doesn't break anyone, but as we've seen in the past, this may be an unattainable goal, and we might have to fall back to not breaking anyone who uses implicits with a wildcard import. Still, the first preference is full compatibility.

Thanks,
-John
[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.
[ https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693919#comment-16693919 ]

Matthias J. Sax commented on KAFKA-7653:
----------------------------------------

[~guozhang] Do you think this change should be backed by a KIP, too? Seems to be a public API change.
[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.
[ https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693848#comment-16693848 ]

Guozhang Wang commented on KAFKA-7653:

[~mark.tranter] thanks for elaborating on the issue!

As for how to fix it, I agree with [~miguno] that maintaining compatibility, so that we do not break existing users, should be the top priority. Maybe we can continue the discussion of the best solution on a PR, so that we have a more concrete understanding of the pros and cons?

I've added you as a contributor; please feel free to follow the guidance to submit one: https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes
[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.
[ https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693645#comment-16693645 ]

Michael Noll commented on KAFKA-7653:

Ack, [~mark.tranter]!

{quote}Something Like KeySerde[T] and ValueSerde[T] and updating the Kafka Scala Streams code to accept these as implicit dependencies in place of Serde[T].{quote}

Yeah, this certainly seems like a possible option; it was also the first one that came to my mind, for what it's worth. The Scala type system probably has a few more options we could evaluate here. Perhaps there is a way to introduce types for this that (1) is easier in terms of backwards compatibility and, orthogonally, (2) is convenient to use and easy to understand for Scala users who are not experts in Scala's type system, e.g. when writing your own custom serde with the Scala API.

{quote}I'm more than happy to help out with a fix for this if we can decide on an appropriate abstraction to use.{quote}

That would be great, Mark. Let's continue the conversation here, cc [~guozhang].
[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.
[ https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692440#comment-16692440 ]

Mark Tranter commented on KAFKA-7653:

Thanks [~miguno]. Yes, that is one workaround and, as you say, it negates the use of implicit serdes.

Another option might be to wrap the types to be serialized:
{code:java}
case class StringKey(k: String)
case class Value(v: String)
{code}
Then use these types within the Streams Builder and write basic wrapper serializers for these wrapper types. Again, not ideal.

I'm more than happy to help out with a fix for this if we can decide on an appropriate abstraction to use. I can't currently see a nice way of doing it that wouldn't cause breaking changes for existing users.

Thanks for your reply!
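A minimal sketch of the wrapper-type workaround described in the comment above. Everything except `StringKey` and `Value` is an assumption made for illustration: `Serde` here is a tiny stand-in for Kafka's interface (so the sketch runs without Kafka on the classpath), and `StringSerde`, `wrapped` and `roundTrip` are hypothetical helpers, not part of any Kafka API.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object WrapperTypeSketch {
  // Stand-in for Kafka's Serde, reduced to the two conversions.
  trait Serde[T] {
    def serialize(value: T): Array[Byte]
    def deserialize(bytes: Array[Byte]): T
  }

  // Plain String serde that remembers which role it was set up for.
  final class StringSerde(val isKey: Boolean) extends Serde[String] {
    def serialize(value: String): Array[Byte] = value.getBytes(UTF_8)
    def deserialize(bytes: Array[Byte]): String = new String(bytes, UTF_8)
  }

  // Wrapper types from the comment above.
  final case class StringKey(k: String)
  final case class Value(v: String)

  // Hypothetical helper: builds a Serde[B] by delegating to a Serde[A].
  def wrapped[A, B](inner: Serde[A])(wrap: A => B, unwrap: B => A): Serde[B] =
    new Serde[B] {
      def serialize(value: B): Array[Byte] = inner.serialize(unwrap(value))
      def deserialize(bytes: Array[Byte]): B = wrap(inner.deserialize(bytes))
    }

  // Serde[StringKey] and Serde[Value] are different types, so both can be
  // implicit in the same scope without any ambiguity.
  implicit val keySerde: Serde[StringKey] =
    wrapped[String, StringKey](new StringSerde(isKey = true))(s => StringKey(s), k => k.k)
  implicit val valueSerde: Serde[Value] =
    wrapped[String, Value](new StringSerde(isKey = false))(s => Value(s), v => v.v)

  // Stand-in for an API call that resolves both serdes implicitly.
  def roundTrip[K, V](key: K, value: V)(implicit ks: Serde[K], vs: Serde[V]): (K, V) =
    (ks.deserialize(ks.serialize(key)), vs.deserialize(vs.serialize(value)))

  def main(args: Array[String]): Unit =
    println(roundTrip(StringKey("user-1"), Value("click")))
}
```

The cost, as noted in the comment, is that every key and value has to be wrapped and unwrapped in the topology, which is why this is only a stopgap.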
[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.
[ https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692148#comment-16692148 ]

Michael Noll commented on KAFKA-7653:

Thanks for raising this, [~mark.tranter]! The issue makes sense to me.

Question for clarification: the workaround today would be to explicitly specify the serdes you want (this way one can differentiate between key serdes and value serdes), but this would by definition negate the convenience of the Scala API, where you normally do not need to do that (because of implicit serdes). Right?
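To make the workaround in the question above concrete, here is a rough sketch of passing both serdes by hand instead of relying on implicit scope. It assumes a simplified stand-in API: `Serde` and `stream` below are hypothetical and only mimic the shape of a method that takes its serdes as implicit parameters; the real Scala API resolves `Consumed`/`Produced` objects rather than bare serdes.

```scala
object ExplicitSerdeSketch {
  // Stand-in for Kafka's Serde, reduced to the role flag.
  final class Serde[T](val isKey: Boolean)

  // Hypothetical stand-in for an API method that takes its serdes implicitly.
  def stream[K, V](topic: String)(implicit keySerde: Serde[K], valueSerde: Serde[V]): String =
    s"$topic: key serde isKey=${keySerde.isKey}, value serde isKey=${valueSerde.isKey}"

  def demo(): String = {
    val keySerde = new Serde[String](isKey = true)
    val valueSerde = new Serde[String](isKey = false)
    // Nothing is implicit here: both serdes are supplied explicitly in the
    // second parameter list, so there is nothing for the compiler to confuse.
    stream[String, String]("my-topic")(keySerde, valueSerde)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

This keeps key and value serdes apart, but every call site has to spell them out, which is exactly the convenience the implicit-based API was meant to provide.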
[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.
[ https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16691134#comment-16691134 ]

Mark Tranter commented on KAFKA-7653:

Hi [~guozhang], thanks for your reply.

The issue is that Kafka Serializers/Deserializers can be configured either as Key or Value serializers: [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java#L40]

This abstraction has been replicated with Kafka Streams Serdes. Unfortunately, it is not represented at all in the type system. This makes it impossible to provide separate "key" and "value" serializers of the same type in the same implicit scope. Consider the following example using Kafka Streams Scala:
{code:java}
import org.apache.kafka.common.serialization.{Serde, Serdes}
import scala.collection.JavaConverters._

// Two separate instances of the same serde class, one per role.
val keySerde: Serde[String] = Serdes.String()
val valueSerde: Serde[String] = Serdes.String()
keySerde.configure(Map.empty[String, AnyRef].asJava, true)    // isKey = true
valueSerde.configure(Map.empty[String, AnyRef].asJava, false) // isKey = false

implicit val keyS: Serde[String] = keySerde
implicit val valS: Serde[String] = valueSerde

// Does not compile: two implicit values of type Serde[String] are in scope.
builder.stream[String, String]("my-topic")
{code}
This causes an ambiguous implicit resolution for type Serde[String].

We could solve this problem with two new types, something like KeySerde[T] and ValueSerde[T], and by updating the Kafka Scala Streams code to accept these as implicit dependencies in place of Serde[T]. I have done a very quick POC to demonstrate over here: [https://github.com/mtranter/kafka/compare/trunk...key-value-serde-types]

To provide some context: my specific problem is that we are using Confluent Schema Registry. We require our serializers to know whether they are serializing keys or values so that they can register Avro schemas with the schema registry. Schemas are registered in the format `{topic}-[key|value]`.

Hope this makes sense.
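The KeySerde[T]/ValueSerde[T] idea from the comment above could look roughly like the sketch below. This is not the linked POC and not Kafka's API: `Serde` is a minimal stand-in (so the example runs without Kafka on the classpath), and `KeySerde`, `ValueSerde`, `stream` and the `role` field are hypothetical names used only to show how distinct types remove the ambiguity.

```scala
object KeyValueSerdeSketch {
  // Stand-in for org.apache.kafka.common.serialization.Serde, reduced to
  // `configure`; it only records which role it was configured for.
  final class Serde[T] {
    var role: String = "unconfigured"
    def configure(configs: Map[String, Any], isKey: Boolean): Unit =
      role = if (isKey) "key" else "value"
  }

  // Hypothetical wrapper types: the role becomes part of the type.
  final case class KeySerde[T](serde: Serde[T])
  final case class ValueSerde[T](serde: Serde[T])

  // Hypothetical stand-in for an API method such as `builder.stream`.
  def stream[K, V](topic: String)(implicit k: KeySerde[K], v: ValueSerde[V]): String =
    s"$topic: key uses a ${k.serde.role} serde, value uses a ${v.serde.role} serde"

  def demo(): String = {
    val keyImpl = new Serde[String]
    val valueImpl = new Serde[String]
    keyImpl.configure(Map.empty, isKey = true)
    valueImpl.configure(Map.empty, isKey = false)

    // Both wrap a Serde[String], yet the implicit types differ,
    // so implicit resolution is no longer ambiguous.
    implicit val keySerde: KeySerde[String] = KeySerde(keyImpl)
    implicit val valueSerde: ValueSerde[String] = ValueSerde(valueImpl)
    stream[String, String]("my-topic")
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Whether such wrappers could be introduced without breaking code that currently supplies plain implicit `Serde[T]` values is the compatibility question raised elsewhere in this thread; the sketch does not address it.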
[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.
[ https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16691127#comment-16691127 ]

Guozhang Wang commented on KAFKA-7653:

[~mark.tranter] I'm not sure I fully understand the issue you reported here. Could you maybe send a PR showing what you want to change in the implicit conversions? cc [~vvcephei]