Hi Mikael,

1) The JavaDoc does look incorrect, thanks for reporting it. Matthias is looking 
into fixing it. I agree that it can be confusing to have topic names that are 
not what one would expect.

2) If your goal is to query/read from the state stores, you can use Interactive 
Queries to do that (you don't need to worry about the changelog topic name and 
such). Interactive Queries is a new feature in 0.10.1 (blog here: 
https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/).

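Roughly, reading a store via Interactive Queries looks something like this 
(the store name, key/value types and config below are placeholders for 
whatever you registered in your topology, and imports are omitted):

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

// Once the store is ready (before that an InvalidStateStoreException is
// thrown), you can query the local state directly:
ReadOnlyKeyValueStore<String, Long> store =
    streams.store("store-1", QueryableStoreTypes.keyValueStore());
Long current = store.get("some-key");
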
Thanks
Eno


> On 22 Nov 2016, at 19:27, Mikael Högqvist <hoegqv...@gmail.com> wrote:
> 
> Sorry for being unclear, I'll try again :)
> 
> 1) The JavaDoc for through() is not correct: it states that a changelog topic
> will be created for the state store. That is, if I call it with
> through("topic", "a-store"), I would expect a Kafka topic
> "my-app-id-a-store-changelog" to be created.
> 
> 2) Regarding the use case, the topology looks like this:
> 
> .stream(...)
> .aggregate(..., "store-1")
> .mapValues(...)
> .through(..., "store-2")
> 
> Basically, I want to materialize both the result of the aggregate method
> and the result of mapValues, which is materialized using .through().
> Later, I will access both tables (store-1 and store-2) to a) get the
> current state of the aggregate, and b) subscribe to future updates. This
> works just fine. The only issue is that I assumed a changelog topic for
> store-2 would be created automatically, which didn't happen.
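> 
> To make it concrete, a rough sketch of the same topology (the topic names,
> serdes and aggregation logic below are just placeholders, imports and config
> omitted):
> 
> KStreamBuilder builder = new KStreamBuilder();
> 
> KTable<String, Long> aggregated = builder
>     .stream(Serdes.String(), Serdes.String(), "input-topic")
>     .groupByKey(Serdes.String(), Serdes.String())
>     .aggregate(() -> 0L, (key, value, agg) -> agg + 1L,
>                Serdes.Long(), "store-1");
> // -> "<application.id>-store-1-changelog" is created automatically
> 
> KTable<String, Long> mapped = aggregated
>     .mapValues(v -> v * 2)
>     .through(Serdes.String(), Serdes.Long(), "store-2-topic", "store-2");
> // -> no "<application.id>-store-2-changelog" is created here, which is
> //    what I did not expect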
> 
> Since I want to access the changelog topic, it helps if the naming is
> consistent. So either we enforce the same naming pattern as Kafka Streams
> when calling .through(), or alternatively the Kafka Streams API could provide
> a method to materialize tables that creates the topic name according to the
> changelog naming pattern, e.g. .through() without the topic parameter.
> 
> What do you think?
> 
> Best,
> Mikael
> 
> On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> I cannot completely follow what you want to achieve.
>> 
>> However, the JavaDoc for through() does not seem correct to me. Using
>> through() will not create an extra internal changelog topic with the
>> described naming schema, because the topic specified in through() can be
>> used for this (there is no point in duplicating the data).
>> 
>> If you have a KTable and apply mapValues(), this will not write data
>> to any topic. The derived KTable is kept in memory because you can easily
>> recreate it from its base KTable.
>> 
>> What is the missing part you want to get?
>> 
>> Btw: the internally created changelog topics are only used for recovery
>> in case of failure. Streams does not consume from those topics during
>> "normal operation".
>> 
>> 
>> -Matthias
>> 
>> 
>> 
>> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
>>> Hi,
>>> 
>>> In the documentation for KTable#through, it is stated that a new changelog
>>> topic will be created for the table. It also states that calling through is
>>> equivalent to calling #to followed by KStreamBuilder#table.
>>> 
>>> http://kafka.apache.org/0101/javadoc/org/apache/kafka/streams/kstream/KTable.html#through(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String,%20java.lang.String)
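>>> 
>>> If I read that correctly, the following two should be equivalent (the
>>> serdes and names here are just examples, with table being some
>>> KTable<String, Long> and builder the KStreamBuilder):
>>> 
>>> KTable<String, Long> t1 = table.through(Serdes.String(), Serdes.Long(), "topic", "a-store");
>>> 
>>> // should behave the same as:
>>> table.to(Serdes.String(), Serdes.Long(), "topic");
>>> KTable<String, Long> t2 = builder.table(Serdes.String(), Serdes.Long(), "topic", "a-store");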
>>> 
>>> In the docs for KStreamBuilder#table it is stated that no new changelog
>>> topic will be created since the underlying topic acts as the changelog.
>>> I've verified that this is the case.
>>> 
>>> Is there another API method to materialize the results of a KTable
>>> including a changelog, i.e. such that Kafka Streams creates the topic and
>>> uses the naming schema for changelog topics? The use case I have in mind is
>>> an aggregate followed by mapValues.
>>> 
>>> Best,
>>> Mikael
>>> 
>> 
>> 
