[ 
https://issues.apache.org/jira/browse/KAFKA-6035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16336798#comment-16336798
 ] 

Richard Yu edited comment on KAFKA-6035 at 1/24/18 2:50 AM:
------------------------------------------------------------

[~guozhang] I am currently unsure which class represents the changelog, 
because the Javadoc in {{KGroupedStream.java}} says:

 
{code:java}
/**
 * Aggregate the values of records in this stream by the grouped key.
 * Records with {@code null} key or value are ignored.
 * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
 * allows the result to have a different type than the input values.
 * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
 * that can be queried using the provided {@code queryableStoreName}.
 * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
 * ...
 */
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
{code}
In the last line of the excerpt above, KTable appears to be described as a 
changelog stream. Could you please help clarify this for me?

Thanks.

 


> Avoid creating changelog topics for state stores that are directly piped to a 
> sink topic
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6035
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6035
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Richard Yu
>            Priority: Major
>
> Today, Streams backs every state store with a changelog topic by 
> default unless users override this with {{disableLogging}} when creating the 
> state store / materializing the KTable. However, there are a few cases where a 
> separate changelog topic is not required because an existing topic can be 
> reused for that purpose. This ticket summarizes one specific case that can be optimized:
> Consider the case when a KTable is materialized and then sent directly into a 
> sink topic with the same key, e.g.
> {code}
> table1 = stream.groupBy(...).aggregate("state1").to("topic2");
> {code}
> Then we do not need to create a {{state1-changelog}} but can just use 
> {{topic2}} as its changelog.
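
The reasoning in the description can be illustrated with a small sketch in plain Java. It is a toy model under stated assumptions, not Kafka Streams code: the class {{SinkTopicAsChangelog}}, the {{Update}} type, and the {{restore}} helper are hypothetical names, and the "topic" is just an in-memory list. It shows that when every update to the store is also written to the sink topic under the same key, replaying the sink topic rebuilds the same store contents.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: no Kafka classes are used, and all names here are hypothetical.
public class SinkTopicAsChangelog {

    /** One key/value update, standing in for a record written to a topic. */
    static final class Update {
        final String key;
        final Long value; // null is treated as a delete marker in this sketch

        Update(String key, Long value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Rebuilds a store by replaying the updates of a topic in order. */
    static Map<String, Long> restore(List<Update> topic) {
        Map<String, Long> store = new LinkedHashMap<>();
        for (Update u : topic) {
            if (u.value == null) {
                store.remove(u.key);       // delete marker removes the key
            } else {
                store.put(u.key, u.value); // later updates overwrite earlier ones
            }
        }
        return store;
    }

    public static void main(String[] args) {
        Map<String, Long> state1 = new HashMap<>(); // the materialized store
        List<Update> topic2 = new ArrayList<>();    // the sink topic

        // A counting aggregation: each store update is also sent to the sink topic.
        for (String key : List.of("a", "b", "a")) {
            long count = state1.merge(key, 1L, Long::sum);
            topic2.add(new Update(key, count));
        }

        // Replaying the sink topic yields the same contents as the store.
        System.out.println(restore(topic2).equals(state1));
    }
}
```

In this model a separate list holding the same updates would be redundant, which is the redundancy the ticket proposes to avoid for {{state1-changelog}}.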



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
