Guozhang Wang created KAFKA-5581:
------------------------------------
Summary: Streams can be smarter in deciding when to create
changelog topics for state stores
Key: KAFKA-5581
URL: https://issues.apache.org/jira/browse/KAFKA-5581
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: Guozhang Wang
Today Streams backs every state store with a changelog topic by default, unless
the user overrides this with {{disableLogging}} when creating the state store or
materializing the KTable. However, there are several cases where a materialized
store does not need a separate changelog topic, because an existing topic can be
reused instead. For example:
1) If a KTable is read directly from a source topic and materialized, e.g.
{code}
table1 = builder.table("topic1", "store1");
{code}
In this case {{table1}}'s changelog topic can just be {{topic1}}, and we do not
need to create a separate {{table1-changelog}} topic.
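To illustrate case 1: every update to {{store1}} comes from exactly one record of {{topic1}}, so replaying {{topic1}} from the beginning rebuilds the same store that replaying a dedicated changelog would. The sketch below models this with plain Java collections rather than the Kafka Streams API. {{Rec}}, {{SourceTopicRestore}} and their methods are hypothetical names, and the sketch assumes {{topic1}} still holds the latest record for every key (e.g. it is compacted rather than truncated by time-based retention).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one record in a topic partition. A null value is a
// tombstone (delete), as in a compacted topic backing a KTable.
final class Rec {
    final String key;
    final String value;

    Rec(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

final class SourceTopicRestore {
    // Applies one record to a key-value store with KTable upsert semantics.
    static void apply(Map<String, String> store, Rec r) {
        if (r.value == null) {
            store.remove(r.key);
        } else {
            store.put(r.key, r.value);
        }
    }

    // Rebuilds "store1" by replaying "topic1" from the beginning. Because every
    // store update came from one record of topic1, this yields the same state
    // that replaying a separate changelog would.
    static Map<String, String> restoreFromSource(List<Rec> topic1) {
        Map<String, String> store = new HashMap<>();
        for (Rec r : topic1) {
            apply(store, r);
        }
        return store;
    }
}
```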
2) If a KTable is materialized and then written directly to a sink topic with
the same key, e.g.
{code}
table1 = stream.groupBy(...).aggregate(..., "state1");
table1.to("topic2");
{code}
In this case {{state1}}'s changelog topic can just be {{topic2}}, and we no
longer need to create a separate {{state1-changelog}} topic.
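Case 2 works because each update the aggregation applies to {{state1}} is forwarded to {{topic2}} with the same key and value it would have in {{state1-changelog}}. A minimal sketch under that assumption, ignoring record caching for simplicity and using a count as a stand-in aggregate; {{SinkAsChangelog}} and its methods are hypothetical names, not Kafka Streams API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SinkAsChangelog {
    // Hypothetical (key, value) record written to the sink topic.
    static final class Update {
        final String key;
        final long value;

        Update(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Counts occurrences of each input key (the stand-in aggregate for "state1").
    // Every store update is also appended to topic2 with the same key, so the
    // sink receives exactly what a state1-changelog would.
    static Map<String, Long> aggregate(List<String> inputKeys, List<Update> topic2) {
        Map<String, Long> state1 = new HashMap<>();
        for (String key : inputKeys) {
            long updated = state1.getOrDefault(key, 0L) + 1;
            state1.put(key, updated);
            topic2.add(new Update(key, updated));
        }
        return state1;
    }

    // Restores "state1" by replaying "topic2": the last value per key wins.
    static Map<String, Long> restoreFromSink(List<Update> topic2) {
        Map<String, Long> state1 = new HashMap<>();
        for (Update u : topic2) {
            state1.put(u.key, u.value);
        }
        return state1;
    }
}
```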
3) If KStreams read directly from source topics are materialized for a join,
e.g.:
{code}
stream1 = builder.stream("topic1");
stream2 = builder.stream("topic2");
// stream1 and stream2 are materialized in window stores backed by changelog topics;
// joiner is the ValueJoiner that combines the two values
stream3 = stream1.join(stream2, joiner, windows);
{code}
Since stream materialization is append-only, these state stores do not need
changelog topics either; they can be restored directly from the source topics
{{topic1}} and {{topic2}}.
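For case 3, the window store buffering each join input only appends records and later expires them; it never updates them in place. A changelog for it would therefore contain the same records as the source topic. The sketch below is a hypothetical model ({{StreamRecord}} and {{JoinWindowBuffer}} are illustrative names, not Kafka Streams classes) and assumes the source topic's retention is at least as long as the window store's retention, so that replay can still find every unexpired record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical input record of a KStream: key, value and event timestamp.
final class StreamRecord {
    final String key;
    final String value;
    final long timestamp;

    StreamRecord(String key, String value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical model of the window store that buffers one side of a windowed join.
final class JoinWindowBuffer {
    private final long retentionMs;
    private final List<StreamRecord> changelog; // null when logging is disabled
    private final List<StreamRecord> records = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE;

    JoinWindowBuffer(long retentionMs, List<StreamRecord> changelog) {
        this.retentionMs = retentionMs;
        this.changelog = changelog;
    }

    // Append-only: records are added and later expired, never updated in place.
    void append(StreamRecord r) {
        records.add(r);
        if (changelog != null) {
            changelog.add(r); // a separate changelog would just copy the input
        }
        streamTime = Math.max(streamTime, r.timestamp);
        records.removeIf(x -> x.timestamp < streamTime - retentionMs);
    }

    List<StreamRecord> contents() {
        return new ArrayList<>(records);
    }

    // Restores the buffer by replaying the source topic, with no changelog at all.
    static JoinWindowBuffer restoreFromSource(List<StreamRecord> sourceTopic, long retentionMs) {
        JoinWindowBuffer buffer = new JoinWindowBuffer(retentionMs, null);
        for (StreamRecord r : sourceTopic) {
            buffer.append(r);
        }
        return buffer;
    }
}
```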
4) When simple transformations, or even joins, generate new KTables that need
to be materialized in a state store, we can reuse the changelog topic of the
upstream KTable and re-apply the transformation logic upon restoration, instead
of creating a new changelog topic. For example:
{code}
table1 = builder.table("topic1");
// table2 needs to be materialized for joining
table2 = table1.filter(...).join(table3, ...);
{code}
We can set the {{getter}} function of table2's materialized store, say
{{state2}}, to read from {{topic1}} and then apply the filter operator, instead
of creating a new {{state2-changelog}} topic.
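The filter part of case 4 can be sketched as follows: restoring {{state2}} by replaying {{topic1}} through the same predicate yields the same store as replaying a dedicated {{state2-changelog}}. The sketch assumes KTable filter semantics, where a record that fails the predicate is forwarded as a tombstone for its key; the join with {{table3}} is left out, and {{KV}} and {{FilteredStoreRestore}} are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical table update: a null value is a tombstone (delete).
final class KV {
    final String key;
    final Integer value;

    KV(String key, Integer value) {
        this.key = key;
        this.value = value;
    }
}

final class FilteredStoreRestore {
    // KTable-style filter: a record failing the predicate is forwarded as a
    // tombstone so that any previous value for the key is removed downstream.
    static KV filter(KV in, Predicate<Integer> predicate) {
        if (in.value == null || !predicate.test(in.value)) {
            return new KV(in.key, null);
        }
        return in;
    }

    static void apply(Map<String, Integer> store, KV r) {
        if (r.value == null) {
            store.remove(r.key);
        } else {
            store.put(r.key, r.value);
        }
    }

    // Live processing: updates state2 and, with logging on, writes state2-changelog.
    static Map<String, Integer> process(List<KV> topic1, Predicate<Integer> predicate, List<KV> state2Changelog) {
        Map<String, Integer> store = new HashMap<>();
        for (KV r : topic1) {
            KV out = filter(r, predicate);
            apply(store, out);
            state2Changelog.add(out);
        }
        return store;
    }

    // Today: state2 is restored from its own changelog.
    static Map<String, Integer> restoreFromChangelog(List<KV> state2Changelog) {
        Map<String, Integer> store = new HashMap<>();
        for (KV r : state2Changelog) {
            apply(store, r);
        }
        return store;
    }

    // Proposed: replay topic1 and re-apply the filter, so no state2-changelog is needed.
    static Map<String, Integer> restoreFromUpstream(List<KV> topic1, Predicate<Integer> predicate) {
        Map<String, Integer> store = new HashMap<>();
        for (KV r : topic1) {
            apply(store, filter(r, predicate));
        }
        return store;
    }
}
```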
5) More use cases ...
We can implement a general internal optimization that determines when and how
to set the changelog topic for these materialized stores when the topology is
generated at startup.
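One possible shape for such an optimization is a per-store decision made while building the topology: if the store's origin matches one of the cases above and an existing topic can be replayed, restore from that topic; otherwise fall back to a dedicated changelog, which Kafka Streams names {{<application.id>-<store name>-changelog}}. {{StoreOrigin}} and {{ChangelogPlanner}} are hypothetical names, and the sketch only chooses the topic; for case 4 the restore path would still need to re-apply the upstream transformation.

```java
// Hypothetical summary of how a store's contents are produced, as a
// topology optimizer might derive it when the topology is built.
final class StoreOrigin {
    enum Kind {
        SOURCE_TABLE,            // case 1: builder.table("topic1", "store1")
        SINK_WITH_SAME_KEY,      // case 2: aggregate result written to "topic2"
        APPEND_ONLY_JOIN_BUFFER, // case 3: windowed stream-stream join
        DERIVED_FROM_UPSTREAM,   // case 4: stateless ops over an upstream table
        OTHER                    // anything else: keep a dedicated changelog
    }

    final Kind kind;
    final String reusableTopic; // topic whose replay rebuilds the store, or null

    StoreOrigin(Kind kind, String reusableTopic) {
        this.kind = kind;
        this.reusableTopic = reusableTopic;
    }
}

final class ChangelogPlanner {
    // Returns the topic the store should be restored from.
    static String restoreTopic(String applicationId, String storeName, StoreOrigin origin) {
        if (origin.kind != StoreOrigin.Kind.OTHER && origin.reusableTopic != null) {
            return origin.reusableTopic;
        }
        // Fall back to the default naming of Kafka Streams internal changelog topics.
        return applicationId + "-" + storeName + "-changelog";
    }
}
```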