[jira] [Commented] (KAFKA-7777) Decouple topic serdes from materialized serdes

2020-07-31 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17169176#comment-17169176
 ] 

Guozhang Wang commented on KAFKA-:
--

Another workaround for now is to use `KTable#mapValues()`, in which we can 
project out the unwanted fields and then materialize. More specifically, you 
can write:

// do not pass Materialized here, so the source table is not forced to materialize
builder.table("topic", Consumed)
   // project out the unwanted fields, then materialize with the new serde
   .mapValues(..., Materialized)

As a result, only one store would be created, for the resulting KTable after 
the mapValues.

> Decouple topic serdes from materialized serdes
> --
>
> Key: KAFKA-
> URL: https://issues.apache.org/jira/browse/KAFKA-
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Maarten
>Priority: Minor
>  Labels: needs-kip
>
> It would be valuable to us to have the encoding format in a Kafka topic 
> decoupled from the encoding format used to cache the data locally in a Kafka 
> Streams app. 
> We would like to use the `range()` function in the interactive queries API to 
> look up a series of results, but can't with our encoding scheme due to our 
> keys being variable length.
> We use protobuf, but based on what I've read, Avro, Flatbuffers and Cap'n 
> Proto have similar problems.
> Currently we use the following code to work around this problem:
> {code}
> builder
>     .stream("input-topic", Consumed.with(inputKeySerde, inputValueSerde))
>     .to("intermediate-topic",
>         Produced.with(intermediateKeySerde, intermediateValueSerde));
> t1 = builder
>     .table("intermediate-topic",
>         Consumed.with(intermediateKeySerde, intermediateValueSerde),
>         t1Materialized);
> {code}
> With the encoding formats decoupled, the code above could be reduced to a 
> single step, not requiring an intermediate topic.
> Based on feedback on my [SO 
> question|https://stackoverflow.com/questions/53913571/is-there-a-way-to-separate-kafka-topic-serdes-from-materialized-serdes]
>  a change that introduces this would impact state restoration when using an 
> input topic for recovery.
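
To illustrate why variable-length keys get in the way of `range()`, here is a minimal, self-contained sketch. It assumes the store orders keys by unsigned lexicographic byte comparison, and it uses a made-up varint encoding as a stand-in for a variable-length format; `KeyOrderingSketch`, `varint`, `fixed` and the sample ids are hypothetical and not taken from the ticket. A sorted map with a byte-wise comparator plays the role of the store.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;
import java.util.function.IntFunction;

final class KeyOrderingSketch {
    // Hypothetical variable-length encoding: base-128 varint, low-order group first.
    // Assumes non-negative ids.
    static byte[] varint(int value) {
        byte[] buf = new byte[5];
        int n = 0;
        while ((value & ~0x7F) != 0) {
            buf[n++] = (byte) ((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        buf[n++] = (byte) value;
        return Arrays.copyOf(buf, n);
    }

    // Fixed-width big-endian encoding: for non-negative ids, unsigned byte
    // order is the same as numeric order.
    static byte[] fixed(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Emulates a byte-ordered range scan: keys are sorted by unsigned
    // lexicographic byte order and the ids with keys in [from, to] are returned.
    static List<Integer> range(IntFunction<byte[]> encoder, int[] ids, int from, int to) {
        TreeMap<byte[], Integer> store = new TreeMap<>(Arrays::compareUnsigned);
        for (int id : ids) {
            store.put(encoder.apply(id), id);
        }
        return new ArrayList<>(
            store.subMap(encoder.apply(from), true, encoder.apply(to), true).values());
    }

    public static void main(String[] args) {
        int[] ids = {1, 2, 127, 128, 200, 300, 1000, 16384};
        // Variable-length keys: 200 is missing and 16384 sneaks in.
        System.out.println(range(KeyOrderingSketch::varint, ids, 1, 300)); // [1, 2, 127, 128, 16384, 300]
        // Fixed-width keys: the scan matches the numeric range.
        System.out.println(range(KeyOrderingSketch::fixed, ids, 1, 300));  // [1, 2, 127, 128, 200, 300]
    }
}
```

This is why a different key serde for the local store than for the topic would help: the store could use an order-preserving encoding while the topic keeps its original format.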



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7777) Decouple topic serdes from materialized serdes

2019-03-20 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797353#comment-16797353
 ] 

Matthias J. Sax commented on KAFKA-:


Sounds like a fair request: the DSL changelogging mechanism is not part of the 
public API (and custom stores are not well documented in general :(), and it 
would be helpful to design a proper and easy-to-use public API for the 
Processor API. If you are interested in working on this, feel free to create a 
new ticket; note that this change will require a KIP.



[jira] [Commented] (KAFKA-7777) Decouple topic serdes from materialized serdes

2019-03-19 Thread Paul Whalen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16796661#comment-16796661
 ] 

Paul Whalen commented on KAFKA-:


So I just went through looking at all the relevant interfaces again and I think 
I stand corrected.  Fundamentally, the API I was imagining is just:

# Something that you can send a key/value to, and it will write the appropriate 
records to an appropriately named changelog topic.
# Supplying a callback to restore from a topic when a state store is 
initialized (I know that exists, though I will admit that one of my colleagues 
spent a morning trying to accomplish that and failed to find an online example 
or get anything working)

I see {{StoreChangeLogger}} as the solution to 1. Although it is not 
public, it is obviously small and replicable, and I now see that 
{{ProcessorContext}} implements {{RecordCollector.Supplier}}, which allows the 
all-important "hook in" so we can get EoS by using the same consumer.  And we 
can of course choose an appropriate topic name from the public 
{{ProcessorStateManager.storeChangelogTopic()}}.

And, I'm sure 2 is perfectly solvable given the right understanding.
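
The two pieces listed above can be sketched without any Kafka dependency. This is only an illustration of the shape of the idea: `Changelog`, `IndexedStore` and their methods are hypothetical names, not Kafka Streams APIs, and an in-memory list stands in for the changelog topic. Deletes (tombstones) are left out to keep it short.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.BiConsumer;

// Hypothetical sketch; none of these types exist in Kafka Streams.
final class ChangelogSketch {
    // Piece 1: something you send a key/value to. An append-only list
    // stands in for the changelog topic.
    static final class Changelog {
        private final List<Map.Entry<String, String>> records = new ArrayList<>();

        void send(String key, String value) {
            records.add(new SimpleImmutableEntry<>(key, value));
        }

        // Piece 2: replay every record, in order, into a restore callback.
        void replay(BiConsumer<String, String> restoreCallback) {
            for (Map.Entry<String, String> r : records) {
                restoreCallback.accept(r.getKey(), r.getValue());
            }
        }
    }

    // A store whose local layout (a secondary index by value) differs from
    // the plain key/value records that are written to the changelog.
    static final class IndexedStore {
        private final Map<String, String> byKey = new HashMap<>();
        private final TreeMap<String, TreeSet<String>> keysByValue = new TreeMap<>();
        private final Changelog changelog;

        IndexedStore(Changelog changelog) {
            this.changelog = changelog;
        }

        void put(String key, String value) {
            applyLocally(key, value);
            changelog.send(key, value);
        }

        // Rebuilds the local layout from the changelog without re-logging.
        void restore() {
            changelog.replay(this::applyLocally);
        }

        List<String> keysWithValue(String value) {
            return new ArrayList<>(keysByValue.getOrDefault(value, new TreeSet<>()));
        }

        private void applyLocally(String key, String value) {
            String old = byKey.put(key, value);
            if (old != null) {
                TreeSet<String> keys = keysByValue.get(old);
                keys.remove(key);
                if (keys.isEmpty()) {
                    keysByValue.remove(old);
                }
            }
            keysByValue.computeIfAbsent(value, v -> new TreeSet<>()).add(key);
        }
    }
}
```

A second `IndexedStore` created on the same `Changelog` and restored via `restore()` ends up with the same index as the original, which is the property the restore callback has to provide.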

Thanks for your help!  The best kind of new feature is the kind that existed 
all along!



[jira] [Commented] (KAFKA-7777) Decouple topic serdes from materialized serdes

2019-03-19 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16796650#comment-16796650
 ] 

Matthias J. Sax commented on KAFKA-:


No need to apologize, you did not "hijack" this ticket :)

What do you mean by "hooking into the change-logging code already built"? What 
API do you have in mind to reuse existing code? From what I can tell atm, it 
seems that it would be hard to share existing code, but I might be wrong. If 
you have a good idea, feel free to create a new ticket to follow up.



[jira] [Commented] (KAFKA-7777) Decouple topic serdes from materialized serdes

2019-03-19 Thread Paul Whalen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16796648#comment-16796648
 ] 

Paul Whalen commented on KAFKA-:


I had that thought and definitely believe it's possible, but I guess to be more 
clear, it might be nicer if hooking into the change-logging code already built 
was easier.  That seems like a bit of a challenge, if it is possible.  Anyway, 
sorry to hijack this ticket, it's definitely interesting and useful on its own.



[jira] [Commented] (KAFKA-7777) Decouple topic serdes from materialized serdes

2019-03-19 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16796604#comment-16796604
 ] 

Matthias J. Sax commented on KAFKA-:


[~pgwhalen] What you describe might be possible today already. Using the 
Processor API, users can implement the `StateStore` interface using any 
internal storage engine they like and implement whatever change-logging 
mechanism they need. You can also expose any interface you like to query a 
store, by implementing a custom `QueryableStoreType`.

The coupling described in this ticket is a DSL limitation only.



[jira] [Commented] (KAFKA-7777) Decouple topic serdes from materialized serdes

2019-03-19 Thread Paul Whalen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16796519#comment-16796519
 ] 

Paul Whalen commented on KAFKA-:


This is a very interesting idea that I am suddenly very excited about, and 
since my team has a somewhat related problem, I'll phrase it the way we've been 
thinking of it: we love that key value state stores can be backed up to topics, 
but in our streams application we want a much richer way of querying data than 
just by key.

In a sense, {{range()}} partly solves this problem because it allows for a 
different way of querying the store rather than just based on your exact key.  
But the real win would be a complete decoupling of local state store 
implementation and how it changelogs to Kafka.  It wouldn't need to be just 
key-value with range like RocksDB, but could have a fancier on-disk structure 
that supports efficient querying or indexing of the data in many ways (I'm 
thinking SQLite).  It would definitely increase fail-over/restore time, but 
that would be an acceptable/necessary tradeoff - if you're going to lay out the 
data in a totally different format for querying, obviously you have to pay the 
cost of that translation.

What I'm proposing (completely decoupling local state store implementation and 
how it changelogs to Kafka) is more useful for Processor API users, but it 
could also provide an API usable at the DSL level to enable what this JIRA is 
asking for (merely decoupling serdes between local state store and changelog in 
Kafka).



[jira] [Commented] (KAFKA-7777) Decouple topic serdes from materialized serdes

2019-03-19 Thread Patrik Kleindl (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16796490#comment-16796490
 ] 

Patrik Kleindl commented on KAFKA-:
---

[~mjsax] In https://issues.apache.org/jira/browse/KAFKA-8037 I implemented the 
deserialization as part of the global state restore to catch corrupted records. 
This might be extended to serialize into the store in another format.



[jira] [Commented] (KAFKA-7777) Decouple topic serdes from materialized serdes

2019-01-22 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16749214#comment-16749214
 ] 

Matthias J. Sax commented on KAFKA-:


This is an interesting request. Note that during state restore, Kafka Streams 
currently only copies the key/value bytes from the changelog topic unmodified 
into the state store. This change would require deserializing the data using 
the topic serde and serializing it again using the state store serde on 
restore. Thus, fail-over time might increase.
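
The difference between the two restore paths can be sketched as follows. This is a standalone illustration, not Kafka Streams code: `RestoreSketch` and its methods are hypothetical, and the two formats (decimal text in the topic, 8-byte big-endian in the store) are assumptions picked only to make the transcoding step visible.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical sketch; these are not Kafka Streams APIs.
final class RestoreSketch {
    // Current behaviour as described above: changelog bytes go into the
    // store unmodified, so restore does no per-record (de)serialization.
    static byte[] restoreAsIs(byte[] changelogBytes) {
        return changelogBytes;
    }

    // With decoupled serdes: every restored record is deserialized with the
    // topic format and serialized again with the store format.
    static <T> byte[] restoreTranscoded(byte[] changelogBytes,
                                        Function<byte[], T> topicDeserializer,
                                        Function<T, byte[]> storeSerializer) {
        return storeSerializer.apply(topicDeserializer.apply(changelogBytes));
    }

    // Assumed topic format: the number as UTF-8 decimal text.
    static Long parseDecimal(byte[] bytes) {
        return Long.parseLong(new String(bytes, StandardCharsets.UTF_8));
    }

    // Assumed store format: 8-byte big-endian.
    static byte[] toBigEndian(Long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    public static void main(String[] args) {
        byte[] fromTopic = "42".getBytes(StandardCharsets.UTF_8);
        System.out.println(restoreAsIs(fromTopic).length);
        System.out.println(
            restoreTranscoded(fromTopic, RestoreSketch::parseDecimal, RestoreSketch::toBigEndian).length);
    }
}
```

The extra work in `restoreTranscoded` is paid once per restored record, which is where the longer fail-over time would come from.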

> Decouple topic serdes from materialized serdes
> --
>
> Key: KAFKA-
> URL: https://issues.apache.org/jira/browse/KAFKA-
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Maarten
>Priority: Trivial