[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-07-20 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161423#comment-17161423
 ] 

Guozhang Wang commented on KAFKA-10179:
---

I agree with Almog's and Rohan's arguments here. What I'm thinking about is how 
we could define a principle for users to indicate that:

1) the bytes in the source topic are exactly the same as the bytes in the state 
store (i.e., the serdes are symmetric);
2) the serde incurs no side effects; only 1) and 2) together mean it is safe to 
skip the serde during restoration;
3) there is no corrupted or ill-formatted data from the source topics that 
should be skipped when loading into state stores. This is 
https://issues.apache.org/jira/browse/KAFKA-8037

All of the above concern restoration time, as compared with normal processing 
time.
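As a concrete illustration of point 1), a round-trip check in plain Java shows what "symmetric" means in practice. The serde interface and implementations here are toy stand-ins for illustration, not Kafka's actual Serde API:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// A serde is "symmetric" in the sense of point 1) if serializing a deserialized
// payload yields the original bytes, so restoration could copy raw bytes into
// the store without a deserialize/serialize round trip.
public class SerdeSymmetryCheck {

    interface ToySerde<T> {
        T deserialize(byte[] bytes);
        byte[] serialize(T value);
    }

    // Symmetric: a plain UTF-8 string serde round-trips byte-for-byte.
    static final ToySerde<String> STRING_SERDE = new ToySerde<>() {
        public String deserialize(byte[] b) { return new String(b, StandardCharsets.UTF_8); }
        public byte[] serialize(String s) { return s.getBytes(StandardCharsets.UTF_8); }
    };

    // Asymmetric: the deserializer trims whitespace, so bytes are not preserved.
    static final ToySerde<String> TRIMMING_SERDE = new ToySerde<>() {
        public String deserialize(byte[] b) { return new String(b, StandardCharsets.UTF_8).trim(); }
        public byte[] serialize(String s) { return s.getBytes(StandardCharsets.UTF_8); }
    };

    static <T> boolean roundTripsExactly(ToySerde<T> serde, byte[] payload) {
        return Arrays.equals(payload, serde.serialize(serde.deserialize(payload)));
    }

    public static void main(String[] args) {
        byte[] payload = "  hello  ".getBytes(StandardCharsets.UTF_8);
        System.out.println(roundTripsExactly(STRING_SERDE, payload));   // true
        System.out.println(roundTripsExactly(TRIMMING_SERDE, payload)); // false
    }
}
```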

> State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
> -
>
> Key: KAFKA-10179
> URL: https://issues.apache.org/jira/browse/KAFKA-10179
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.7.0
>
>
> {{MeteredKeyValueStore}} passes the name of the changelog topic of the state 
> store to the state store serdes. Currently, it always passes 
> {{<application ID>-<state store name>-changelog}} as the changelog topic name. 
> However, for optimized source tables the changelog topic is the source topic. 
> Most serdes do not use the topic name passed to them. However, if the serdes 
> actually use the topic name for (de)serialization, e.g., when Kafka Streams 
> is used with Confluent's Schema Registry, a 
> {{org.apache.kafka.common.errors.SerializationException}} is thrown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-26 Thread Almog Gavra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17146707#comment-17146707
 ] 

Almog Gavra commented on KAFKA-10179:
-

[~ableegoldman] I confirmed locally that nothing "breaks" if we use a 
deserializer that projects a subset of the fields in the record, as you 
suspected, but consider the following points:
 # Some of the most popular serdes are asymmetric (e.g. Avro builds the 
concept of reader/writer schemas into its APIs)
 # It may be impossible to determine, for a given serde, whether it is symmetric
 # State after recovery should be identical to before recovery for predictable 
operations (especially in cloud environments)
 # Some of the most popular serdes have side effects (e.g. Confluent schema 
registry serdes will create subjects on your behalf)

In practice, the first three points in conjunction with what [~mjsax] said (the 
source-topic-changelog optimization really only applies, if the data in the 
input topic is exactly the same as in the changelog topic and thus, we avoid 
creating the changelog topic), means that we can't safely turn on the 
source-topic-changelog optimization unless the user indicates either (a) they 
are using a symmetrical serde or (b) they are willing to waive 3 in order to 
speed up recovery ([~cadonna] if we consider 3 a matter of correctness, we 
can't sacrifice correctness for performance without the user's consent).

Even if the user indicates (a) or (b) above, I still don't think we can 
implement the fix described here, because of the fourth point. It may be 
possible that the user is using a symmetric serde but their schema is not 
identical to the one that wrote to the Kafka topic (ksql, for example, 
generates a new schema where all the fields are the same but the schema has a 
different name; I can also easily imagine a schema with _more_ fields that 
would write the same value as it read from an event with fewer fields).
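To make the asymmetry concrete, here is a hypothetical projecting deserializer over a comma-separated record (the format and field count are invented for illustration). The serde "works", yet restoring by copying raw source bytes yields different store contents than the normal deserialize-then-serialize write path:

```java
import java.util.Arrays;
import java.util.List;

// Toy projecting serde: the deserializer keeps only the first 3 of many fields.
public class ProjectingSerdeDemo {

    static List<String> deserialize(byte[] bytes) {
        String[] all = new String(bytes).split(",");
        return Arrays.asList(all).subList(0, Math.min(3, all.length));
    }

    static byte[] serialize(List<String> fields) {
        return String.join(",", fields).getBytes();
    }

    public static void main(String[] args) {
        byte[] source = "a,b,c,d,e,f".getBytes();

        // Normal processing path: deserialize, then serialize into the store.
        byte[] storedNormally = serialize(deserialize(source));

        // Source-topic restoration path: raw bytes copied into the store.
        byte[] storedOnRestore = source;

        System.out.println(new String(storedNormally));  // a,b,c
        System.out.println(new String(storedOnRestore)); // a,b,c,d,e,f
    }
}
```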

I'm not sure I understand this comment: "The data would be deserialized and 
re-serialized using the same Serde (this is inefficiency we pay, as we also 
need to send the de-serialized data downstream for further processing)." Why 
can't we just always pass-through the data into the state store if the 
optimization is enabled?

 



[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-23 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17143238#comment-17143238
 ] 

Sophie Blee-Goldman commented on KAFKA-10179:
-

[~desai.p.rohan] I'm not sure I understand why it's a problem for the 
deserializer to modify the value slightly, by dropping fields to take your 
example. We would end up restoring the full bytes into the store, sure, but the 
plain bytes are never actually used right? We would always go through the 
deserializer when reading the value from the store and using it in an 
operation. So the "extra" fields would still get dropped.

Maybe if your values are bloated with a lot of useful information that you 
didn't want to store, this could blow up the disk usage. But I think there's a 
difference between a simple operation on data to extract only the relevant bits 
– eg dropping a field you don't care about – and fundamentally transforming the 
data to get it into a different form. The former seems reasonable to do during 
a deserialization, but the latter should be its own operation in the topology.

Of course, this just applies to modifying the values. If your deserializer 
modifies the key in any way, this would be a problem since lookups by key would 
fail after a restoration copies over the plain bytes. But I would argue that 
it's illegal to modify the key during de/serialization at all, not because of 
the restoration issue but because it can cause incorrect partitioning.

Anyways, I'm probably overlooking something obvious, but I'm struggling to see 
exactly where and how this breaks. That said, I do agree we should clarify that 
`serialize(deserialize())` must be the identity for keys.



[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-19 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140328#comment-17140328
 ] 

Bruno Cadonna commented on KAFKA-10179:
---

[~desai.p.rohan] While I find the idea of optimizing the materialization in the 
deserializer intriguing, I think the performance penalty that we would pay by 
deserializing and serializing each record during restoration is not worthwhile. 
Additionally -- if the optimization is turned on -- we would need to read the 
original data from the source topic instead of the projected data from the 
changelog topic during each restoration, which would again hurt performance. Of 
course, we would need experiments to better understand the implications. 

An alternative idea would be to allow plugging in a byte-based transformation 
that does not need to deserialize and serialize each record. However, that 
would not solve the issue of having to read the unprojected data during each 
restoration.
 
If you are concerned about the amount of data to materialize, a solution could 
be to optimize at the topology level by introducing a {{map()}} that performs 
the projection, followed by a {{toTable()}} that materializes the data. The 
data read from the input topic would be the unprojected data, but the 
materialized data would be the projected data, and during restoration we would 
also read only the projected data. An additional advantage of this approach is 
that you can leave the source table optimization turned on, because it would 
not apply to this case.
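That topology-level workaround can be sketched with the Streams DSL. The topic name, serdes, and projection function below are hypothetical placeholders, so treat this as an illustrative sketch rather than a drop-in fix:

```java
import java.util.Arrays;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class ProjectedTableTopology {

    // Hypothetical projection: keep only the first three comma-separated fields.
    static String project(String value) {
        String[] fields = value.split(",", -1);
        return String.join(",",
            Arrays.copyOfRange(fields, 0, Math.min(3, fields.length)));
    }

    static KTable<String, String> build(StreamsBuilder builder) {
        // Read the input as a stream, project, then materialize as a table.
        // Because the table produced by toTable() is not a source table, the
        // source-topic-changelog optimization does not apply: the store gets
        // its own changelog, which holds (and restores) only the projected data.
        return builder
            .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .mapValues(ProjectedTableTopology::project)
            .toTable(Materialized.with(Serdes.String(), Serdes.String()));
    }
}
```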

In summary, the source table optimization was not introduced for the case you 
describe. IMO, it is not even an optimization in that case. 



[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140208#comment-17140208
 ] 

Matthias J. Sax commented on KAFKA-10179:
-

What you say is fair I guess. Given the current code, if you want to do any of 
those, you need to disable the optimization.

However, for the actual bug this ticket is about, the problem seems to be, that 
if the optimization is turned on, at some point in the code we pass the 
changelog topic name into the serde instead of the source topic name. And thus 
the schema cannot be found and the serde crashes. Thus, this ticket should 
focus on this bug.

Not sure if KAFKA-8037 covers all cases you describe. Maybe you want to follow 
up on that ticket (so we can extend its scope) or create a new ticket that 
describes the shortcomings of the current implementation.



[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-18 Thread Rohan Desai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140166#comment-17140166
 ] 

Rohan Desai commented on KAFKA-10179:
-

Also, it's not really clear from the documentation that 
`serialize(deserialize())` is assumed to be the identity function for 
`ktable(..)`.



[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-18 Thread Rohan Desai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140158#comment-17140158
 ] 

Rohan Desai commented on KAFKA-10179:
-

Deserialization may itself be a transformation. For example, suppose I have 
source data with 10 fields, but only care about 3 of them for my stream 
processing app. It seems that it would be reasonable to provide a deserializer 
that just extracts those 3 fields. I suppose you could express this as a 
projection after creating the table, but that does preclude optimizations that 
use selective deserialization. And it may be much more expensive to do the 
materialization (since you're potentially materializing lots of data 
unnecessarily). I think there should be some way to achieve each of the 
following: 

 
 * optimized and the data in the store is exactly the same as the topic data. 
In this case (what's implemented today) the data can be restored by writing the 
source records into the store.
 * optimized and the deserializer transforms the data somehow. In this case the 
data can be restored by deserializing/serializing each row from the source 
topic before writing it into the store. I don't think this is possible today.
 * not optimized (which would let you have a transforming deserializer and 
faster recovery, at the cost of extra data in Kafka). I don't think this is 
possible today without turning all optimizations off.

 

> This is a known issue and tracked via: 
>https://issues.apache.org/jira/browse/KAFKA-8037
 ack - thanks!



[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140075#comment-17140075
 ] 

Matthias J. Sax commented on KAFKA-10179:
-

{quote}I'm not sure it's correct to use the same "topic" name for materializing 
optimized source tables, as it's logically different data. In the normal flow 
(not recovery), we're taking the topic data, validating/transforming it by 
deserializing it (which might apply some transforms like projecting just fields 
of interest), and then serializing it, and then writing it into the store. So 
the "topic" we pass to the serializer should be different since it represents 
different data from the source topic.

For this case, the source-topic-changelog optimization does not apply, and the 
store would always have its own changelog topic. And thus, the input-topic 
schema registered in the SR should not be "touched", and the write to the 
changelog topic should register a new schema using the changelog topic name. 
Thus, no naming issue in SR should happen.
{quote}
The source-topic-changelog optimization really only applies, if the data in the 
input topic is exactly the same as in the changelog topic and thus, we avoid 
creating the changelog topic. To ensure this, we don't allow any processing to 
happen in between. The data would be deserialized and re-serialized using the 
same Serde (this is inefficiency we pay, as we also need to send the 
de-serialized data downstream for further processing).
{quote}Another issue that I think exists (need to try to reproduce) that 
deserializing/serializing would solve is skipped validation. The source topic 
deserializer functions as a sort of validator for records from the source 
topic. When the streams app is configured to skip on deserialization errors, 
bad source records are just skipped. However if we restore by just writing 
those records to the state store, we now hit the deserialization error when 
reading the state store, which is a query-killing error.
{quote}
This is a known issue and tracked via: 
https://issues.apache.org/jira/browse/KAFKA-8037



[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-18 Thread Rohan Desai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139926#comment-17139926
 ] 

Rohan Desai commented on KAFKA-10179:
-

I'm not sure it's correct to use the same "topic" name for materializing 
optimized source tables, as it's logically different data. In the normal flow 
(not recovery), we're taking the topic data, validating/transforming it by 
deserializing it (which might apply some transforms like projecting just fields 
of interest), and then serializing it, and then writing it into the store. So 
the "topic" we pass to the serializer should be different since it represents 
different data from the source topic.

This has consequences in practice when used with a schema registry using the 
confluent serializers. If we use the same topic, `serialize` might register a 
different schema with the source subject, which we probably don't want.

I think the technically correct thing to do (though this is of course more 
expensive) would be (when the source table is optimized) to deserialize and 
serialize each record when restoring.
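A minimal sketch of that restore path, using a toy serde and an in-memory map rather than Kafka's actual restoration machinery: each record is round-tripped through the serde, so any transformation the deserializer applies is reflected in the restored state, and records that fail deserialization can be skipped.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RoundTripRestore {

    // Toy validator/transformer: records must contain ':' and are lowercased.
    static String deserialize(byte[] b) {
        String s = new String(b);
        if (!s.contains(":")) throw new IllegalArgumentException("malformed: " + s);
        return s.toLowerCase();
    }

    static byte[] serialize(String s) { return s.getBytes(); }

    // Restore by round-tripping each source record through the serde instead of
    // copying raw bytes; malformed records are skipped, mirroring the
    // skip-on-deserialization-error behavior of normal processing.
    static Map<String, byte[]> restore(List<Map.Entry<String, byte[]>> sourceRecords) {
        Map<String, byte[]> store = new HashMap<>();
        for (Map.Entry<String, byte[]> rec : sourceRecords) {
            try {
                store.put(rec.getKey(), serialize(deserialize(rec.getValue())));
            } catch (IllegalArgumentException e) {
                // skip corrupted record, as normal processing would
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, byte[]>> src = List.of(
            Map.entry("k1", "A:1".getBytes()),
            Map.entry("k2", "garbage".getBytes()));
        Map<String, byte[]> store = restore(src);
        System.out.println(store.size());                 // 1
        System.out.println(new String(store.get("k1")));  // a:1
    }
}
```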

Another issue that I think exists (need to try to reproduce) that 
deserializing/serializing would solve is skipped validation. The source topic 
deserializer functions as a sort of validator for records from the source 
topic. When the streams app is configured to skip on deserialization errors, 
bad source records are just skipped. However if we restore by just writing 
those records to the state store, we now hit the deserialization error when 
reading the state store, which is a query-killing error.

 
