[jira] [Commented] (KAFKA-7777) Decouple topic serdes from materialized serdes
[ https://issues.apache.org/jira/browse/KAFKA-7777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169176#comment-17169176 ]

Guozhang Wang commented on KAFKA-7777:
--------------------------------------

Another workaround for now is to use `KTable#mapValues()`, in which we can project out the unwanted fields and then materialize. More specifically, you can write:

{code}
builder.table("topic", Consumed)   // do not use Materialized here, so the source table is not materialized
    .mapValues(..., Materialized)  // project out the unwanted fields, then materialize with the new serde
{code}

As a result, only one store would be created, for the resulting KTable after the mapValues.

> Decouple topic serdes from materialized serdes
> ----------------------------------------------
>
>                 Key: KAFKA-7777
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7777
>             Project: Kafka
>          Issue Type: Wish
>          Components: streams
>            Reporter: Maarten
>            Priority: Minor
>              Labels: needs-kip
>
> It would be valuable to us to have the encoding format in a Kafka topic
> decoupled from the encoding format used to cache the data locally in a
> Kafka Streams app.
> We would like to use the `range()` function in the interactive queries API
> to look up a series of results, but can't with our encoding scheme due to
> our keys being variable length.
> We use protobuf, but based on what I've read, Avro, Flatbuffers and Cap'n
> Proto have similar problems.
> Currently we use the following code to work around this problem:
> {code}
> builder
>     .stream("input-topic", Consumed.with(inputKeySerde, inputValueSerde))
>     .to("intermediate-topic", Produced.with(intermediateKeySerde, intermediateValueSerde));
> t1 = builder
>     .table("intermediate-topic", Consumed.with(intermediateKeySerde, intermediateValueSerde), t1Materialized);
> {code}
> With the encoding formats decoupled, the code above could be reduced to a
> single step, not requiring an intermediate topic.
> Based on feedback on my [SO question|https://stackoverflow.com/questions/53913571/is-there-a-way-to-separate-kafka-topic-serdes-from-materialized-serdes],
> a change that introduces this would impact state restoration when using an
> input topic for recovery.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
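Why variable-length keys defeat `range()` over a byte-ordered store (such as RocksDB with its default comparator) can be shown concretely. The sketch below is illustrative only; it uses a protobuf-style base-128 varint encoding, which does not sort lexicographically in key order, against a fixed-width big-endian encoding, which does:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

public class KeyOrdering {
    // Protobuf-style unsigned varint: 7 data bits per byte, least-significant
    // group first, high bit set on every byte except the last.
    static byte[] varint(long v) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((v & ~0x7FL) != 0) {
            out.write((int) ((v & 0x7F) | 0x80));
            v >>>= 7;
        }
        out.write((int) v);
        return out.toByteArray();
    }

    // Fixed-width big-endian encoding: byte order equals numeric order.
    static byte[] bigEndian(int v) {
        return ByteBuffer.allocate(4).putInt(v).array();
    }

    // Unsigned lexicographic comparison, as a byte-oriented store would do.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (d != 0) return d;
        }
        return a.length - b.length;
    }

    public static void main(String[] args) {
        // varint(255) = [0xFF, 0x01] and varint(256) = [0x80, 0x02]:
        // lexicographically 255 sorts AFTER 256, so a range scan walks keys
        // out of numeric order.
        System.out.println(compareUnsigned(varint(255), varint(256)) > 0);       // true: inverted
        // Fixed-width big-endian keys keep byte order aligned with key order.
        System.out.println(compareUnsigned(bigEndian(255), bigEndian(256)) < 0); // true: correct
    }
}
```

This is the heart of the reporter's problem: the topic's wire format cannot be changed, so the only fix is to re-encode into a range-friendly format on the store side, which is exactly the decoupling the ticket asks for.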
[jira] [Commented] (KAFKA-7777) Decouple topic serdes from materialized serdes
[ https://issues.apache.org/jira/browse/KAFKA-7777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16797353#comment-16797353 ]

Matthias J. Sax commented on KAFKA-7777:
----------------------------------------

Sounds like a fair request. The DSL changelogging mechanism is not part of the public API (and custom stores are not well documented in general :( ), and it would be helpful to design a proper and easy-to-use public API for the Processor API. If you are interested in working on this, feel free to create a new ticket. Note that this change will require a KIP.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-7777) Decouple topic serdes from materialized serdes
[ https://issues.apache.org/jira/browse/KAFKA-7777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16796661#comment-16796661 ]

Paul Whalen commented on KAFKA-7777:
------------------------------------

So I just went through all the relevant interfaces again and I think I stand corrected. Fundamentally, the API I was imagining is just:
# Something you can send a key/value to that will write the appropriate records to an appropriately named changelog topic.
# A callback to restore from a topic when a state store is initialized (I know that exists, though I will admit that one of my colleagues spent a morning trying to accomplish it and failed to find an online example or get anything working).

I see {{StoreChangeLogger}} as the solution to 1; although it is not public, it is small and replicable. And I now see that {{ProcessorContext}} implements {{RecordCollector.Supplier}}, allowing the all-important "hook in" so we can get EOS by using the same producer. We can also choose an appropriate topic name via the public {{ProcessorStateManager.storeChangelogTopic()}}.

And I'm sure 2 is perfectly solvable given the right understanding. Thanks for your help! The best kind of new feature is the kind that existed all along!
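For reference, the naming convention that {{ProcessorStateManager.storeChangelogTopic()}} encodes is simple enough to restate. The snippet below is a hedged reimplementation of the convention for illustration, not the actual Kafka Streams source:

```java
public class ChangelogNaming {
    // Kafka Streams names a store's changelog topic
    // "<application.id>-<storeName>-changelog"; this mirrors what
    // ProcessorStateManager.storeChangelogTopic() returns.
    static String storeChangelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        System.out.println(storeChangelogTopic("my-app", "my-store"));
        // prints "my-app-my-store-changelog"
    }
}
```

Using the same naming scheme matters if a custom store is meant to interoperate with the tooling (reset tool, monitoring) that expects the standard changelog topic names.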
[jira] [Commented] (KAFKA-7777) Decouple topic serdes from materialized serdes
[ https://issues.apache.org/jira/browse/KAFKA-7777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16796650#comment-16796650 ]

Matthias J. Sax commented on KAFKA-7777:
----------------------------------------

No need to apologize, you did not "hijack" this ticket :) What do you mean by "hooking into the change-logging code already built"? What API do you have in mind to reuse existing code? From what I can tell at the moment, it seems it would be hard to share existing code, but I might be wrong. If you have a good idea, feel free to create a new ticket to follow up.
[jira] [Commented] (KAFKA-7777) Decouple topic serdes from materialized serdes
[ https://issues.apache.org/jira/browse/KAFKA-7777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16796648#comment-16796648 ]

Paul Whalen commented on KAFKA-7777:
------------------------------------

I had that thought and definitely believe it's possible, but to be more clear: it might be nicer if hooking into the change-logging code already built were easier. That seems like a bit of a challenge, if it is possible at all. Anyway, sorry to hijack this ticket; it's definitely interesting and useful on its own.
[jira] [Commented] (KAFKA-7777) Decouple topic serdes from materialized serdes
[ https://issues.apache.org/jira/browse/KAFKA-7777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16796604#comment-16796604 ]

Matthias J. Sax commented on KAFKA-7777:
----------------------------------------

[~pgwhalen] What you describe might be possible today already. Using the Processor API, users can implement the `StateStore` interface using any internal storage engine they like and implement whatever change-logging mechanism they need. You can also expose any interface you like to query a store, by implementing a custom `QueryableStoreType`. The coupling described in this ticket is a DSL limitation only.
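A toy, in-memory illustration of the decoupling idea (this is deliberately NOT the Kafka {{StateStore}} interface; every class and method here is hypothetical): keys arrive in a variable-length "topic" encoding, the store deserializes them and re-encodes with a fixed-width "store" encoding whose byte order matches key order, so range queries work even though the topic format does not sort correctly:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.SortedMap;
import java.util.TreeMap;

public class DecoupledStore {
    // "Topic serde": protobuf-style unsigned varint (variable length).
    static byte[] topicEncode(long v) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((v & ~0x7FL) != 0) { out.write((int) ((v & 0x7F) | 0x80)); v >>>= 7; }
        out.write((int) v);
        return out.toByteArray();
    }

    static long topicDecode(byte[] b) {
        long v = 0;
        for (int i = 0, shift = 0; i < b.length; i++, shift += 7)
            v |= (long) (b[i] & 0x7F) << shift;
        return v;
    }

    // "Store serde": fixed-width big-endian, so byte order equals key order.
    static byte[] storeEncode(long v) {
        return ByteBuffer.allocate(8).putLong(v).array();
    }

    // Byte-sorted store, like RocksDB's default comparator.
    private final TreeMap<byte[], String> store =
            new TreeMap<>(Arrays::compareUnsigned);

    // Deserialize with the topic serde, re-serialize with the store serde.
    void put(byte[] topicKey, String value) {
        store.put(storeEncode(topicDecode(topicKey)), value);
    }

    // Range query in true key order, enabled by the fixed-width store format.
    SortedMap<byte[], String> range(long from, long to) {
        return store.subMap(storeEncode(from), true, storeEncode(to), true);
    }
}
```

A real Processor API store would additionally write each put to its changelog topic (in the topic format), which is exactly the change-logging plumbing discussed in the comments above.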
[jira] [Commented] (KAFKA-7777) Decouple topic serdes from materialized serdes
[ https://issues.apache.org/jira/browse/KAFKA-7777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16796519#comment-16796519 ]

Paul Whalen commented on KAFKA-7777:
------------------------------------

This is a very interesting idea that I am suddenly very excited about, and since my team has a somewhat related problem, I'll phrase it the way we've been thinking of it: we love that key-value state stores can be backed up to topics, but in our Streams application we want a much richer way of querying data than just by key. In a sense, {{range()}} partly solves this problem because it allows a different way of querying the store rather than just by your exact key. But the real win would be a complete decoupling of the local state store implementation from how it changelogs to Kafka. It wouldn't need to be just key-value with range like RocksDB, but could have a fancier on-disk structure that supports efficient querying or indexing of the data in many ways (I'm thinking SQLite). It would definitely increase fail-over/restore time, but that would be an acceptable/necessary tradeoff: if you're going to lay out the data in a totally different format for querying, obviously you have to pay the cost of that translation.

What I'm proposing (completely decoupling the local state store implementation from how it changelogs to Kafka) is more useful for Processor API users, but it could also provide an API usable at the DSL level to enable what this JIRA is asking for (merely decoupling the serdes between the local state store and the changelog in Kafka).
[jira] [Commented] (KAFKA-7777) Decouple topic serdes from materialized serdes
[ https://issues.apache.org/jira/browse/KAFKA-7777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16796490#comment-16796490 ]

Patrik Kleindl commented on KAFKA-7777:
---------------------------------------

[~mjsax] In https://issues.apache.org/jira/browse/KAFKA-8037 I implemented deserialization as part of the global state restore to catch corrupted records. This might be extended to serialize into the store in another format.
[jira] [Commented] (KAFKA-7777) Decouple topic serdes from materialized serdes
[ https://issues.apache.org/jira/browse/KAFKA-7777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16749214#comment-16749214 ]

Matthias J. Sax commented on KAFKA-7777:
----------------------------------------

This is an interesting request. Note that during state restore, Kafka Streams currently only copies the key/value bytes from the changelog topic unmodified into the state store. This change would require deserializing the data using the topic serde and serializing it again using the state store serde on restore. Thus, fail-over time might increase.
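The restore-path difference Matthias describes can be sketched as follows. All types and serdes here are hypothetical stand-ins, not Kafka Streams classes: today restore is a plain byte copy, while decoupled serdes would turn it into a per-record decode/re-encode:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class RestoreSketch {
    // Today: restore copies changelog bytes into the store unmodified.
    static List<byte[]> restorePassThrough(List<byte[]> changelogValues) {
        return new ArrayList<>(changelogValues); // no per-record serde work
    }

    // With decoupled serdes: each record must be deserialized with the topic
    // serde and re-serialized with the store serde, adding CPU cost per record.
    static List<byte[]> restoreReencoded(List<byte[]> changelogValues) {
        List<byte[]> storeValues = new ArrayList<>();
        for (byte[] topicBytes : changelogValues) {
            // Hypothetical topic serde: UTF-8 string.
            String value = new String(topicBytes, StandardCharsets.UTF_8);
            // Hypothetical store serde: upper-cased UTF-8, standing in for any
            // alternative store-side encoding.
            storeValues.add(value.toUpperCase().getBytes(StandardCharsets.UTF_8));
        }
        return storeValues;
    }
}
```

The extra work is proportional to the changelog size, which is why decoupled serdes would lengthen fail-over: restore currently streams bytes at I/O speed, whereas the re-encoding path pays a serde round trip for every record.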