[ https://issues.apache.org/jira/browse/KAFKA-6035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16336798#comment-16336798 ]
Richard Yu edited comment on KAFKA-6035 at 1/24/18 2:50 AM:
------------------------------------------------------------

[~guozhang] I am currently unsure which class represents the changelog, because the Javadoc in {{KGroupedStream.java}} says:
{code:java}
/**
 * Aggregate the values of records in this stream by the grouped key.
 * Records with {@code null} key or value are ignored.
 * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
 * allows the result to have a different type than the input values.
 * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
 * that can be queried using the provided {@code queryableStoreName}.
 * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
 * ...
 */
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
{code}
In the last line of the Javadoc excerpt above, the KTable appears to be referred to as a changelog stream. Could you please help me clarify this? Thanks.

> Avoid creating changelog topics for state stores that are directly piped to a sink topic
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6035
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6035
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Richard Yu
>            Priority: Major
>
> Today Streams backs every state store with a changelog topic by default, unless users override this
> with {{disableLogging}} when creating the state store or materializing the KTable. However, there are
> a few cases where a separate changelog topic is not required, because an existing topic can be reused
> instead. This ticket summarizes one specific case that can be optimized:
> Consider the case where a KTable is materialized and then sent directly into a sink topic with the
> same key, e.g.
> {code}
> table1 = stream.groupBy(...).aggregate("state1").to("topic2");
> {code}
> Then we do not need to create a {{state1-changelog}} topic, but can simply use {{topic2}} as its changelog.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
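To make the stream/table relationship discussed in this thread concrete, here is a toy, stdlib-only Java model. It is not Kafka Streams code: the class `ChangelogSketch`, the methods `aggregate`, `restore` and `rekey`, the per-key count used as the aggregator, and the representation of a topic as an ordered list of key/value records and of a state store as a map are all hypothetical assumptions made for illustration. The aggregation step writes each new value into the store and also emits it downstream; that sequence of per-key updates is the sense in which the Javadoc calls the result a KTable changelog stream. Replaying either the store's changelog or a same-key sink topic rebuilds the same store, which is the redundancy the ticket proposes to remove.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model, NOT the Kafka Streams API (all names are hypothetical):
// a "topic" is a list of key/value records, a "state store" is a map of key -> latest aggregate.
final class ChangelogSketch {

    // Counts records per key (a hypothetical aggregator standing in for the
    // Initializer/Aggregator pair), writes each new count into the store, and
    // returns every store update in order: the table's changelog stream.
    static List<Map.Entry<String, Long>> aggregate(List<String> inputKeys, Map<String, Long> store) {
        List<Map.Entry<String, Long>> changelogStream = new ArrayList<>();
        for (String key : inputKeys) {
            long updated = store.getOrDefault(key, 0L) + 1;
            store.put(key, updated);
            changelogStream.add(Map.entry(key, updated));
        }
        return changelogStream;
    }

    // Rebuilds a store by replaying a topic from the beginning; the last
    // record seen for each key wins.
    static Map<String, Long> restore(List<Map.Entry<String, Long>> topic) {
        Map<String, Long> store = new LinkedHashMap<>();
        for (Map.Entry<String, Long> record : topic) {
            store.put(record.getKey(), record.getValue());
        }
        return store;
    }

    // Hypothetical sink that changes the key (upper-cases it), so its records
    // no longer line up with the store's keys.
    static List<Map.Entry<String, Long>> rekey(List<Map.Entry<String, Long>> topic) {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (Map.Entry<String, Long> record : topic) {
            out.add(Map.entry(record.getKey().toUpperCase(), record.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Long> state1 = new LinkedHashMap<>();
        List<Map.Entry<String, Long>> updates = aggregate(List.of("a", "b", "a", "c", "a"), state1);

        // Default behaviour in this model: the same updates are written both to
        // "state1-changelog" and to the sink "topic2".
        List<Map.Entry<String, Long>> state1Changelog = new ArrayList<>(updates);
        List<Map.Entry<String, Long>> topic2 = new ArrayList<>(updates);

        System.out.println(restore(state1Changelog).equals(state1)); // true
        System.out.println(restore(topic2).equals(state1));          // true: topic2 could act as the changelog
        System.out.println(restore(rekey(topic2)).equals(state1));   // false: a re-keyed sink could not
    }
}
```

In this sketch the re-keyed sink fails to reproduce the store, which mirrors why the ticket limits the optimization to a KTable sent into a sink topic with the same key.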