Re: KTable#through from window store to key value store

2016-12-16 Thread Mikael Högqvist
Hi Eno,

in this example it doesn't make much sense, but I could add a .mapValues
that transforms the values and then use .through to materialize a table with
the updated values.

It is also possible to access the new table as a KeyValueStore, but that is
much less convenient, and I also expected that a window store would still be
a window store after a transformation/through.
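For reference, a minimal sketch of the two access paths (assuming the 0.10.1
interactive-queries API; the store names "tableOne"/"tableTwo" and their types
come from the example below, everything else is illustrative):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class StoreAccessSketch {

  // tableOne is a window store: fetch() returns the counts of one key,
  // window by window, for the given time range.
  static void queryWindowStore(KafkaStreams streams, String word, long from, long to) {
    ReadOnlyWindowStore<String, Long> store =
        streams.store("tableOne", QueryableStoreTypes.<String, Long>windowStore());
    try (WindowStoreIterator<Long> it = store.fetch(word, from, to)) {
      while (it.hasNext()) {
        KeyValue<Long, Long> entry = it.next();  // key = window start timestamp
        System.out.println(word + " @ " + entry.key + " -> " + entry.value);
      }
    }
  }

  // tableTwo, materialized via through(), is exposed as a key-value store
  // keyed by Windowed<String>: there is no fetch(key, from, to), so we
  // have to scan and filter ourselves.
  static void scanKeyValueStore(KafkaStreams streams, String word) {
    ReadOnlyKeyValueStore<Windowed<String>, Long> store =
        streams.store("tableTwo", QueryableStoreTypes.<Windowed<String>, Long>keyValueStore());
    try (KeyValueIterator<Windowed<String>, Long> it = store.all()) {
      while (it.hasNext()) {
        KeyValue<Windowed<String>, Long> entry = it.next();
        if (entry.key.key().equals(word)) {
          System.out.println(entry.key + " -> " + entry.value);
        }
      }
    }
  }
}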

Thanks,
Mikael

On Fri, Dec 16, 2016 at 6:20 PM Eno Thereska  wrote:

> Hi Mikael,
>
> Currently that is not possible. Could you elaborate on why you'd need that,
> since you can query from tableOne?
>
> Thanks
> Eno
> > On 16 Dec 2016, at 10:45, Mikael Högqvist  wrote:
> >
> > Hi,
> >
> > I have a small example topology that counts words per 5-minute window (Scala):
> >
> >words
> >  .map { (key, word) =>
> >new KeyValue(word, Long.box(1L))
> >  }
> >  .groupByKey(Serdes.String, Serdes.Long)
> >  .count(TimeWindows.of(5 * 60 * 1000L), tableOne)
> >  .through(new WindowedSerde, Serdes.Long, s"$appId-count-topic",
> > tableTwo)
> >
> > The first table, tableOne, is a WindowStore and can be accessed using
> fetch
> > on the key and time range. After using .through to forward the data to
> > another topic and table, tableTwo becomes a KeyValueStore. Is it possible
> > to keep tableTwo as a WindowStore also?
> >
> > Best,
> > Mikael
>
>


KTable#through from window store to key value store

2016-12-16 Thread Mikael Högqvist
Hi,

I have a small example topology that counts words per 5-minute window (Scala):

words
  .map { (key, word) =>
new KeyValue(word, Long.box(1L))
  }
  .groupByKey(Serdes.String, Serdes.Long)
  .count(TimeWindows.of(5 * 60 * 1000L), tableOne)
  .through(new WindowedSerde, Serdes.Long, s"$appId-count-topic",
tableTwo)

The first table, tableOne, is a WindowStore and can be accessed using fetch
on the key and time range. After using .through to forward the data to
another topic and table, tableTwo becomes a KeyValueStore. Is it possible
to keep tableTwo as a WindowStore also?

Best,
Mikael


Re: Tumbling windows with long retention

2016-12-14 Thread Mikael Högqvist
Hi Matthias,

kind of :)

I'm interested in the retention mechanisms. My use case is to keep old
windows around for a long time (up to a year or longer) and access them via
interactive queries. As I understand from the documentation, the retention
mechanism is used to keep changelogs from "growing out of bounds". This is
a bit unclear to me: what are the storage costs of using a window store?
For example, if data is received at a rate of 1 message per second and
messages are aggregated to a single key using a tumbling window of 1 hour,
would the size of the compacted changelog (and window store) be 24 records
after 24 hours?

Are there other potential tradeoffs when using the window store with a long
retention? E.g., looking at the RocksDB implementation, there is something
called a segment which seems to correspond to a single RocksDB instance.
Does that have an effect on querying?
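For context, this is roughly how I would configure the retention (a sketch
against the 0.10.1 DSL; the input stream and the "hourly-counts" store name
are illustrative):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// messages: KStream<String, Long>, roughly one record per second per key (illustrative)
KTable<Windowed<String>, Long> hourly = messages
    .groupByKey(Serdes.String(), Serdes.Long())
    .count(TimeWindows.of(60 * 60 * 1000L)                   // 1-hour tumbling windows
                      .until(365L * 24 * 60 * 60 * 1000L),   // keep windows for ~1 year
           "hourly-counts");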

Best,
Mikael


On Wed, Dec 14, 2016 at 6:44 PM Matthias J. Sax 
wrote:

I am not sure if I can follow.

However, in Kafka Streams using window aggregation, the windowed KTable
uses a key-value store internally -- it's only called a windowed store
because it encodes the key for the store as a pair of
<record key, window> and also applies a couple of other mechanisms with
regard to retention time to delete old windows.
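A small sketch of what that composite key looks like when you iterate over
such a store (method and variable names are illustrative):

import org.apache.kafka.streams.kstream.Windowed;

// each entry of a windowed KTable is keyed by the record key plus its window
static void printEntry(Windowed<String> windowedKey, Long count) {
  String key = windowedKey.key();                   // original record key
  long windowStart = windowedKey.window().start();  // window boundaries in ms
  long windowEnd = windowedKey.window().end();
  System.out.println(key + " [" + windowStart + "," + windowEnd + ") -> " + count);
}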

Does this answer your question?


-Matthias

On 12/14/16 6:46 AM, Mikael Högqvist wrote:
> Hi,
>
> I'm wondering about the tradeoffs when implementing a tumbling window with
> a long retention, e.g. 1 year. Is it better to use a normal key-value store
> and aggregate per time bucket using a group-by instead of a window store?
>
> Best,
> Mikael
>


Tumbling windows with long retention

2016-12-14 Thread Mikael Högqvist
Hi,

I'm wondering about the tradeoffs when implementing a tumbling window with
a long retention, e.g. 1 year. Is it better to use a normal key-value store
and aggregate per time bucket using a group-by instead of a window store?
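To make the alternative concrete, a rough sketch of the key-value variant
(the bucket is embedded in the key; names are illustrative, and wall-clock
time is used here only for brevity):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

final long bucketMs = 60 * 60 * 1000L;  // 1-hour tumbling buckets

// words: KStream<String, String> of (key, word) pairs
KTable<String, Long> counts = words
    .map((key, word) -> new KeyValue<String, Long>(
        word + "@" + (System.currentTimeMillis() / bucketMs) * bucketMs,  // bucket start in the key
        1L))
    .groupByKey(Serdes.String(), Serdes.Long())
    .count("counts-per-bucket");  // plain key-value store, no window retention or segments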

Best,
Mikael


Re: KafkaStreams KTable#through not creating changelog topic

2016-11-25 Thread Mikael Högqvist
Thanks, based on this we will re-evaluate the use of internal topics. The
main motivation for using the internal changelog topics was to avoid
duplication of data and have an easy way to access the update stream of any
state store.

Best,
Mikael

On Fri, Nov 25, 2016 at 9:52 AM Michael Noll  wrote:

> Mikael,
>
> > Sure, I guess the topic is auto-created the first time I start the
> > topology and the second time it's there already. It would be possible for
> > us to create the topics up front, or even use an admin call from inside the code.
>
> Yes, that (i.e. you are running with auto-topic creation enabled) was what
> I implicitly understood.  As covered in [1], we strongly recommend manually
> pre-creating/managing user topics, though.  User topics include the
> source topics that you are reading from (cf. `stream()`, `table()`) but
> also include the topics you use in `through()` and `to()`.
>
>
> > Today there is .through(topic, store) and
> > .to(topic), maybe it would be possible to have something like
> > .materialize(store) which takes care of topic creation? Would adding
> > something like this require a KIP?
>
> There is already work being done on the Admin API (KIP-4), and part of this
> functionality was released in the latest Kafka versions.  You can use this
> to programmatically create topics, for example.  Note though that the work
> on KIP-4 is not fully completed yet.
>
> -Michael
>
>
>
>
> [1]
>
> http://docs.confluent.io/current/streams/developer-guide.html#managing-topics-of-a-kafka-streams-application
>
>
>
> On Thu, Nov 24, 2016 at 3:55 PM, Mikael Högqvist 
> wrote:
>
> > Sure, I guess the topic is auto-created the first time I start the
> > topology and the second time it's there already. It would be possible for
> > us to create the topics up front, or even use an admin call from inside the code.
> >
> > That said, as a user, I think it would be great to have a function in the
> > Kafka Streams DSL that would allow me to materialize a KTable without
> > pre-creating the topic. Today there is .through(topic, store) and
> > .to(topic), maybe it would be possible to have something like
> > .materialize(store) which takes care of topic creation? Would adding
> > something like this require a KIP?
> >
> > Best,
> > Mikael
> >
> > On Thu, Nov 24, 2016 at 1:44 PM Damian Guy  wrote:
> >
> > Mikael,
> >
> > When you use `through(..)` topics are not created by KafkaStreams. You
> need
> > to create them yourself before you run the application.
> >
> > Thanks,
> > Damian
> >
> > On Thu, 24 Nov 2016 at 11:27 Mikael Högqvist 
> wrote:
> >
> > > Yes, the naming is not an issue.
> > >
> > > I've tested this with the topology described earlier. Every time I
> start
> > > the topology with a call to .through() that references a topic that
> does
> > > not exist, I get an exception from the UncaughtExceptionHandler:
> > >
> > > Uncaught exception org.apache.kafka.streams.errors.StreamsException:
> > Topic
> > > not found during partition assignment: words-count-changelog
> > >
> > > This happens when .through("words-count-changelog", "count") is part of
> > the
> > > topology. The topology is also not forwarding anything to that
> > topic/store.
> > > After restarting the application it works fine.
> > >
> > > Are the changelog topics created via, for example, .aggregate()
> different
> > > from topics auto-created via .through()?
> > >
> > > Thanks,
> > > Mikael
> > >
> > > On Wed, Nov 23, 2016 at 7:57 PM Matthias J. Sax  >
> > > wrote:
> > >
> > > > > 1) Create a state store AND the changelog
> > > > > topic 2) follow the Kafka Streams naming convention for changelog
> > > topics.
> > > > > Basically, I want to have a method that does what .through() is
> > > supposed
> > > > to
> > > > > do according to the documentation, but without the "topic"
> parameter.
> > > >
> > > > I understand what you are saying, but you can get this done right
> now,
> > > > too. If you use through(...) you will get the store. And you can just
> > > > specify the topic name as "applicationId-storeName-changelog" to
> > follow
> > the naming convention Streams uses internally. What is the problem
> > using
> > > > this approach (besides that you have to provide the topic name

Re: KafkaStreams KTable#through not creating changelog topic

2016-11-24 Thread Mikael Högqvist
Sure, I guess the topic is auto-created the first time I start the topology
and the second time it's there already. It would be possible for us to create
the topics up front, or even use an admin call from inside the code.

That said, as a user, I think it would be great to have a function in the
Kafka Streams DSL that would allow me to materialize a KTable without
pre-creating the topic. Today there is .through(topic, store) and
.to(topic), maybe it would be possible to have something like
.materialize(store) which takes care of topic creation? Would adding
something like this require a KIP?

Best,
Mikael

On Thu, Nov 24, 2016 at 1:44 PM Damian Guy  wrote:

Mikael,

When you use `through(..)` topics are not created by KafkaStreams. You need
to create them yourself before you run the application.

Thanks,
Damian

On Thu, 24 Nov 2016 at 11:27 Mikael Högqvist  wrote:

> Yes, the naming is not an issue.
>
> I've tested this with the topology described earlier. Every time I start
> the topology with a call to .through() that references a topic that does
> not exist, I get an exception from the UncaughtExceptionHandler:
>
> Uncaught exception org.apache.kafka.streams.errors.StreamsException: Topic
> not found during partition assignment: words-count-changelog
>
> This happens when .through("words-count-changelog", "count") is part of
the
> topology. The topology is also not forwarding anything to that
topic/store.
> After restarting the application it works fine.
>
> Are the changelog topics created via, for example, .aggregate() different
> from topics auto-created via .through()?
>
> Thanks,
> Mikael
>
> On Wed, Nov 23, 2016 at 7:57 PM Matthias J. Sax 
> wrote:
>
> > > 1) Create a state store AND the changelog
> > > topic 2) follow the Kafka Streams naming convention for changelog
> topics.
> > > Basically, I want to have a method that does what .through() is
> supposed
> > to
> > > do according to the documentation, but without the "topic" parameter.
> >
> > I understand what you are saying, but you can get this done right now,
> > too. If you use through(...) you will get the store. And you can just
> > specify the topic name as "applicationId-storeName-changelog" to follow
> > the naming convention Streams uses internally. What is the problem using
> > this approach (besides that you have to provide the topic name which
> > seems not to be a big burden to me?)
> >
> >
> > -Matthias
> >
> >
> > On 11/23/16 8:59 AM, Mikael Högqvist wrote:
> > > Hi Michael,
> > >
> > > thanks for the extensive explanation, and yes it definitely helps with
> my
> > > understanding of through(). :)
> > >
> > > You guessed correctly that I'm doing some "shenanigans" where I'm
trying
> > to
> > > derive the changelog of a state store from the state store name. This
> > works
> > > perfectly fine with a naming convention for the topics and by
> > creating
> > > them in Kafka upfront.
> > >
> > > My point is that it would help me (and maybe others), if the API of
> > KTable
> > > was extended to have a new method that does two things that are not
part
> > of
> > > the implementation of .through(). 1) Create a state store AND the
> > changelog
> > > topic 2) follow the Kafka Streams naming convention for changelog
> topics.
> > > Basically, I want to have a method that does what .through() is
> supposed
> > to
> > > do according to the documentation, but without the "topic" parameter.
> > >
> > > What do you think, would it be possible to extend the API with a
method
> > > like that?
> > >
> > > Thanks,
> > > Mikael
> > >
> > > On Wed, Nov 23, 2016 at 4:16 PM Michael Noll 
> > wrote:
> > >
> > >> Mikael,
> > >>
> > >> regarding your second question:
> > >>
> > >>> 2) Regarding the use case, the topology looks like this:
> > >>>
> > >>> .stream(...)
> > >>> .aggregate(..., "store-1")
> > >>> .mapValues(...)
> > >>> .through(..., "store-2")
> > >>
> > >> The last operator above would, without "..." ellipsis, be sth like
> > >> `KTable#through("through-topic", "store-2")`.  Here, "through-topic"
> is
> > the
> > >> changelog topic for both the KTable and the state store "store-2".
So
> > this
> > >> is the changelog topic n

Re: KafkaStreams KTable#through not creating changelog topic

2016-11-24 Thread Mikael Högqvist
Yes, the naming is not an issue.

I've tested this with the topology described earlier. Every time I start
the topology with a call to .through() that references a topic that does
not exist, I get an exception from the UncaughtExceptionHandler:

Uncaught exception org.apache.kafka.streams.errors.StreamsException: Topic
not found during partition assignment: words-count-changelog

This happens when .through("words-count-changelog", "count") is part of the
topology. The topology is also not forwarding anything to that topic/store.
After restarting the application it works fine.

Are the changelog topics created via, for example, .aggregate() different
from topics auto-created via .through()?

Thanks,
Mikael

On Wed, Nov 23, 2016 at 7:57 PM Matthias J. Sax 
wrote:

> > 1) Create a state store AND the changelog
> > topic 2) follow the Kafka Streams naming convention for changelog topics.
> > Basically, I want to have a method that does what .through() is supposed
> to
> > do according to the documentation, but without the "topic" parameter.
>
> I understand what you are saying, but you can get this done right now,
> too. If you use through(...) you will get the store. And you can just
> specify the topic name as "applicationId-storeName-changelog" to follow
> the naming convention Streams uses internally. What is the problem using
> this approach (besides that you have to provide the topic name which
> seems not to be a big burden to me?)
>
>
> -Matthias
>
>
> On 11/23/16 8:59 AM, Mikael Högqvist wrote:
> > Hi Michael,
> >
> > thanks for the extensive explanation, and yes it definitely helps with my
> > understanding of through(). :)
> >
> > You guessed correctly that I'm doing some "shenanigans" where I'm trying
> to
> > derive the changelog of a state store from the state store name. This
> works
> > perfectly fine with a naming convention for the topics and by
> creating
> > them in Kafka upfront.
> >
> > My point is that it would help me (and maybe others), if the API of
> KTable
> > was extended to have a new method that does two things that are not part
> of
> > the implementation of .through(). 1) Create a state store AND the
> changelog
> > topic 2) follow the Kafka Streams naming convention for changelog topics.
> > Basically, I want to have a method that does what .through() is supposed
> to
> > do according to the documentation, but without the "topic" parameter.
> >
> > What do you think, would it be possible to extend the API with a method
> > like that?
> >
> > Thanks,
> > Mikael
> >
> > On Wed, Nov 23, 2016 at 4:16 PM Michael Noll 
> wrote:
> >
> >> Mikael,
> >>
> >> regarding your second question:
> >>
> >>> 2) Regarding the use case, the topology looks like this:
> >>>
> >>> .stream(...)
> >>> .aggregate(..., "store-1")
> >>> .mapValues(...)
> >>> .through(..., "store-2")
> >>
> >> The last operator above would, without "..." ellipsis, be sth like
> >> `KTable#through("through-topic", "store-2")`.  Here, "through-topic" is
> the
> >> changelog topic for both the KTable and the state store "store-2".  So
> this
> >> is the changelog topic name that you want to know.
> >>
> >> - If you want the "through" topic to have a `-changelog` suffix, then
> you'd
> >> need to add that yourself in the call to `through(...)`.
> >>
> >> - If you wonder why `through()` doesn't add a `-changelog` suffix
> >> automatically:  That's because `through()` -- like `to()` or `stream()`,
> >> `table()` -- requires you to explicitly provide a topic name, and of
> course
> >> Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is
> only
> >> added when Kafka creates internal changelog topics behind the scenes for
> >> you.)   Unfortunately, the javadoc of `KTable#through()` is incorrect
> >> because it refers to `-changelog`;  we'll fix that as mentioned above.
> >>
> >> - Also, in case you want to do some shenanigans (like for some tooling
> >> you're building around state stores/changelogs/interactive queries), such as
> >> detecting all state store changelogs by doing the equivalent of `ls
> >> *-changelog`, then this will miss changelogs of KTables that are
> created by
> >> `through()` and `to()` (unless you come up with a naming convention that
> >> your tooling 

Re: KafkaStreams KTable#through not creating changelog topic

2016-11-23 Thread Mikael Högqvist
Hi Michael,

thanks for the extensive explanation, and yes it definitely helps with my
understanding of through(). :)

You guessed correctly that I'm doing some "shenanigans" where I'm trying to
derive the changelog of a state store from the state store name. This works
perfectly fine with a naming convention for the topics and by creating
them in Kafka upfront.

My point is that it would help me (and maybe others), if the API of KTable
was extended to have a new method that does two things that are not part of
the implementation of .through(). 1) Create a state store AND the changelog
topic 2) follow the Kafka Streams naming convention for changelog topics.
Basically, I want to have a method that does what .through() is supposed to
do according to the documentation, but without the "topic" parameter.

What do you think, would it be possible to extend the API with a method
like that?

Thanks,
Mikael

On Wed, Nov 23, 2016 at 4:16 PM Michael Noll  wrote:

> Mikael,
>
> regarding your second question:
>
> > 2) Regarding the use case, the topology looks like this:
> >
> > .stream(...)
> > .aggregate(..., "store-1")
> > .mapValues(...)
> > .through(..., "store-2")
>
> The last operator above would, without "..." ellipsis, be sth like
> `KTable#through("through-topic", "store-2")`.  Here, "through-topic" is the
> changelog topic for both the KTable and the state store "store-2".  So this
> is the changelog topic name that you want to know.
>
> - If you want the "through" topic to have a `-changelog` suffix, then you'd
> need to add that yourself in the call to `through(...)`.
>
> - If you wonder why `through()` doesn't add a `-changelog` suffix
> automatically:  That's because `through()` -- like `to()` or `stream()`,
> `table()` -- requires you to explicitly provide a topic name, and of course
> Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is only
> added when Kafka creates internal changelog topics behind the scenes for
> you.)   Unfortunately, the javadoc of `KTable#through()` is incorrect
> because it refers to `-changelog`;  we'll fix that as mentioned above.
>
> - Also, in case you want to do some shenanigans (like for some tooling
> you're building around state stores/changelogs/interactive queries), such as
> detecting all state store changelogs by doing the equivalent of `ls
> *-changelog`, then this will miss changelogs of KTables that are created by
> `through()` and `to()` (unless you come up with a naming convention that
> your tooling can assume to be in place, e.g. by always adding `-changelog`
> to topic names when you call `through()`).
>
> I hope this helps!
> Michael
>
>
>
>
> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist 
> wrote:
>
> > Hi Eno,
> >
> > 1) Great :)
> >
> > 2) Yes, we are using the Interactive Queries to access the state stores.
> In
> > addition, we access the changelogs to subscribe to updates. For this
> reason
> > we need to know the changelog topic name.
> >
> > Thanks,
> > Mikael
> >
> > On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska 
> > wrote:
> >
> > > Hi Mikael,
> > >
> > > 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is
> looking
> > > into fixing it. I agree that it can be confusing to have topic names
> that
> > > are not what one would expect.
> > >
> > > 2) If your goal is to query/read from the state stores, you can use
> > > Interactive Queries to do that (you don't need to worry about the
> > changelog
> > > topic name and such). Interactive Queries is a new feature in 0.10.1 (blog
> > > here:
> > > https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
> > > ).
> > >
> > > Thanks
> > > Eno
> > >
> > >
> > > > On 22 Nov 2016, at 19:27, Mikael Högqvist 
> wrote:
> > > >
> > > > Sorry for being unclear, I'll try again :)
> > > >
> > > > 1) The JavaDoc for through is not correct: it states that a changelog
> > > topic
> > > > will be created for the state store. That is, if I call it with
> > > > through("topic", "a-store"), I would expect a Kafka topic
> > > > "my-app-id-a-store-changelog" to be created.
> > > >
> >

Re: KafkaStreams KTable#through not creating changelog topic

2016-11-22 Thread Mikael Högqvist
Hi Eno,

1) Great :)

2) Yes, we are using the Interactive Queries to access the state stores. In
addition, we access the changelogs to subscribe to updates. For this reason
we need to know the changelog topic name.
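For reference, a minimal sketch of how we follow such a changelog with a plain
consumer (assuming a store backed by an aggregate, so its changelog follows the
<applicationId>-<storeName>-changelog convention; all names here are illustrative):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ChangelogSubscriber {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "changelog-subscriber");   // illustrative group id
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", LongDeserializer.class.getName());

    // derived from the Streams convention: <applicationId>-<storeName>-changelog
    String changelogTopic = "my-app-id" + "-" + "store-1" + "-changelog";

    try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList(changelogTopic));
      while (true) {
        ConsumerRecords<String, Long> records = consumer.poll(1000L);
        for (ConsumerRecord<String, Long> record : records) {
          // each record is the latest value for its key; a null value means delete
          System.out.println(record.key() + " -> " + record.value());
        }
      }
    }
  }
}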

Thanks,
Mikael

On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska  wrote:

> Hi Mikael,
>
> 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is looking
> into fixing it. I agree that it can be confusing to have topic names that
> are not what one would expect.
>
> 2) If your goal is to query/read from the state stores, you can use
> Interactive Queries to do that (you don't need to worry about the changelog
> topic name and such). Interactive Queries is a new feature in 0.10.1 (blog
> here:
> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
> <
> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
> >).
>
> Thanks
> Eno
>
>
> > On 22 Nov 2016, at 19:27, Mikael Högqvist  wrote:
> >
> > Sorry for being unclear, I'll try again :)
> >
> > 1) The JavaDoc for through is not correct: it states that a changelog
> topic
> > will be created for the state store. That is, if I call it with
> > through("topic", "a-store"), I would expect a Kafka topic
> > "my-app-id-a-store-changelog" to be created.
> >
> > 2) Regarding the use case, the topology looks like this:
> >
> > .stream(...)
> > .aggregate(..., "store-1")
> > .mapValues(...)
> > .through(..., "store-2")
> >
> > Basically, I want to materialize both the result from the aggregate
> method
> > and the result from mapValues, which is materialized using .through().
> > Later, I will access both tables (store-1 and store-2) to a) get the
> > current state of the aggregate, and b) subscribe to future updates. This
> > works just fine. The only issue is that I assumed a changelog topic for
> > store-2 would be created automatically, which didn't happen.
> >
> > Since I want to access the changelog topic, it helps if the naming is
> > consistent. So either we enforce the same naming pattern as Kafka when
> > calling .through(), or alternatively the Kafka Streams API could provide a
> > method to materialize tables that creates the topic name according to the
> > naming pattern, e.g. .through() without the topic parameter.
> >
> > What do you think?
> >
> > Best,
> > Mikael
> >
> > On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax 
> > wrote:
> >
> >> I cannot completely follow what you want to achieve.
> >>
> >> However, the JavaDoc for through() seems not to be correct to me. Using
> >> through() will not create an extra internal changelog topic with the
> >> described naming schema, because the topic specified in through() can be
> >> used for this (there is no point in duplicating the data).
> >>
> >> If you have a KTable and apply a mapValues(), this will not write data
> >> to any topic. The derived KTable is in-memory because you can easily
> >> recreate it from its base KTable.
> >>
> >> What is the missing part you want to get?
> >>
> >> Btw: the internally created changelog topics are only used for recovery
> >> in case of failure. Streams does not consume from those topics during
> >> "normal operation".
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
> >>> Hi,
> >>>
> >>> in the documentation for KTable#through, it is stated that a new
> >> changelog
> >>> topic will be created for the table. It also states that calling
> through
> >> is
> >>> equivalent to calling #to followed by KStreamBuilder#table.
> >>>
> >>>
> >>
> http://kafka.apache.org/0101/javadoc/org/apache/kafka/streams/kstream/KTable.html#through(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String,%20java.lang.String)
> >>>
> >>> In the docs for KStreamBuilder#table it is stated that no new changelog
> >>> topic will be created since the underlying topic acts as the changelog.
> >>> I've verified that this is the case.
> >>>
> >>> Is there another API method to materialize the results of a KTable
> >>> including a changelog, i.e. such that Kafka Streams creates the topic
> and
> >>> uses the naming schema for changelog topics? The use case I have in
> mind
> >> is
> >>> aggregate followed by mapValues.
> >>>
> >>> Best,
> >>> Mikael
> >>>
> >>
> >>
>
>


Re: KafkaStreams KTable#through not creating changelog topic

2016-11-22 Thread Mikael Högqvist
Sorry for being unclear, I'll try again :)

1) The JavaDoc for through is not correct: it states that a changelog topic
will be created for the state store. That is, if I call it with
through("topic", "a-store"), I would expect a Kafka topic
"my-app-id-a-store-changelog" to be created.

2) Regarding the use case, the topology looks like this:

.stream(...)
.aggregate(..., "store-1")
.mapValues(...)
.through(..., "store-2")

Basically, I want to materialize both the result from the aggregate method
and the result from mapValues, which is materialized using .through().
Later, I will access both tables (store-1 and store-2) to a) get the
current state of the aggregate, and b) subscribe to future updates. This works
just fine. The only issue is that I assumed a changelog topic for store-2
would be created automatically, which didn't happen.

Since I want to access the changelog topic, it helps if the naming is
consistent. So either we enforce the same naming pattern as Kafka when
calling .through(), or alternatively the Kafka Streams API could provide a
method to materialize tables that creates the topic name according to the
naming pattern, e.g. .through() without the topic parameter.
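For now, a rough sketch of the workaround we use (this helper is not part of
the Kafka Streams API, and the derived topic still has to be created up front):

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.KTable;

// Not part of the API -- just the workaround: derive the changelog-style
// topic name from the naming convention and call through(). The topic must
// exist (or be auto-created) before the application starts.
static <K, V> KTable<K, V> materialize(KTable<K, V> table,
                                       Serde<K> keySerde,
                                       Serde<V> valueSerde,
                                       String applicationId,
                                       String storeName) {
  String topic = applicationId + "-" + storeName + "-changelog";
  return table.through(keySerde, valueSerde, topic, storeName);
}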

What do you think?

Best,
Mikael

On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax 
wrote:

> I cannot completely follow what you want to achieve.
>
> However, the JavaDoc for through() seems not to be correct to me. Using
> through() will not create an extra internal changelog topic with the
> described naming schema, because the topic specified in through() can be
> used for this (there is no point in duplicating the data).
>
> If you have a KTable and apply a mapValues(), this will not write data
> to any topic. The derived KTable is in-memory because you can easily
> recreate it from its base KTable.
>
> What is the missing part you want to get?
>
> Btw: the internally created changelog topics are only used for recovery
> in case of failure. Streams does not consume from those topics during
> "normal operation".
>
>
> -Matthias
>
>
>
> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
> > Hi,
> >
> > in the documentation for KTable#through, it is stated that a new
> changelog
> > topic will be created for the table. It also states that calling through
> is
> > equivalent to calling #to followed by KStreamBuilder#table.
> >
> >
> http://kafka.apache.org/0101/javadoc/org/apache/kafka/streams/kstream/KTable.html#through(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String,%20java.lang.String)
> >
> > In the docs for KStreamBuilder#table it is stated that no new changelog
> > topic will be created since the underlying topic acts as the changelog.
> > I've verified that this is the case.
> >
> > Is there another API method to materialize the results of a KTable
> > including a changelog, i.e. such that Kafka Streams creates the topic and
> > uses the naming schema for changelog topics? The use case I have in mind
> is
> > aggregate followed by mapValues.
> >
> > Best,
> > Mikael
> >
>
>


KafkaStreams KTable#through not creating changelog topic

2016-11-22 Thread Mikael Högqvist
Hi,

in the documentation for KTable#through, it is stated that a new changelog
topic will be created for the table. It also states that calling through is
equivalent to calling #to followed by KStreamBuilder#table.

http://kafka.apache.org/0101/javadoc/org/apache/kafka/streams/kstream/KTable.html#through(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String,%20java.lang.String)

In the docs for KStreamBuilder#table it is stated that no new changelog
topic will be created since the underlying topic acts as the changelog.
I've verified that this is the case.

Is there another API method to materialize the results of a KTable
including a changelog, i.e. such that Kafka Streams creates the topic and
uses the naming schema for changelog topics? The use case I have in mind is
aggregate followed by mapValues.
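For concreteness, a rough sketch of the kind of topology I have in mind
(written against the 0.10.1 DSL; the topic, store, and application id names
are illustrative, and the "my-app-id-store-2-changelog" topic would have to
exist up front):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

KStreamBuilder builder = new KStreamBuilder();

KTable<String, Long> aggregated = builder
    .stream(Serdes.String(), Serdes.String(), "input-topic")
    .groupByKey(Serdes.String(), Serdes.String())
    .aggregate(() -> 0L,
               (String key, String value, Long agg) -> agg + 1L,
               Serdes.Long(),
               "store-1");

aggregated
    .mapValues(count -> count * 2L)   // some derived value
    .through(Serdes.String(), Serdes.Long(),
             "my-app-id-store-2-changelog",   // named after the changelog convention
             "store-2");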

Best,
Mikael