Hi there,

The inconsistency will be resolved, whether with materialize or overloaded 
methods. 

With the discussion on the DSL & stores I feel we've gone off on a slightly
different tangent, which is worth discussing nonetheless. We have entered into
an argument around the scope of the DSL. The DSL has been designed primarily
for processing. The DSL does not dictate ways to access state stores or what
kind of queries to perform on them. Hence, I see the mechanism for accessing
storage as decoupled from the DSL.

We could think of ways to get store handles from part of the DSL, like the
KTable abstraction. However, subsequent queries will be store-dependent and
will not rely on the DSL, hence I'm not sure we get any grand DSL-Store
convergence here. So I am arguing that the current way of getting a handle on
state stores is fine.
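
To illustrate what I mean by decoupled, here is a minimal sketch in plain
Java. It is a toy model, not Kafka Streams code: StoreRegistry,
KeyValueStoreSketch and the store name "word-counts" are hypothetical names
made up for illustration. The processing side registers a store under a name,
and the query side later asks the running application for that store by name,
without going through the DSL.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model only: none of these types are Kafka Streams classes.
final class StoreLookupSketch {

    /** Stands in for a key-value store that a processing step fills. */
    static final class KeyValueStoreSketch {
        private final Map<String, Long> data = new HashMap<>();

        void put(String key, long value) {
            data.put(key, value);
        }

        Optional<Long> get(String key) {
            return Optional.ofNullable(data.get(key));
        }
    }

    /** Stands in for the running application that owns every named store. */
    static final class StoreRegistry {
        private final Map<String, KeyValueStoreSketch> stores = new HashMap<>();

        // Processing side: a step that was given a store name registers its store.
        KeyValueStoreSketch register(String storeName) {
            return stores.computeIfAbsent(storeName, n -> new KeyValueStoreSketch());
        }

        // Query side: look the store up by name; no knowledge of how it was filled.
        KeyValueStoreSketch store(String storeName) {
            KeyValueStoreSketch s = stores.get(storeName);
            if (s == null) {
                throw new IllegalArgumentException("unknown store: " + storeName);
            }
            return s;
        }
    }

    public static void main(String[] args) {
        StoreRegistry app = new StoreRegistry();
        app.register("word-counts").put("kafka", 3L);

        // The handle is obtained from the application, not from the DSL.
        KeyValueStoreSketch handle = app.store("word-counts");
        System.out.println(handle.get("kafka").orElse(0L));
    }
}
```

In this sketch the query side depends only on the store name and on the
store's own query methods, which is why subsequent queries stay
store-dependent rather than DSL-dependent.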

Thanks
Eno

> On 30 Jan 2017, at 03:56, Guozhang Wang <wangg...@gmail.com> wrote:
> 
> Thinking out loud here about the API options (materialize vs. overloaded
> functions) and their impact on IQ:
> 
> 1. The first issue with the current DSL is that there is inconsistency in
> whether / how KTables should be materialized:
> 
>    a) in many cases the library HAS TO materialize KTables no matter what,
> e.g. KTables resulting from KStream / KTable aggregations, and hence we
> force users to provide store names and throw an RTE if the name is null;
>    b) in some other cases, the KTable can be materialized or not; for
> example in KStreamBuilder.table(), store names can be null, in which
> case the KTable would not be materialized;
>    c) in some other cases, the KTable will never be materialized, for
> example KTables resulting from KTable.filter(), and users have no option
> to force them to be materialized;
>    d) this is related to a), where some KTables are required to be
> materialized, but we do not force users to provide a state store name,
> e.g. KTables involved in joins; an RTE will be thrown, not immediately but
> later, in this case.
> 
> 2. The second issue is related to IQ, where state stores are accessed by
> their state store names; so only those KTables that have user-specified
> state store names will be queryable. But because of 1) above, many stores
> may not be of interest to users for IQ, yet they still need to provide a
> (dummy?) state store name for them; while on the other hand users cannot
> query some state stores, e.g. the ones generated by KTable.filter(), as
> there are no APIs for them to specify a state store name.
> 
> 3. We are aware from user feedback that such backend details would
> better be abstracted away from the DSL layer, where app developers should
> just focus on processing logic, while state stores along with their
> changelogs etc. would better be in a different mechanism; the same
> arguments have been discussed for serdes / windowing triggers as well. For
> serdes specifically, we had a very long discussion about it and concluded
> that, at least in Java 7, we cannot completely abstract serdes away in the
> DSL, so we chose the other extreme: to force users to be completely aware
> of the serde requirements when some KTables may need to be materialized,
> via overloaded API functions. For the state store names, though, I feel it
> is a different argument than for serdes (details below).
> 
> 
> So to me, for either the materialize() or the overloaded functions
> direction, the first thing I'd like to resolve is the inconsistency issue
> mentioned above. So in either case: KTable materialization will not be
> affected by whether the user provides a state store name or not, but will
> only be decided by the library when it is necessary. More specifically,
> only KTables resulting from the join operator and builder.table() are not
> always materialized, but they are still likely to be materialized lazily
> (e.g. when participating in a join operator).
> 
> 
> For overloaded functions that would mean:
> 
>    a) we have an overloaded function for ALL operators that could result
> in a KTable, and allow the store name to be null (i.e. for the function
> without this param it is null by default);
>    b) a null state store name does not indicate that a KTable would not be
> materialized, but that it will not be used for IQ at all (internal state
> store names will be generated when necessary).
> 
> 
> For materialize() that would mean:
> 
>    a) we will remove state store names from ALL operators that could
> result in a KTable.
>    b) not calling materialize() on a KTable does not indicate that the
> KTable would not be materialized, but that it will not be used for IQ at
> all (internal state store names will be generated when necessary).
> 
> 
> Again, either way the API itself does not "hint" at anything about
> materializing a KTable or not; that is still purely determined by the
> library when parsing the DSL for now.
> 
> Following these thoughts, I feel that 1) we should probably change the name
> "materialize" since it may mislead users about what actually happens
> behind the scenes, to e.g. what Damian suggested, "queryableStore(String
> storeName)", which returns a QueryableStateStore and can replace the
> `KafkaStreams.store` function; 2) comparing those two options, assuming we
> get rid of the misleading function name, I personally favor not adding more
> overloaded functions, as it keeps the API simpler.
> 
> 
> 
> Guozhang
> 
> 
> On Sat, Jan 28, 2017 at 2:32 PM, Jan Filipiak <jan.filip...@trivago.com>
> wrote:
> 
>> Hi,
>> 
>> thanks for your mail, I felt like this can clarify some things! The thread
>> unfortunately split, but as all branches close in on what my suggestion was
>> about, I'll pick this one to continue.
>> 
>> Of course only the table the user wants to query would be materialized
>> (retrieving the query handle implies materialisation). So in the example of
>> KTable::filter, if you call getIQHandle on both tables, only the one source
>> that is there would materialize, and the QueryHandle abstraction would make
>> sure it gets mapped and filtered and whatnot upon read as usual.
>> 
>> Of course the object you would retrieve would maybe only wrap the
>> storeName / table unique identifier and a way to access the streams
>> instance, and then basically use the same mechanism that is currently used.
>> From my point of view this is the least confusing way for DSL users. If
>> it's too tricky to get hold of the streams instance, one could ask the user
>> to pass it in before executing queries, therefore making sure the streams
>> instance has been built.
>> 
>> The effort to implement this is indeed some orders of magnitude higher
>> than the overloaded materialized call. As long as I could help getting a
>> different view I am happy.
>> 
>> Best Jan
>> 
>> 
>> On 28.01.2017 09:36, Eno Thereska wrote:
>> 
>>> Hi Jan,
>>> 
>>> I understand your concern. One implication of not passing any store name
>>> and just getting an IQ handle is that all KTables would need to be
>>> materialised. Currently the store name (or proposed .materialize() call)
>>> acts as a hint on whether to materialise the KTable or not. Materialising
>>> every KTable can be expensive, although there are some tricks one can play,
>>> e.g., have a virtual store rather than one backed by a Kafka topic.
>>> 
>>> However, even with the above, after getting an IQ handle, the user would
>>> still need to use IQ APIs to query the state. As such, we would still
>>> continue to be outside the original DSL so this wouldn't address your
>>> original concern.
>>> 
>>> So I read this suggestion as simplifying the APIs by removing the store
>>> name, at the cost of having to materialise every KTable. It's definitely an
>>> option we'll consider as part of this KIP.
>>> 
>>> Thanks
>>> Eno
>>> 
>>> 
>>> On 28 Jan 2017, at 06:49, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>> 
>>>> Hi, exactly.
>>>> 
>>>> I know it works from the Processor API, but my suggestion would prevent
>>>> DSL users from dealing with store names whatsoever.
>>>> 
>>>> In general I am pro switching between DSL and Processor API easily. (In
>>>> my Streams applications I do this a lot with reflection and instantiating
>>>> KTableImpl.) Concerning this KIP, all I say is that there should be a DSL
>>>> concept of "I want to expose this __KTable__". This can be a method like
>>>> KTable::retrieveIQHandle():InteractiveQueryHandle; the table would know
>>>> to materialize, and the user would have a reference to the "store and the
>>>> distributed query mechanism" via the Interactive Query Handle. Under the
>>>> hood it can use the same mechanism as the PIP people again.
>>>> 
>>>> I hope you see my point :)
>>>> 
>>>> Best Jan
>>>> 
>>>> 
>>>> #DeathToIQMoreAndBetterConnectors :)
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On 27.01.2017 21:59, Matthias J. Sax wrote:
>>>> 
>>>>> Jan,
>>>>> 
>>>>> the IQ feature is not limited to Streams DSL but can also be used for
>>>>> Stores used in PAPI. Thus, we need a mechanism that does work for PAPI
>>>>> and DSL.
>>>>> 
>>>>> Nevertheless I see your point and I think we could provide a better API
>>>>> for KTable stores including the discovery of remote shards of the same
>>>>> KTable.
>>>>> 
>>>>> @Michael: Yes, right now we do have a lot of overloads and I am not a
>>>>> big fan of those -- I would rather prefer a builder pattern. But that
>>>>> might be a different discussion (nevertheless, if we would aim for an API
>>>>> rework, we should get the changes with regard to stores right from the
>>>>> beginning on, in order to avoid a redesign later on.)
>>>>> 
>>>>> something like:
>>>>> 
>>>>> stream.groupByKey()
>>>>>       .window(TimeWindow.of(5000))
>>>>>       .aggregate(...)
>>>>>       .withAggValueSerde(new CustomTypeSerde())
>>>>>       .withStoreName("storeName");
>>>>> 
>>>>> 
>>>>> (This would also reduce JavaDoc redundancy -- maybe a personal pain
>>>>> point right now :))
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> On 1/27/17 11:10 AM, Jan Filipiak wrote:
>>>>> 
>>>>>> Yeah,
>>>>>> 
>>>>>> Maybe it's my bad that I refuse to look into IQ, as I don't find it
>>>>>> anywhere close to being interesting. The problem IMO is that people
>>>>>> need to know the store name, so we are working on different levels to
>>>>>> achieve a single goal.
>>>>>> 
>>>>>> What is your opinion on having a method on KTable that returns
>>>>>> something like a key-value store? There are of course problems like
>>>>>> "it can't be used before the stream threads are running and group
>>>>>> membership is established...", but the benefit would be that for the
>>>>>> user there is a consistent way of saying "Hey, I need it materialized
>>>>>> as queries are going to be coming" and already getting a thing that he
>>>>>> can execute the queries on, in one step.
>>>>>> What I think is unintuitive here is that you need to say materialize on
>>>>>> this KTable, and then you go somewhere else and find its store name, and
>>>>>> then you go to the KafkaStreams instance and ask for the store with this
>>>>>> name.
>>>>>> 
>>>>>> So one could help the user stay in DSL land and therefore maybe
>>>>>> confuse him less.
>>>>>> 
>>>>>> Best Jan
>>>>>> 
>>>>>> #DeathToIQMoreAndBetterConnectors :)
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On 27.01.2017 16:51, Damian Guy wrote:
>>>>>> 
>>>>>>> I think Jan is saying that they don't always need to be materialized,
>>>>>>> i.e., filter just needs to apply the ValueGetter, it doesn't need yet
>>>>>>> another physical state store.
>>>>>>> 
>>>>>>> On Fri, 27 Jan 2017 at 15:49 Michael Noll <mich...@confluent.io>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Like Damian, and for the same reasons, I am more in favor of
>>>>>>>> overloading methods rather than introducing `materialize()`.
>>>>>>>> FWIW, we already have a similar API setup for e.g.
>>>>>>>> `KTable#through(topicName, stateStoreName)`.
>>>>>>>> 
>>>>>>>> A related but slightly different question is what e.g. Jan Filipiak
>>>>>>>> mentioned earlier in this thread:
>>>>>>>> I think we need to explain more clearly why KIP-114 doesn't propose
>>>>>>>> the seemingly simpler solution of always materializing tables/state
>>>>>>>> stores.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Fri, Jan 27, 2017 at 4:38 PM, Jan Filipiak <
>>>>>>>> jan.filip...@trivago.com>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hi,
>>>>>>>>> 
>>>>>>>>> Yeah, it's confusing. Why shouldn't it be queryable by IQ? If you use
>>>>>>>>> the ValueGetter of Filter, it will apply the filter and should be
>>>>>>>>> completely transparent as to whether another processor or IQ is
>>>>>>>>> accessing it. How can this new method help?
>>>>>>>>> 
>>>>>>>>> I cannot see the reason for the additional materialize method being
>>>>>>>>> required! Hence I suggest leaving it alone.
>>>>>>>>> Regarding removing the others, I don't have strong opinions, and it
>>>>>>>>> seems to be unrelated.
>>>>>>>>> 
>>>>>>>>> Best Jan
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On 26.01.2017 20:48, Eno Thereska wrote:
>>>>>>>>> 
>>>>>>>>>> Forwarding this thread to the users list too in case people would
>>>>>>>>>> like to comment. It is also on the dev list.
>>>>>>>>>> 
>>>>>>>>>> Thanks
>>>>>>>>>> Eno
>>>>>>>>>> 
>>>>>>>>>> Begin forwarded message:
>>>>>>>>>> 
>>>>>>>>>>> From: "Matthias J. Sax" <matth...@confluent.io>
>>>>>>>>>>> Subject: Re: [DISCUSS] KIP-114: KTable materialization and
>>>>>>>>>>> improved
>>>>>>>>>>> semantics
>>>>>>>>>>> Date: 24 January 2017 at 19:30:10 GMT
>>>>>>>>>>> To: d...@kafka.apache.org
>>>>>>>>>>> Reply-To: d...@kafka.apache.org
>>>>>>>>>>> 
>>>>>>>>>>> That's not what I meant by "huge impact".
>>>>>>>>>>> 
>>>>>>>>>>> I refer to the actions related to materializing a KTable: creating a
>>>>>>>>>>> RocksDB store and a changelog topic -- users should be aware of the
>>>>>>>>>>> runtime implications, and this is better expressed by an explicit
>>>>>>>>>>> method call than implicitly triggered by using a different overload
>>>>>>>>>>> of a method.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> -Matthias
>>>>>>>>>>> 
>>>>>>>>>>> On 1/24/17 1:35 AM, Damian Guy wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> I think your definition of a huge impact and mine are rather
>>>>>>>>>>>> different ;-P
>>>>>>>>>>>> Overloading a few methods is not really a huge impact IMO. It is
>>>>>>>>>>>> also a sacrifice worth making for readability, usability of the
>>>>>>>>>>>> API.
>>>>>>>>>>>> 
>>>>>>>>>>>> On Mon, 23 Jan 2017 at 17:55 Matthias J. Sax <
>>>>>>>>>>>> matth...@confluent.io>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> I understand your argument, but do not agree with it.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Your first version (even if the "flow" is not as nice) is more
>>>>>>>>>>>>> explicit than the second version. Adding a stateStoreName
>>>>>>>>>>>>> parameter is quite implicit but has a huge impact -- thus, I
>>>>>>>>>>>>> prefer the rather more verbose but explicit version.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On 1/23/17 1:39 AM, Damian Guy wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I'm not a fan of materialize. I think it interrupts the flow,
>>>>>>>>>>>>>> i.e.,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> table.mapValues(..).materialize().join(..).materialize()
>>>>>>>>>>>>>> compared to:
>>>>>>>>>>>>>> table.mapValues(..).join(..)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I know which one I prefer.
>>>>>>>>>>>>>> My preference is still to provide overloaded methods where
>>>>>>>>>>>>>> people can specify the store names if they want, otherwise we
>>>>>>>>>>>>>> just generate them.
>>>>>>>> 
>>>>>>>>> On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax
>>>>>>>>>>>>>> <matth...@confluent.io
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> thanks for the KIP Eno! Here are my 2 cents:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 1) I like Guozhang's proposal about removing store name from all
>>>>>>>>>>>>>>> KTable methods and generate internal names (however, I would do
>>>>>>>>>>>>>>> this as overloads). Furthermore, I would not force users to call
>>>>>>>>>>>>>>> .materialize() if they want to query a store, but add one more
>>>>>>>>>>>>>>> method .stateStoreName() that returns the store name if the
>>>>>>>>>>>>>>> KTable is materialized. Thus, also .materialize() must not
>>>>>>>>>>>>>>> necessarily have a parameter storeName (ie, we should have some
>>>>>>>>>>>>>>> overloads here).
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I would also not allow to provide a null store name (to
>>>>>>>>>>>>>>> indicate no
>>>>>>>>>>>>>>> materialization if not necessary) but throw an exception.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> This yields some simplification (see below).
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 2) I also like Guozhang's proposal about KStream#toTable()
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 3)
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>    3. What will happen when you call materialize on KTable
>>>>>>>>>>>>>>>> that is
>>>>>>>>>>>>>>>> already
>>>>>>>>>>>>>>>>    materialized? Will it create another StateStore
>>>>>>>>>>>>>>>> (providing
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> name
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>    different), throw an Exception?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Currently an exception is thrown, but see below.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> If we follow approach (1) from Guozhang, there is no need to
>>>>>>>>>>>>>>> worry about a second materialization and also no exception needs
>>>>>>>>>>>>>>> to be thrown. A call to .materialize() basically sets a
>>>>>>>>>>>>>>> "materialized flag" (ie, idempotent operation) and sets a new
>>>>>>>>>>>>>>> name.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 4)
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Rename toStream() to toKStream() for consistency.
>>>>>>>>>>>>>>>> Not sure whether that is really required. We also use
>>>>>>>>>>>>>>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> example,
>>>>>>>> 
>>>>>>>>> and
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> don't care about the "K" prefix.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Eno's reply:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I think changing it to `toKStream` would make it absolutely
>>>>>>>>>>>>>>>> clear
>>>>>>>>>>>>>>>> what
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> we are converting it to.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I'd say we should probably change the KStreamBuilder methods
>>>>>>>>>>>>>>>> (but
>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> this KIP).
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I would keep #toStream(). (see below)
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 5) We should not remove any methods but only deprecate them.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> A general note:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I do not understand your comments in "Rejected Alternatives". You
>>>>>>>>>>>>>>> say "Have the KTable be the materialized view" was rejected. But
>>>>>>>>>>>>>>> your KIP actually does exactly this -- the changelog abstraction
>>>>>>>>>>>>>>> of KTable is secondary after those changes and the "view"
>>>>>>>>>>>>>>> abstraction is what a KTable is. And just to be clear, I like
>>>>>>>>>>>>>>> this a lot:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> - it aligns with the name KTable
>>>>>>>>>>>>>>> - it aligns with stream-table-duality
>>>>>>>>>>>>>>> - it aligns with IQ
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I would say that a KTable is a "view abstraction" (as
>>>>>>>>>>>>>>> materialization is
>>>>>>>>>>>>>>> optional).
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On 1/22/17 5:05 PM, Guozhang Wang wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Thanks for the KIP Eno, I have a few meta comments and a few
>>>>>>>>>>>>>>>> detailed comments:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 1. I like the materialize() function in general, but I would
>>>>>>>>>>>>>>>> like to see how other KTable functions should be updated
>>>>>>>>>>>>>>>> accordingly. For example, 1) KStreamBuilder.table(..) has a
>>>>>>>>>>>>>>>> state store name parameter, and we will always materialize the
>>>>>>>>>>>>>>>> KTable unless its state store name is set to null; 2)
>>>>>>>>>>>>>>>> KTable.agg requires the result KTable to be materialized, and
>>>>>>>>>>>>>>>> hence it also has a state store name; 3) KTable.join requires
>>>>>>>>>>>>>>>> the joining table to be materialized. And today we do not
>>>>>>>>>>>>>>>> actually have a mechanism to enforce that, but will only throw
>>>>>>>>>>>>>>>> an exception at runtime if it is not (e.g. if you have
>>>>>>>>>>>>>>>> "builder.table("topic", null).join()" a RTE will be thrown).
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I'd make an extended proposal just to kick off the discussion
>>>>>>>>>>>>>>>> here: let's remove all the state store params in other KTable
>>>>>>>>>>>>>>>> functions, and if in some cases a KTable has to be materialized
>>>>>>>>>>>>>>>> (e.g. KTable resulted from KXX.agg) and users do not call
>>>>>>>>>>>>>>>> materialize(), then we treat it as "users are not interested in
>>>>>>>>>>>>>>>> querying it at all" and hence use an internal name generated
>>>>>>>>>>>>>>>> for the materialized KTable; i.e. although it is materialized
>>>>>>>>>>>>>>>> the state store is not exposed to users. And if users call
>>>>>>>>>>>>>>>> materialize() afterwards but we have already decided to
>>>>>>>>>>>>>>>> materialize it, we can replace the internal name with the
>>>>>>>>>>>>>>>> user's provided name. Then from a user's point of view, if they
>>>>>>>>>>>>>>>> ever want to query a KTable, they have to call materialize()
>>>>>>>>>>>>>>>> with a given state store name. This approach has one
>>>>>>>>>>>>>>>> awkwardness though, that serdes and state store name params are
>>>>>>>>>>>>>>>> not separated and could overlap (see detailed comment #2
>>>>>>>>>>>>>>>> below).
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 2. This step does not need to be included in this KIP, but just
>>>>>>>>>>>>>>>> as a reference / future work: as we have discussed before, we
>>>>>>>>>>>>>>>> may enforce materializing KTable.join resulted KTables as well
>>>>>>>>>>>>>>>> in the future. If we do that, then:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> a) KXX.agg resulted KTables are always materialized;
>>>>>>>>>>>>>>>> b) KTable.agg requires the aggregating KTable to always be
>>>>>>>>>>>>>>>> materialized (otherwise we would not know the old value);
>>>>>>>>>>>>>>>> c) KTable.join resulted KTables are always materialized, and so
>>>>>>>>>>>>>>>> are the joining KTables;
>>>>>>>>>>>>>>>> d) KTable.filter/mapValues resulted KTables' materialization
>>>>>>>>>>>>>>>> depends on their parent's materialization;
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> By recursive induction all KTables are actually always
>>>>>>>>>>>>>>>> materialized, and then the effect of the "materialize()" is
>>>>>>>>>>>>>>>> just for specifying the state store names. In this scenario, we
>>>>>>>>>>>>>>>> do not need to send Change<V> in repartition topics within
>>>>>>>>>>>>>>>> joins any more, but only for repartition topics within
>>>>>>>>>>>>>>>> aggregations. Instead, we can just send a "tombstone" without
>>>>>>>>>>>>>>>> the old value and we do not need to calculate joins twice (one
>>>>>>>>>>>>>>>> more time when old value is received).
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 3. I'm wondering if it is worthwhile to add a
>>>>>>>>>>>>>>>> "KStream#toTable()" function which is interpreted as a
>>>>>>>>>>>>>>>> dummy-aggregation where the new value always replaces the old
>>>>>>>>>>>>>>>> value. I have seen a couple of use cases of this, for example,
>>>>>>>>>>>>>>>> users want to read a changelog topic, apply some filters, and
>>>>>>>>>>>>>>>> then materialize it into a KTable with state stores without
>>>>>>>>>>>>>>>> creating duplicated changelog topics. With materialize() and
>>>>>>>>>>>>>>>> toTable I'd imagine users can specify sth. like:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> "
>>>>>>>>>>>>>>>> KStream stream = builder.stream("topic1").filter(..);
>>>>>>>>>>>>>>>> KTable table = stream.toTable(..);
>>>>>>>>>>>>>>>> table.materialize("state1");
>>>>>>>>>>>>>>>> "
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> And the library in this case could set store "state1"'s
>>>>>>>>>>>>>>>> changelog topic to be "topic1", applying the filter on the fly
>>>>>>>>>>>>>>>> while (re-)storing its state by reading from this topic,
>>>>>>>>>>>>>>>> instead of creating a second changelog topic like
>>>>>>>>>>>>>>>> "appID-state1-changelog" which is a semi-duplicate of "topic1".
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Detailed:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 1. I'm +1 with Michael regarding "#toStream"; actually I was
>>>>>>>>>>>>>>>> thinking about renaming to "#toChangeLog" but after thinking a
>>>>>>>>>>>>>>>> bit more I think #toStream is still better, and we can just
>>>>>>>>>>>>>>>> mention in the javaDoc that it is transforming its underlying
>>>>>>>>>>>>>>>> changelog stream to a normal stream.
>>>>>>>>>>>>>>>> 2. As Damian mentioned, there are a few scenarios where the
>>>>>>>>>>>>>>>> serdes are already specified in a previous operation whereas
>>>>>>>>>>>>>>>> in others they are not known before calling materialize, for
>>>>>>>>>>>>>>>> example: stream.groupByKey.agg(serde).materialize(serde) v.s.
>>>>>>>>>>>>>>>> table.mapValues(/*no serde specified*/).materialize(serde). We
>>>>>>>>>>>>>>>> need to specify what the handling logic is here.
>>>>>>>>>>>>>>>> 3. We can remove the "KTable#to" call as well, and force users
>>>>>>>>>>>>>>>> to call "KTable.toStream.to" to be more clear.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Wed, Jan 18, 2017 at 3:22 AM, Eno Thereska <
>>>>>>>>>>>>>>>> eno.there...@gmail.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I think changing it to `toKStream` would make it absolutely
>>>>>>>>>>>>>>>>> clear what we are converting it to.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I'd say we should probably change the KStreamBuilder methods
>>>>>>>>>>>>>>>>> (but not in this KIP).
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On 17 Jan 2017, at 13:59, Michael Noll <
>>>>>>>>>>>>>>>>> mich...@confluent.io>
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Rename toStream() to toKStream() for consistency.
>>>>>>>>>>>>>>>>>> Not sure whether that is really required. We also use
>>>>>>>>>>>>>>>>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for
>>>>>>>>>>>>>>>>>> example, and don't care about the "K" prefix.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Tue, Jan 17, 2017 at 10:55 AM, Eno Thereska <
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> eno.there...@gmail.com
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks Damian, answers inline:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> On 16 Jan 2017, at 17:17, Damian Guy <
>>>>>>>>>>>>>>>>>>> damian....@gmail.com>
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Hi Eno,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Thanks for the KIP. Some comments:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 1. I'd probably rename materialized to materialize.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Ok.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 2. I don't think the addition of the new Log compaction
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> mechanism
>>>>>>>> 
>>>>>>>>> is
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> necessary for this KIP, i.e, the KIP is useful without it.
>>>>>>>>>>>>>> Maybe
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> should be a different KIP?
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Agreed, already removed. Will do a separate KIP for that.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 3. What will happen when you call materialize on KTable
>>>>>>>>>>>>>>>>>>> that is
>>>>>>>>>>>>>>>>>>> already
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> materialized? Will it create another StateStore (providing
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> name
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> different), throw an Exception?
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Currently an exception is thrown, but see below.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 4. Have you considered overloading the existing KTable
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> operations
>>>>>>>> 
>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> add
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> a state store name? So if a state store name is provided,
>>>>>>>>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> materialize
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> a state store? This would be my preferred approach as i
>>>>>>>>>>>>>>>>>>>> don't
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> materialize is always a valid operation.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Ok I can see your point. This will increase the KIP size
>>>>>>>>>>>>>>>>>>> since
>>>>>>>>>>>>>>>>>>> I'll
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> to enumerate all overloaded methods, but it's not a problem.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 5. The materialize method will need ta value Serde as some
>>>>>>>>>>>>>>>>>>> operations,
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> i.e., mapValues, join etc can change the value types
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 6. https://issues.apache.org/jira/browse/KAFKA-4609 - might
>>>>>>>>>>>>>>>>>>>> mean
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> always need to materialize the StateStore for KTable-KTable
>>>>>>>>>>>>>>>>>>>> joins.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> If
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> is the case, then the KTable Join operators will also need
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Serde
>>>>>>>> 
>>>>>>>>> information.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> I'll update the KIP with the serdes.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On Mon, 16 Jan 2017 at 16:44 Eno Thereska <
>>>>>>>>>>>>>>>>>>>> eno.there...@gmail.com>
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> We created "KIP-114: KTable materialization and improved
>>>>>>>>>>>>>>>>>>>>> semantics" to solidify the KTable semantics in Kafka Streams:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+materialization+and+improved+semantics
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Your feedback is appreciated.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>> 
> 
> 
> -- 
> -- Guozhang

