Hi Damian,

Thanks. I agree, I'll adjust the tone so it's more about querying, while 
materialisation is an internal concept. 
If no store name is provided, the user would still be able to discover the 
store, however we are not making any strong guarantees in that case, since 
after all it i an internal decision on whether the names are discoverable. But 
yes, today you can discover them.

I think for V1 I'll go with actual materialization of the stores. We can then 
add a "viewer" option, i.e., not materialize, just compute on the fly. I can't 
help but think that down the line these two options will be considered by some 
sort of query optimizer that picks the best one. So I think both options will 
be needed, but starting the implementation with the materialization one.


> On 11 Apr 2017, at 00:14, Damian Guy <damian....@gmail.com> wrote:
> Hi Eno,
> Thanks for the update. I agree with what Matthias said. I wonder if the KIP
> should talk less about materialization and more about querying? After all,
> that is what is being provided from an end-users perspective.
> I think if no store name is provided users would still be able to query the
> store, just the store name would be some internally generated name. They
> would be able to discover those names via the IQ API
> I think for some stores it will make sense to not create a physical store,
> i.e., for thinks like `filter`, as this will save the rocksdb overhead. But
> i guess that is more of an implementation detail.
> Cheers,
> Damian
> On Tue, 11 Apr 2017 at 00:36 Eno Thereska <eno.there...@gmail.com> wrote:
>> Hi Matthias,
>>> However, this still forces users, to provide a name for store that we
>>> must materialize, even if users are not interested in querying the
>>> stores. Thus, I would like to have overloads for all currently existing
>>> methods having mandatory storeName paremeter, with overloads, that do
>>> not require the storeName parameter.
>> Oh yeah, absolutely, this is part of the KIP. I guess I didn't make it
>> clear, I'll clarify.
>> Thanks
>> Eno
>>> On 10 Apr 2017, at 16:00, Matthias J. Sax <matth...@confluent.io> wrote:
>>> Thanks for pushing this KIP Eno.
>>> The update give a very clear description about the scope, that is super
>>> helpful for the discussion!
>>> - To put it into my own words, the KIP focus is on enable to query all
>>> KTables.
>>>  ** The ability to query a store is determined by providing a name for
>>> the store.
>>>  ** At the same time, providing a name -- and thus making a store
>>> queryable -- does not say anything about an actual materialization (ie,
>>> being queryable and being materialized are orthogonal).
>>> I like this overall a lot. However, I would go one step further. Right
>>> now, you suggest to add new overload methods that allow users to specify
>>> a storeName -- if `null` is provided and the store is not materialized,
>>> we ignore it completely -- if `null` is provided but the store must be
>>> materialized we generate a internal name. So far so good.
>>> However, this still forces users, to provide a name for store that we
>>> must materialize, even if users are not interested in querying the
>>> stores. Thus, I would like to have overloads for all currently existing
>>> methods having mandatory storeName paremeter, with overloads, that do
>>> not require the storeName parameter.
>>> Otherwise, we would still have some methods which optional storeName
>>> parameter and other method with mandatory storeName parameter -- thus,
>>> still some inconsistency.
>>> -Matthias
>>> On 4/9/17 8:35 AM, Eno Thereska wrote:
>>>> Hi there,
>>>> I've now done a V2 of the KIP, that hopefully addresses the feedback in
>> this discussion thread:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+materialization+and+improved+semantics
>> <
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114:+KTable+materialization+and+improved+semantics>.
>> Notable changes:
>>>> - clearly outline what is in the scope of the KIP and what is not. We
>> ran into the issue where lots of useful, but somewhat tangential
>> discussions came up on interactive queries, declarative DSL etc. The exact
>> scope of this KIP is spelled out.
>>>> - decided to go with overloaded methods, not .materialize(), to stay
>> within the spirit of the current declarative DSL.
>>>> - clarified the depreciation plan
>>>> - listed part of the discussion we had under rejected alternatives
>>>> If you have any further feedback on this, let's continue on this thread.
>>>> Thank you
>>>> Eno
>>>>> On 1 Feb 2017, at 09:04, Eno Thereska <eno.there...@gmail.com> wrote:
>>>>> Thanks everyone! I think it's time to do a V2 on the KIP so I'll do
>> that and we can see how it looks and continue the discussion from there.
>> Stay tuned.
>>>>> Thanks
>>>>> Eno
>>>>>> On 30 Jan 2017, at 17:23, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>>>>> Hi,
>>>>>> I think Eno's separation is very clear and helpful. In order to
>>>>>> streamline this discussion, I would suggest we focus back on point (1)
>>>>>> only, as this is the original KIP question.
>>>>>> Even if I started to DSL design discussion somehow, because I thought
>> it
>>>>>> might be helpful to resolve both in a single shot, I feel that we have
>>>>>> too many options about DSL design and we should split it up in two
>>>>>> steps. This will have the disadvantage that we will change the API
>>>>>> twice, but still, I think it will be a more focused discussion.
>>>>>> I just had another look at the KIP, an it proposes 3 changes:
>>>>>> 1. add .materialized() -> IIRC it was suggested to name this
>>>>>> .materialize() though (can you maybe update the KIP Eno?)
>>>>>> 2. remove print(), writeAsText(), and foreach()
>>>>>> 3. rename toStream() to toKStream()
>>>>>> I completely agree with (2) -- not sure about (3) though because
>>>>>> KStreamBuilder also hast .stream() and .table() as methods.
>>>>>> However, we might want to introduce a KStream#toTable() -- this was
>>>>>> requested multiple times -- might also be part of a different KIP.
>>>>>> Thus, we end up with (1). I would suggest to do a step backward here
>> and
>>>>>> instead of a discussion how to express the changes in the DSL (new
>>>>>> overload, new methods...) we should discuss what the actual change
>>>>>> should be. Like (1) materialize all KTable all the time (2) all the
>> user
>>>>>> to force a materialization to enable querying the KTable (3) allow for
>>>>>> queryable non-materialized KTable.
>>>>>> On more question is, if we want to allow a user-forced materialization
>>>>>> only as as local store without changelog, or both (together /
>>>>>> independently)? We got some request like this already.
>>>>>> -Matthias
>>>>>> On 1/30/17 3:50 AM, Jan Filipiak wrote:
>>>>>>> Hi Eno,
>>>>>>> thanks for putting into different points. I want to put a few remarks
>>>>>>> inline.
>>>>>>> Best Jan
>>>>>>> On 30.01.2017 12:19, Eno Thereska wrote:
>>>>>>>> So I think there are several important discussion threads that are
>>>>>>>> emerging here. Let me try to tease them apart:
>>>>>>>> 1. inconsistency in what is materialized and what is not, what is
>>>>>>>> queryable and what is not. I think we all agree there is some
>>>>>>>> inconsistency there and this will be addressed with any of the
>>>>>>>> proposed approaches. Addressing the inconsistency is the point of
>> the
>>>>>>>> original KIP.
>>>>>>>> 2. the exact API for materializing a KTable. We can specify 1) a
>>>>>>>> "store name" (as we do today) or 2) have a ".materialize[d]" call or
>>>>>>>> 3) get a handle from a KTable ".getQueryHandle" or 4) have a builder
>>>>>>>> construct. So we have discussed 4 options. It is important to
>> remember
>>>>>>>> in this discussion that IQ is not designed for just local queries,
>> but
>>>>>>>> also for distributed queries. In all cases an identifying name/id is
>>>>>>>> needed for the store that the user is interested in querying. So we
>>>>>>>> end up with a discussion on who provides the name, the user (as done
>>>>>>>> today) or if it is generated automatically (as Jan suggests, as I
>>>>>>>> understand it). If it is generated automatically we need a way to
>>>>>>>> expose these auto-generated names to the users and link them to the
>>>>>>>> KTables they care to query.
>>>>>>> Hi, the last sentence is what I currently arguing against. The user
>>>>>>> would never see a stringtype indentifier name or anything. All he
>> gets
>>>>>>> is the queryHandle if he executes a get(K) that will be an
>> interactive
>>>>>>> query get. with all the finding the right servers that currently
>> have a
>>>>>>> copy of this underlying store stuff going on. The nice part is that
>> if
>>>>>>> someone retrieves a queryHandle, you know that you have to
>> materialized
>>>>>>> (if you are not already) as queries will be coming. Taking away the
>>>>>>> confusion mentioned in point 1 IMO.
>>>>>>>> 3. The exact boundary between the DSL, that is the processing
>>>>>>>> language, and the storage/IQ queries, and how we jump from one to
>> the
>>>>>>>> other. This is mostly for how we get a handle on a store (so it's
>>>>>>>> related to point 2), rather than for how we query the store. I think
>>>>>>>> we all agree that we don't want to limit ways one can query a store
>>>>>>>> (e.g., using gets or range queries etc) and the query APIs are not
>> in
>>>>>>>> the scope of the DSL.
>>>>>>> Does the IQ work with range currently? The range would have to be
>>>>>>> started on all stores and then merged by maybe the client. Range
>> force a
>>>>>>> flush to RocksDB currently so I am sure you would get a performance
>> hit
>>>>>>> right there. Time-windows might be okay, but I am not sure if the
>> first
>>>>>>> version should offer the user range access.
>>>>>>>> 4. The nature of the DSL and whether its declarative enough, or
>>>>>>>> flexible enough. Damian made the point that he likes the builder
>>>>>>>> pattern since users can specify, per KTable, things like caching and
>>>>>>>> logging needs. His observation (as I understand it) is that the
>>>>>>>> processor API (PAPI) is flexible but doesn't provide any help at all
>>>>>>>> to users. The current DSL provides declarative abstractions, but
>> it's
>>>>>>>> not fine-grained enough. This point is much broader than the KIP,
>> but
>>>>>>>> discussing it in this KIPs context is ok, since we don't want to
>> make
>>>>>>>> small piecemeal changes and then realise we're not in the spot we
>> want
>>>>>>>> to be.
>>>>>>> This is indeed much broader. My guess here is that's why both API's
>>>>>>> exists and helping the users to switch back and forth might be a
>> thing.
>>>>>>>> Feel free to pitch in if I have misinterpreted something.
>>>>>>>> Thanks
>>>>>>>> Eno
>>>>>>>>> On 30 Jan 2017, at 10:22, Jan Filipiak <jan.filip...@trivago.com>
>> wrote:
>>>>>>>>> Hi Eno,
>>>>>>>>> I have a really hard time understanding why we can't. From my point
>>>>>>>>> of view everything could be super elegant DSL only + public api for
>>>>>>>>> the PAPI-people as already exist.
>>>>>>>>> The above aproach implementing a .get(K) on KTable is foolisch in
>> my
>>>>>>>>> opinion as it would be to late to know that materialisation would
>> be
>>>>>>>>> required.
>>>>>>>>> But having an API that allows to indicate I want to query this
>> table
>>>>>>>>> and then wrapping the say table's processorname can work out really
>>>>>>>>> really nice. The only obstacle I see is people not willing to spend
>>>>>>>>> the additional time in implementation and just want a quick shot
>>>>>>>>> option to make it work.
>>>>>>>>> For me it would look like this:
>>>>>>>>> table =  builder.table()
>>>>>>>>> filteredTable = table.filter()
>>>>>>>>> rawHandle = table.getQueryHandle() // Does the materialisation,
>>>>>>>>> really all names possible but id rather hide the implication of it
>>>>>>>>> materializes
>>>>>>>>> filteredTableHandle = filteredTable.getQueryHandle() // this would
>>>>>>>>> _not_ materialize again of course, the source or the aggregator
>> would
>>>>>>>>> stay the only materialized processors
>>>>>>>>> streams = new streams(builder)
>>>>>>>>> This middle part is highly flexible I could imagin to force the
>> user
>>>>>>>>> todo something like this. This implies to the user that his streams
>>>>>>>>> need to be running
>>>>>>>>> instead of propagating the missing initialisation back by
>> exceptions.
>>>>>>>>> Also if the users is forced to pass the appropriate streams
>> instance
>>>>>>>>> back can change.
>>>>>>>>> I think its possible to build multiple streams out of  one topology
>>>>>>>>> so it would be easiest to implement aswell. This is just what I
>> maybe
>>>>>>>>> had liked the most
>>>>>>>>> streams.start();
>>>>>>>>> rawHandle.prepare(streams)
>>>>>>>>> filteredHandle.prepare(streams)
>>>>>>>>> later the users can do
>>>>>>>>> V value = rawHandle.get(K)
>>>>>>>>> V value = filteredHandle.get(K)
>>>>>>>>> This could free DSL users from anything like storenames and how and
>>>>>>>>> what to materialize. Can someone indicate what the problem would be
>>>>>>>>> implementing it like this.
>>>>>>>>> Yes I am aware that the current IQ API will not support querying by
>>>>>>>>> KTableProcessorName instread of statestoreName. But I think that
>> had
>>>>>>>>> to change if you want it to be intuitive
>>>>>>>>> IMO you gotta apply the filter read time
>>>>>>>>> Looking forward to your opinions
>>>>>>>>> Best Jan
>>>>>>>>> #DeathToIQMoreAndBetterConnectors
>>>>>>>>> On 30.01.2017 10:42, Eno Thereska wrote:
>>>>>>>>>> Hi there,
>>>>>>>>>> The inconsistency will be resolved, whether with materialize or
>>>>>>>>>> overloaded methods.
>>>>>>>>>> With the discussion on the DSL & stores I feel we've gone in a
>>>>>>>>>> slightly different tangent, which is worth discussing nonetheless.
>>>>>>>>>> We have entered into an argument around the scope of the DSL. The
>>>>>>>>>> DSL has been designed primarily for processing. The DSL does not
>>>>>>>>>> dictate ways to access state stores or what hind of queries to
>>>>>>>>>> perform on them. Hence, I see the mechanism for accessing storage
>> as
>>>>>>>>>> decoupled from the DSL.
>>>>>>>>>> We could think of ways to get store handles from part of the DSL,
>>>>>>>>>> like the KTable abstraction. However, subsequent queries will be
>>>>>>>>>> store-dependent and not rely on the DSL, hence I'm not sure we get
>>>>>>>>>> any grand-convergence DSL-Store here. So I am arguing that the
>>>>>>>>>> current way of getting a handle on state stores is fine.
>>>>>>>>>> Thanks
>>>>>>>>>> Eno
>>>>>>>>>>> On 30 Jan 2017, at 03:56, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>>>>>>>>>> Thinking loud here about the API options (materialize v.s.
>> overloaded
>>>>>>>>>>> functions) and its impact on IQ:
>>>>>>>>>>> 1. The first issue of the current DSL is that, there is
>>>>>>>>>>> inconsistency upon
>>>>>>>>>>> whether / how KTables should be materialized:
>>>>>>>>>>> a) in many cases the library HAS TO materialize KTables no
>>>>>>>>>>> matter what,
>>>>>>>>>>> e.g. KStream / KTable aggregation resulted KTables, and hence we
>>>>>>>>>>> enforce
>>>>>>>>>>> users to provide store names and throw RTE if it is null;
>>>>>>>>>>> b) in some other cases, the KTable can be materialized or not;
>> for
>>>>>>>>>>> example in KStreamBuilder.table(), store names can be nullable
>> and
>>>>>>>>>>> in which
>>>>>>>>>>> case the KTable would not be materialized;
>>>>>>>>>>> c) in some other cases, the KTable will never be materialized,
>> for
>>>>>>>>>>> example KTable.filter() resulted KTables, and users have no
>> options to
>>>>>>>>>>> enforce them to be materialized;
>>>>>>>>>>> d) this is related to a), where some KTables are required to be
>>>>>>>>>>> materialized, but we do not enforce users to provide a state
>> store
>>>>>>>>>>> name,
>>>>>>>>>>> e.g. KTables involved in joins; a RTE will be thrown not
>>>>>>>>>>> immediately but
>>>>>>>>>>> later in this case.
>>>>>>>>>>> 2. The second issue is related to IQ, where state stores are
>>>>>>>>>>> accessed by
>>>>>>>>>>> their state stores; so only those KTable's that have
>> user-specified
>>>>>>>>>>> state
>>>>>>>>>>> stores will be queryable. But because of 1) above, many stores
>> may
>>>>>>>>>>> not be
>>>>>>>>>>> interested to users for IQ but they still need to provide a
>>>>>>>>>>> (dummy?) state
>>>>>>>>>>> store name for them; while on the other hand users cannot query
>>>>>>>>>>> some state
>>>>>>>>>>> stores, e.g. the ones generated by KTable.filter() as there is no
>>>>>>>>>>> APIs for
>>>>>>>>>>> them to specify a state store name.
>>>>>>>>>>> 3. We are aware from user feedbacks that such backend details
>> would be
>>>>>>>>>>> better be abstracted away from the DSL layer, where app
>> developers
>>>>>>>>>>> should
>>>>>>>>>>> just focus on processing logic, while state stores along with
>> their
>>>>>>>>>>> changelogs etc would better be in a different mechanism; same
>>>>>>>>>>> arguments
>>>>>>>>>>> have been discussed for serdes / windowing triggers as well. For
>>>>>>>>>>> serdes
>>>>>>>>>>> specifically, we had a very long discussion about it and
>> concluded
>>>>>>>>>>> that, at
>>>>>>>>>>> least in Java7, we cannot completely abstract serde away in the
>>>>>>>>>>> DSL, so we
>>>>>>>>>>> choose the other extreme to enforce users to be completely aware
>> of
>>>>>>>>>>> the
>>>>>>>>>>> serde requirements when some KTables may need to be materialized
>> vis
>>>>>>>>>>> overloaded API functions. While for the state store names, I feel
>>>>>>>>>>> it is a
>>>>>>>>>>> different argument than serdes (details below).
>>>>>>>>>>> So to me, for either materialize() v.s. overloaded functions
>>>>>>>>>>> directions,
>>>>>>>>>>> the first thing I'd like to resolve is the inconsistency issue
>>>>>>>>>>> mentioned
>>>>>>>>>>> above. So in either case: KTable materialization will not be
>> affect
>>>>>>>>>>> by user
>>>>>>>>>>> providing state store name or not, but will only be decided by
>> the
>>>>>>>>>>> library
>>>>>>>>>>> when it is necessary. More specifically, only join operator and
>>>>>>>>>>> builder.table() resulted KTables are not always materialized, but
>>>>>>>>>>> are still
>>>>>>>>>>> likely to be materialized lazily (e.g. when participated in a
>> join
>>>>>>>>>>> operator).
>>>>>>>>>>> For overloaded functions that would mean:
>>>>>>>>>>> a) we have an overloaded function for ALL operators that could
>>>>>>>>>>> result
>>>>>>>>>>> in a KTable, and allow it to be null (i.e. for the function
>> without
>>>>>>>>>>> this
>>>>>>>>>>> param it is null by default);
>>>>>>>>>>> b) null-state-store-name do not indicate that a KTable would
>>>>>>>>>>> not be
>>>>>>>>>>> materialized, but that it will not be used for IQ at all
>> (internal
>>>>>>>>>>> state
>>>>>>>>>>> store names will be generated when necessary).
>>>>>>>>>>> For materialize() that would mean:
>>>>>>>>>>> a) we will remove state store names from ALL operators that
>> could
>>>>>>>>>>> result in a KTable.
>>>>>>>>>>> b) KTables that not calling materialized do not indicate that a
>>>>>>>>>>> KTable
>>>>>>>>>>> would not be materialized, but that it will not be used for IQ
>> at all
>>>>>>>>>>> (internal state store names will be generated when necessary).
>>>>>>>>>>> Again, in either ways the API itself does not "hint" about
>> anything
>>>>>>>>>>> for
>>>>>>>>>>> materializing a KTable or not at all; it is still purely
>> determined
>>>>>>>>>>> by the
>>>>>>>>>>> library when parsing the DSL for now.
>>>>>>>>>>> Following these thoughts, I feel that 1) we should probably
>> change
>>>>>>>>>>> the name
>>>>>>>>>>> "materialize" since it may be misleading to users as what
>> actually
>>>>>>>>>>> happened
>>>>>>>>>>> behind the scene, to e.g. Damian suggested "queryableStore(String
>>>>>>>>>>> storeName)",
>>>>>>>>>>> which returns a QueryableStateStore, and can replace the
>>>>>>>>>>> `KafkaStreams.store` function; 2) comparing those two options
>>>>>>>>>>> assuming we
>>>>>>>>>>> get rid of the misleading function name, I personally favor not
>>>>>>>>>>> adding more
>>>>>>>>>>> overloading functions as it keeps the API simpler.
>>>>>>>>>>> Guozhang
>>>>>>>>>>> On Sat, Jan 28, 2017 at 2:32 PM, Jan Filipiak
>>>>>>>>>>> <jan.filip...@trivago.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> thanks for your mail, felt like this can clarify some things!
>> The
>>>>>>>>>>>> thread
>>>>>>>>>>>> unfortunately split but as all branches close in on what my
>>>>>>>>>>>> suggestion was
>>>>>>>>>>>> about Ill pick this to continue
>>>>>>>>>>>> Of course only the table the user wants to query would be
>>>>>>>>>>>> materialized.
>>>>>>>>>>>> (retrieving the queryhandle implies materialisation). So In the
>>>>>>>>>>>> example of
>>>>>>>>>>>> KTable::filter if you call
>>>>>>>>>>>> getIQHandle on both tables only the one source that is there
>> would
>>>>>>>>>>>> materialize and the QueryHandleabstraction would make sure it
>> gets
>>>>>>>>>>>> mapped
>>>>>>>>>>>> and filtered and what not uppon read as usual.
>>>>>>>>>>>> Of Course the Object you would retrieve would maybe only wrap
>> the
>>>>>>>>>>>> storeName / table unique identifier and a way to access the
>> streams
>>>>>>>>>>>> instance and then basically uses the same mechanism that is
>>>>>>>>>>>> currently used.
>>>>>>>>>>>> From my point of view this is the least confusing way for DSL
>>>>>>>>>>>> users. If
>>>>>>>>>>>> its to tricky to get a hand on the streams instance one could
>> ask
>>>>>>>>>>>> the user
>>>>>>>>>>>> to pass it in before executing queries, therefore making sure
>> the
>>>>>>>>>>>> streams
>>>>>>>>>>>> instance has been build.
>>>>>>>>>>>> The effort to implement this is indeed some orders of magnitude
>>>>>>>>>>>> higher
>>>>>>>>>>>> than the overloaded materialized call. As long as I could help
>>>>>>>>>>>> getting a
>>>>>>>>>>>> different view I am happy.
>>>>>>>>>>>> Best Jan
>>>>>>>>>>>> On 28.01.2017 09:36, Eno Thereska wrote:
>>>>>>>>>>>>> Hi Jan,
>>>>>>>>>>>>> I understand your concern. One implication of not passing any
>>>>>>>>>>>>> store name
>>>>>>>>>>>>> and just getting an IQ handle is that all KTables would need
>> to be
>>>>>>>>>>>>> materialised. Currently the store name (or proposed
>>>>>>>>>>>>> .materialize() call)
>>>>>>>>>>>>> act as hints on whether to materialise the KTable or not.
>>>>>>>>>>>>> Materialising
>>>>>>>>>>>>> every KTable can be expensive, although there are some tricks
>> one
>>>>>>>>>>>>> can play,
>>>>>>>>>>>>> e.g., have a virtual store rather than one backed by a Kafka
>> topic.
>>>>>>>>>>>>> However, even with the above, after getting an IQ handle, the
>>>>>>>>>>>>> user would
>>>>>>>>>>>>> still need to use IQ APIs to query the state. As such, we would
>>>>>>>>>>>>> still
>>>>>>>>>>>>> continue to be outside the original DSL so this wouldn't
>> address
>>>>>>>>>>>>> your
>>>>>>>>>>>>> original concern.
>>>>>>>>>>>>> So I read this suggestion as simplifying the APIs by removing
>> the
>>>>>>>>>>>>> store
>>>>>>>>>>>>> name, at the cost of having to materialise every KTable. It's
>>>>>>>>>>>>> definitely an
>>>>>>>>>>>>> option we'll consider as part of this KIP.
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Eno
>>>>>>>>>>>>> On 28 Jan 2017, at 06:49, Jan Filipiak <
>> jan.filip...@trivago.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> Hi Exactly
>>>>>>>>>>>>>> I know it works from the Processor API, but my suggestion
>> would
>>>>>>>>>>>>>> prevent
>>>>>>>>>>>>>> DSL users dealing with storenames what so ever.
>>>>>>>>>>>>>> In general I am pro switching between DSL and Processor API
>>>>>>>>>>>>>> easily. (In
>>>>>>>>>>>>>> my Stream applications I do this a lot with reflection and
>>>>>>>>>>>>>> instanciating
>>>>>>>>>>>>>> KTableImpl) Concerning this KIP all I say is that there should
>>>>>>>>>>>>>> be a DSL
>>>>>>>>>>>>>> concept of "I want to expose this __KTable__. This can be a
>>>>>>>>>>>>>> Method like
>>>>>>>>>>>>>> KTable::retrieveIQHandle():InteractiveQueryHandle, the table
>>>>>>>>>>>>>> would know
>>>>>>>>>>>>>> to materialize, and the user had a reference to the "store
>> and the
>>>>>>>>>>>>>> distributed query mechanism by the Interactive Query Handle"
>>>>>>>>>>>>>> under the hood
>>>>>>>>>>>>>> it can use the same mechanism as the PIP people again.
>>>>>>>>>>>>>> I hope you see my point J
>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>> #DeathToIQMoreAndBetterConnectors :)
>>>>>>>>>>>>>> On 27.01.2017 21:59, Matthias J. Sax wrote:
>>>>>>>>>>>>>>> Jan,
>>>>>>>>>>>>>>> the IQ feature is not limited to Streams DSL but can also be
>>>>>>>>>>>>>>> used for
>>>>>>>>>>>>>>> Stores used in PAPI. Thus, we need a mechanism that does work
>>>>>>>>>>>>>>> for PAPI
>>>>>>>>>>>>>>> and DSL.
>>>>>>>>>>>>>>> Nevertheless I see your point and I think we could provide a
>>>>>>>>>>>>>>> better API
>>>>>>>>>>>>>>> for KTable stores including the discovery of remote shards of
>>>>>>>>>>>>>>> the same
>>>>>>>>>>>>>>> KTable.
>>>>>>>>>>>>>>> @Michael: Yes, right now we do have a lot of overloads and I
>> am
>>>>>>>>>>>>>>> not a
>>>>>>>>>>>>>>> big fan of those -- I would rather prefer a builder pattern.
>>>>>>>>>>>>>>> But that
>>>>>>>>>>>>>>> might be a different discussion (nevertheless, if we would
>> aim
>>>>>>>>>>>>>>> for a API
>>>>>>>>>>>>>>> rework, we should get the changes with regard to stores right
>>>>>>>>>>>>>>> from the
>>>>>>>>>>>>>>> beginning on, in order to avoid a redesign later on.)
>>>>>>>>>>>>>>> something like:
>>>>>>>>>>>>>>> stream.groupyByKey()
>>>>>>>>>>>>>>>    .window(TimeWindow.of(5000))
>>>>>>>>>>>>>>>    .aggregate(...)
>>>>>>>>>>>>>>>    .withAggValueSerde(new CustomTypeSerde())
>>>>>>>>>>>>>>>    .withStoreName("storeName);
>>>>>>>>>>>>>>> (This would also reduce JavaDoc redundancy -- maybe a
>> personal
>>>>>>>>>>>>>>> pain
>>>>>>>>>>>>>>> point right now :))
>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>> On 1/27/17 11:10 AM, Jan Filipiak wrote:
>>>>>>>>>>>>>>>> Yeah,
>>>>>>>>>>>>>>>> Maybe my bad that I refuse to look into IQ as i don't find
>> them
>>>>>>>>>>>>>>>> anywhere
>>>>>>>>>>>>>>>> close to being interesting. The Problem IMO is that people
>>>>>>>>>>>>>>>> need to know
>>>>>>>>>>>>>>>> the Store name), so we are working on different levels to
>>>>>>>>>>>>>>>> achieve a
>>>>>>>>>>>>>>>> single goal.
>>>>>>>>>>>>>>>> What is your peoples opinion on having a method on KTABLE
>> that
>>>>>>>>>>>>>>>> returns
>>>>>>>>>>>>>>>> them something like a Keyvalue store. There is of course
>>>>>>>>>>>>>>>> problems like
>>>>>>>>>>>>>>>> "it cant be used before the streamthreads are going and
>>>>>>>>>>>>>>>> groupmembership
>>>>>>>>>>>>>>>> is established..." but the benefit would be that for the
>> user
>>>>>>>>>>>>>>>> there is
>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>> consistent way of saying "Hey I need it materialized as
>>>>>>>>>>>>>>>> querries gonna
>>>>>>>>>>>>>>>> be comming" + already get a Thing that he can execute the
>>>>>>>>>>>>>>>> querries on
>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>> 1 step.
>>>>>>>>>>>>>>>> What I think is unintuitive here is you need to say
>>>>>>>>>>>>>>>> materialize on this
>>>>>>>>>>>>>>>> Ktable and then you go somewhere else and find its store
>> name
>>>>>>>>>>>>>>>> and then
>>>>>>>>>>>>>>>> you go to the kafkastreams instance and ask for the store
>> with
>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>> name.
>>>>>>>>>>>>>>>> So one could the user help to stay in DSL land and therefore
>>>>>>>>>>>>>>>> maybe
>>>>>>>>>>>>>>>> confuse him less.
>>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>> #DeathToIQMoreAndBetterConnectors :)
>>>>>>>>>>>>>>>> On 27.01.2017 16:51, Damian Guy wrote:
>>>>>>>>>>>>>>>>> I think Jan is saying that they don't always need to be
>>>>>>>>>>>>>>>>> materialized,
>>>>>>>>>>>>>>>>> i.e.,
>>>>>>>>>>>>>>>>> filter just needs to apply the ValueGetter, it doesn't
>> need yet
>>>>>>>>>>>>>>>>> another
>>>>>>>>>>>>>>>>> physical state store.
>>>>>>>>>>>>>>>>> On Fri, 27 Jan 2017 at 15:49 Michael Noll <
>> mich...@confluent.io>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> Like Damian, and for the same reasons, I am more in favor
>> of
>>>>>>>>>>>>>>>>>> overloading
>>>>>>>>>>>>>>>>>> methods rather than introducing `materialize()`.
>>>>>>>>>>>>>>>>>> FWIW, we already have a similar API setup for e.g.
>>>>>>>>>>>>>>>>>> `KTable#through(topicName, stateStoreName)`.
>>>>>>>>>>>>>>>>>> A related but slightly different question is what e.g. Jan
>>>>>>>>>>>>>>>>>> Filipiak
>>>>>>>>>>>>>>>>>> mentioned earlier in this thread:
>>>>>>>>>>>>>>>>>> I think we need to explain more clearly why KIP-114
>> doesn't
>>>>>>>>>>>>>>>>>> propose
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> seemingly simpler solution of always materializing
>> tables/state
>>>>>>>>>>>>>>>>>> stores.
>>>>>>>>>>>>>>>>>> On Fri, Jan 27, 2017 at 4:38 PM, Jan Filipiak <
>>>>>>>>>>>>>>>>>> jan.filip...@trivago.com>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>> Yeah its confusing, Why shoudn't it be querable by IQ? If
>>>>>>>>>>>>>>>>>>> you uses
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> ValueGetter of Filter it will apply the filter and
>> should be
>>>>>>>>>>>>>>>>>>> completely
>>>>>>>>>>>>>>>>>>> transparent as to if another processor or IQ is accessing
>>>>>>>>>>>>>>>>>>> it? How
>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>> new method help?
>>>>>>>>>>>>>>>>>>> I cannot see the reason for the additional materialize
>>>>>>>>>>>>>>>>>>> method being
>>>>>>>>>>>>>>>>>>> required! Hence I suggest leave it alone.
>>>>>>>>>>>>>>>>>>> regarding removing the others I dont have strong opinions
>>>>>>>>>>>>>>>>>>> and it
>>>>>>>>>>>>>>>>>>> seems to
>>>>>>>>>>>>>>>>>>> be unrelated.
>>>>>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>>>>> On 26.01.2017 20:48, Eno Thereska wrote:
>>>>>>>>>>>>>>>>>>> Forwarding this thread to the users list too in case
>> people
>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> comment. It is also on the dev list.
>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>>>>>>>> Begin forwarded message:
>>>>>>>>>>>>>>>>>>>>> From: "Matthias J. Sax" <matth...@confluent.io>
>>>>>>>>>>>>>>>>>>>>> Subject: Re: [DISCUSS] KIP-114: KTable materialization
>> and
>>>>>>>>>>>>>>>>>>>>> improved
>>>>>>>>>>>>>>>>>>>>> semantics
>>>>>>>>>>>>>>>>>>>>> Date: 24 January 2017 at 19:30:10 GMT
>>>>>>>>>>>>>>>>>>>>> To: dev@kafka.apache.org
>>>>>>>>>>>>>>>>>>>>> Reply-To: dev@kafka.apache.org
>>>>>>>>>>>>>>>>>>>>> That not what I meant by "huge impact".
>>>>>>>>>>>>>>>>>>>>> I refer to the actions related to materialize a KTable:
>>>>>>>>>>>>>>>>>>>>> creating a
>>>>>>>>>>>>>>>>>>>>> RocksDB store and a changelog topic -- users should be
>>>>>>>>>>>>>>>>>>>>> aware about
>>>>>>>>>>>>>>>>>>>>> runtime implication and this is better expressed by an
>>>>>>>>>>>>>>>>>>>>> explicit
>>>>>>>>>>>>>>>>>>>>> method
>>>>>>>>>>>>>>>>>>>>> call, rather than implicitly triggered by using a
>> different
>>>>>>>>>>>>>>>>>>>>> overload of
>>>>>>>>>>>>>>>>>>>>> a method.
>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>> On 1/24/17 1:35 AM, Damian Guy wrote:
>>>>>>>>>>>>>>>>>>>>> I think your definition of a huge impact and mine are
>> rather
>>>>>>>>>>>>>>>>>>>>>> different
>>>>>>>>>>>>>>>>>>>>>> ;-P
>>>>>>>>>>>>>>>>>>>>>> Overloading a few methods  is not really a huge impact
>>>>>>>>>>>>>>>>>>>>>> IMO. It is
>>>>>>>>>>>>>>>>>>>>> also a
>>>>>>>>>>>>>>>>>>> sacrifice worth making for readability, usability of the
>> API.
>>>>>>>>>>>>>>>>>>>>>> On Mon, 23 Jan 2017 at 17:55 Matthias J. Sax <
>>>>>>>>>>>>>>>>>>>>>> matth...@confluent.io>
>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>> I understand your argument, but do not agree with it.
>>>>>>>>>>>>>>>>>>>>>>> Your first version (even if the "flow" is not as
>> nice)
>>>>>>>>>>>>>>>>>>>>>>> is more
>>>>>>>>>>>>>>>>>>>>>> explicit
>>>>>>>>>>>>>>>>>>> than the second version. Adding a stateStoreName
>> parameter
>>>>>>>>>>>>>>>>>>> is quite
>>>>>>>>>>>>>>>>>>>>>>> implicit but has a huge impact -- thus, I prefer the
>>>>>>>>>>>>>>>>>>>>>>> rather more
>>>>>>>>>>>>>>>>>>>>>>> verbose
>>>>>>>>>>>>>>>>>>>>>>> but explicit version.
>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>> On 1/23/17 1:39 AM, Damian Guy wrote:
>>>>>>>>>>>>>>>>>>>>>>> I'm not a fan of materialize. I think it interrupts
>> the
>>>>>>>>>>>>>>>>>>>>>>> flow,
>>>>>>>>>>>>>>>>>>>>>>>> i.e,
>> table.mapValue(..).materialize().join(..).materialize()
>>>>>>>>>>>>>>>>>>>>>>>> compared to:
>>>>>>>>>>>>>>>>>>>>>>>> table.mapValues(..).join(..)
>>>>>>>>>>>>>>>>>>>>>>>> I know which one i prefer.
>>>>>>>>>>>>>>>>>>>>>>>> My preference is stil to provide overloaded methods
>> where
>>>>>>>>>>>>>>>>>>>>>>>> people can
>>>>>>>>>>>>>>>>>>>>>>>> specify the store names if they want, otherwise we
>> just
>>>>>>>>>>>>>>>>>>>>>>>> generate
>>>>>>>>>>>>>>>>>>>>>>> them.
>>>>>>>>>>>>>>>>>>> On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax
>>>>>>>>>>>>>>>>>>>>>>>> <matth...@confluent.io
>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>> thanks for the KIP Eno! Here are my 2 cents:
>>>>>>>>>>>>>>>>>>>>>>>>> 1) I like Guozhang's proposal about removing store
>>>>>>>>>>>>>>>>>>>>>>>>> name from
>>>>>>>>>>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>>>>>>>> KTable
>>>>>>>>>>>>>>>>>>>>>>>>> methods and generate internal names (however, I
>> would
>>>>>>>>>>>>>>>>>>>>>>>>> do this
>>>>>>>>>>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>>>>>>>> overloads). Furthermore, I would not force users
>> to call
>>>>>>>>>>>>>>>>>>>>>>>>> .materialize()
>>>>>>>>>>>>>>>>>>>>>>>>> if they want to query a store, but add one more
>> method
>>>>>>>>>>>>>>>>>>>>>>>>> .stateStoreName()
>>>>>>>>>>>>>>>>>>>>>>>>> that returns the store name if the KTable is
>>>>>>>>>>>>>>>>>>>>>>>>> materialized.
>>>>>>>>>>>>>>>>>>>>>>>>> Thus,
>>>>>>>>>>>>>>>>>>>>>>>> also
>>>>>>>>>>>>>>>>>>> .materialize() must not necessarily have a parameter
>> storeName
>>>>>>>>>>>>>>>>>>>>>>>>> (ie,
>>>>>>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>>>>> should have some overloads here).
>>>>>>>>>>>>>>>>>>>>>>>>> I would also not allow to provide a null store
>> name (to
>>>>>>>>>>>>>>>>>>>>>>>>> indicate no
>>>>>>>>>>>>>>>>>>>>>>>>> materialization if not necessary) but throw an
>>>>>>>>>>>>>>>>>>>>>>>>> exception.
>>>>>>>>>>>>>>>>>>>>>>>>> This yields some simplification (see below).
>>>>>>>>>>>>>>>>>>>>>>>>> 2) I also like Guozhang's proposal about
>>>>>>>>>>>>>>>>>>>>>>>>> KStream#toTable()
>>>>>>>>>>>>>>>>>>>>>>>>> 3)
>>>>>>>>>>>>>>>>>>>>>>>>> 3. What will happen when you call materialize on
>>>>>>>>>>>>>>>>>>>>>>>>> KTable
>>>>>>>>>>>>>>>>>>>>>>>>>> that is
>>>>>>>>>>>>>>>>>>>>>>>>>> already
>>>>>>>>>>>>>>>>>>>>>>>>>> materialized? Will it create another StateStore
>>>>>>>>>>>>>>>>>>>>>>>>>> (providing
>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>> name
>>>>>>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>>> different), throw an Exception?
>>>>>>>>>>>>>>>>>>>>>>>>> Currently an exception is thrown, but see below.
>>>>>>>>>>>>>>>>>>>>>>>>>> If we follow approach (1) from Guozhang, there is
>> no
>>>>>>>>>>>>>>>>>>>>>>>>>> need to
>>>>>>>>>>>>>>>>>>>>>>>>>> worry
>>>>>>>>>>>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>>>>>>>>>>> a second materialization and also no exception
>> must be
>>>>>>>>>>>>>>>>>>>>>>>>> throws. A
>>>>>>>>>>>>>>>>>>>>>>>>> call to
>>>>>>>>>>>>>>>>>>>>>>>>> .materialize() basically sets a "materialized
>> flag" (ie,
>>>>>>>>>>>>>>>>>>>>>>>>> idempotent
>>>>>>>>>>>>>>>>>>>>>>>>> operation) and sets a new name.
>>>>>>>>>>>>>>>>>>>>>>>>> 4)
>>>>>>>>>>>>>>>>>>>>>>>>> Rename toStream() to toKStream() for consistency.
>>>>>>>>>>>>>>>>>>>>>>>>>> Not sure whether that is really required. We also
>> use
>>>>>>>>>>>>>>>>>>>>>>>>>> `KStreamBuilder#stream()` and
>>>>>>>>>>>>>>>>>>>>>>>>>> `KStreamBuilder#table()`, for
>>>>>>>>>>>>>>>>>>>>>>>>> example,
>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>> don't care about the "K" prefix.
>>>>>>>>>>>>>>>>>>>>>>>>> Eno's reply:
>>>>>>>>>>>>>>>>>>>>>>>>> I think changing it to `toKStream` would make it
>>>>>>>>>>>>>>>>>>>>>>>>> absolutely
>>>>>>>>>>>>>>>>>>>>>>>>>> clear
>>>>>>>>>>>>>>>>>>>>>>>>>> what
>>>>>>>>>>>>>>>>>>>>>>>>>> we are converting it to.
>>>>>>>>>>>>>>>>>>>>>>>>> I'd say we should probably change the
>> KStreamBuilder
>>>>>>>>>>>>>>>>>>>>>>>>> methods
>>>>>>>>>>>>>>>>>>>>>>>>>> (but
>>>>>>>>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>> this KIP).
>>>>>>>>>>>>>>>>>>>>>>>>> I would keep #toStream(). (see below)
>>>>>>>>>>>>>>>>>>>>>>>>> 5) We should not remove any methods but only
>>>>>>>>>>>>>>>>>>>>>>>>> deprecate them.
>>>>>>>>>>>>>>>>>>>>>>>>> A general note:
>>>>>>>>>>>>>>>>>>>>>>>>> I do not understand your comments "Rejected
>>>>>>>>>>>>>>>>>>>>>>>>> Alternatives". You
>>>>>>>>>>>>>>>>>>>>>>>>> say
>>>>>>>>>>>>>>>>>>>>>>>>> "Have
>>>>>>>>>>>>>>>>>>>>>>>>> the KTable be the materialized view" was rejected.
>>>>>>>>>>>>>>>>>>>>>>>>> But your
>>>>>>>>>>>>>>>>>>>>>>>>> KIP
>>>>>>>>>>>>>>>>>>>>>>>>> actually
>>>>>>>>>>>>>>>>>>>>>>>>> does exactly this -- the changelog abstraction of
>>>>>>>>>>>>>>>>>>>>>>>>> KTable is
>>>>>>>>>>>>>>>>>>>>>>>> secondary
>>>>>>>>>>>>>>>>>>> after those changes and the "view" abstraction is what a
>>>>>>>>>>>>>>>>>>>>>>>>> KTable is.
>>>>>>>>>>>>>>>>>>>>>>>>> And
>>>>>>>>>>>>>>>>>>>>>>>>> just to be clear, I like this a lot:
>>>>>>>>>>>>>>>>>>>>>>>>> - it aligns with the name KTable
>>>>>>>>>>>>>>>>>>>>>>>>> - is aligns with stream-table-duality
>>>>>>>>>>>>>>>>>>>>>>>>> - it aligns with IQ
>>>>>>>>>>>>>>>>>>>>>>>>> I would say that a KTable is a "view abstraction"
>> (as
>>>>>>>>>>>>>>>>>>>>>>>>> materialization is
>>>>>>>>>>>>>>>>>>>>>>>>> optional).
>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>> On 1/22/17 5:05 PM, Guozhang Wang wrote:
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP Eno, I have a few meta comments
>>>>>>>>>>>>>>>>>>>>>>>>> and a few
>>>>>>>>>>>>>>>>>>>>>>>>>> detailed
>>>>>>>>>>>>>>>>>>>>>>>>>> comments:
>>>>>>>>>>>>>>>>>>>>>>>>>> 1. I like the materialize() function in general,
>> but
>>>>>>>>>>>>>>>>>>>>>>>>>> I would
>>>>>>>>>>>>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> see
>>>>>>>>>>>>>>>>>>>>>>>> how other KTable functions should be updated
>>>>>>>>>>>>>>>>>>>>>>>> accordingly. For
>>>>>>>>>>>>>>>>>>>>>>> example,
>>>>>>>>>>>>>>>>>>> 1)
>>>>>>>>>>>>>>>>>>>>>>>> KStreamBuilder.table(..) has a state store name
>>>>>>>>>>>>>>>>>>>>>>>> parameter, and
>>>>>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>> always materialize the KTable unless its state store name
>>>>>>>>>>>>>>>>>>> is set
>>>>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> null;
>>>>>>>>>>>>>>>>>>>>>>>> 2) KTable.agg requires the result KTable to be
>>>>>>>>>>>>>>>>>>>>>>>> materialized,
>>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>> hence
>>>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>>>>>> also have a state store name; 3) KTable.join
>> requires the
>>>>>>>>>>>>>>>>>>>>>>>> joining
>>>>>>>>>>>>>>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>> be materialized. And today we do not actually have
>> a
>>>>>>>>>>>>>>>>>>>>>>>>>> mechanism to
>>>>>>>>>>>>>>>>>>>>>>>>>> enforce
>>>>>>>>>>>>>>>>>>>>>>>> that, but will only throw an exception at runtime if
>>>>>>>>>>>>>>>>>>>>>>>> it is not
>>>>>>>>>>>>>>>>>>>>>>>> (e.g.
>>>>>>>>>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>>>>>>>>> have "builder.table("topic", null).join()" a RTE
>> will be
>>>>>>>>>>>>>>>>>>>>>>>>>> thrown).
>>>>>>>>>>>>>>>>>>>>>>>>>> I'd make an extended proposal just to kick off the
>>>>>>>>>>>>>>>>>>>>>>>>>> discussion
>>>>>>>>>>>>>>>>>>>>>>>>> here:
>>>>>>>>>>>>>>>>>>> let's
>>>>>>>>>>>>>>>>>>>>>>>> remove all the state store params in other KTable
>>>>>>>>>>>>>>>>>>>>>>>> functions,
>>>>>>>>>>>>>>>>>>>>>>>> and if
>>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>>>>>>> cases KTable have to be materialized (e.g. KTable
>>>>>>>>>>>>>>>>>>>>>>>>> resulted
>>>>>>>>>>>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>>>>>>>>>> KXX.agg)
>>>>>>>>>>>>>>>>>>>>>>>> and users do not call materialize(), then we treat
>> it
>>>>>>>>>>>>>>>>>>>>>>>> as "users
>>>>>>>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>>>>>>> interested in querying it at all" and hence use an
>>>>>>>>>>>>>>>>>>>>>>>>>> internal
>>>>>>>>>>>>>>>>>>>>>>>>>> name
>>>>>>>>>>>>>>>>>>>>>>>>>> generated
>>>>>>>>>>>>>>>>>>>>>>>>> for the materialized KTable; i.e. although it is
>>>>>>>>>>>>>>>>>>>>>>>>> materialized
>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>> state
>>>>>>>>>>>>>>>>>>>>>>>>>> store is not exposed to users. And if users call
>>>>>>>>>>>>>>>>>>>>>>>>>> materialize()
>>>>>>>>>>>>>>>>>>>>>>>>>> afterwards
>>>>>>>>>>>>>>>>>>>>>>>> but we have already decided to materialize it, we
>> can
>>>>>>>>>>>>>>>>>>>>>>>> replace
>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> internal
>>>>>>>>>>>>>>>>>>>>>>>>> name with the user's provided names. Then from a
>> user's
>>>>>>>>>>>>>>>>>>>>>>>>> point-view,
>>>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>>>>>>>> they
>>>>>>>>>>>>>>>>>>>>>>>>> ever want to query a KTable, they have to call
>>>>>>>>>>>>>>>>>>>>>>>>> materialize()
>>>>>>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>> given
>>>>>>>>>>>>>>>>>>>>>>>> state store name. This approach has one awkwardness
>>>>>>>>>>>>>>>>>>>>>>>> though,
>>>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>> serdes
>>>>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>> state store names param are not separated and
>> could be
>>>>>>>>>>>>>>>>>>>>>>>>>> overlapped
>>>>>>>>>>>>>>>>>>>>>>>>>> (see
>>>>>>>>>>>>>>>>>>>>>>>>>> detailed comment #2 below).
>>>>>>>>>>>>>>>>>>>>>>>>>> 2. This step does not need to be included in this
>>>>>>>>>>>>>>>>>>>>>>>>>> KIP, but
>>>>>>>>>>>>>>>>>>>>>>>>>> just
>>>>>>>>>>>>>>>>>>>>>>>>> as a
>>>>>>>>>>>>>>>>>>> reference / future work: as we have discussed before, we
>> may
>>>>>>>>>>>>>>>>>>>>>>>>> enforce
>>>>>>>>>>>>>>>>>>> materialize KTable.join resulted KTables as well in the
>>>>>>>>>>>>>>>>>>>>>>>>>> future. If
>>>>>>>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>>>>>> do
>>>>>>>>>>>>>>>>>>>>>>>> that, then:
>>>>>>>>>>>>>>>>>>>>>>>>> a) KXX.agg resulted KTables are always
>> materialized;
>>>>>>>>>>>>>>>>>>>>>>>>>> b) KTable.agg requires the aggregating KTable to
>>>>>>>>>>>>>>>>>>>>>>>>>> always be
>>>>>>>>>>>>>>>>>>>>>>>>>> materialized
>>>>>>>>>>>>>>>>>>>>>>>>>> (otherwise we would not know the old value);
>>>>>>>>>>>>>>>>>>>>>>>>>> c) KTable.join resulted KTables are always
>>>>>>>>>>>>>>>>>>>>>>>>>> materialized, and
>>>>>>>>>>>>>>>>>>>>>>>>>> so
>>>>>>>>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>> joining KTables to always be materialized.
>>>>>>>>>>>>>>>>>>>>>>>>>> d) KTable.filter/mapValues resulted KTables
>>>>>>>>>>>>>>>>>>>>>>>>>> materialization
>>>>>>>>>>>>>>>>>>>>>>>>>> depend
>>>>>>>>>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>>>>>>>> its
>>>>>>>>>>>>>>>>>>>>>>>> parent's materialization;
>>>>>>>>>>>>>>>>>>>>>>>>> By recursive induction all KTables are actually
>> always
>>>>>>>>>>>>>>>>>>>>>>>>> materialized,
>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>> then the effect of the "materialize()" is just for
>>>>>>>>>>>>>>>>>>>>>>>> specifying
>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> state
>>>>>>>>>>>>>>>>>>>>>>>>>> store names. In this scenario, we do not need to
>> send
>>>>>>>>>>>>>>>>>>>>>>>>>> Change<V> in
>>>>>>>>>>>>>>>>>>>>>>>>>> repartition topics within joins any more, but
>> only for
>>>>>>>>>>>>>>>>>>>>>>>>> repartitions
>>>>>>>>>>>>>>>>>>> topics
>>>>>>>>>>>>>>>>>>>>>>>>> within aggregations. Instead, we can just send a
>>>>>>>>>>>>>>>>>>>>>>>>> "tombstone"
>>>>>>>>>>>>>>>>>>>>>>>>> without
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> old value and we do not need to calculate joins
>> twice
>>>>>>>>>>>>>>>>>>>>>>>> (one more
>>>>>>>>>>>>>>>>>>>>>>>> time
>>>>>>>>>>>>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>>>>>>>>> old value is received).
>>>>>>>>>>>>>>>>>>>>>>>>> 3. I'm wondering if it is worth-while to add a
>>>>>>>>>>>>>>>>>>>>>>>>>> "KStream#toTable()"
>>>>>>>>>>>>>>>>>>>>>>>>>> function
>>>>>>>>>>>>>>>>>>>>>>>>> which is interpreted as a dummy-aggregation where
>> the
>>>>>>>>>>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>>>>>>>>> value
>>>>>>>>>>>>>>>>>>>>>>>>>> always
>>>>>>>>>>>>>>>>>>>>>>>>>> replaces the old value. I have seen a couple of
>> use
>>>>>>>>>>>>>>>>>>>>>>>>>> cases of
>>>>>>>>>>>>>>>>>>>>>>>>>> this,
>>>>>>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>>> example, users want to read a changelog topic,
>> apply
>>>>>>>>>>>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>>>>>>>> filters,
>>>>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>>>>>>>>>>> materialize it into a KTable with state stores
>> without
>>>>>>>>>>>>>>>>>>>>>>>>>> creating
>>>>>>>>>>>>>>>>>>>>>>>>>> duplicated
>>>>>>>>>>>>>>>>>>>>>>>>> changelog topics. With materialize() and toTable
>> I'd
>>>>>>>>>>>>>>>>>>>>>>>>> imagine
>>>>>>>>>>>>>>>>>>>>>>>>>> users
>>>>>>>>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>>>>>> specify sth. like:
>>>>>>>>>>>>>>>>>>>>>>>>>> "
>>>>>>>>>>>>>>>>>>>>>>>>>> KStream stream =
>> builder.stream("topic1").filter(..);
>>>>>>>>>>>>>>>>>>>>>>>>>> KTable table = stream.toTable(..);
>>>>>>>>>>>>>>>>>>>>>>>>>> table.materialize("state1");
>>>>>>>>>>>>>>>>>>>>>>>>>> "
>>>>>>>>>>>>>>>>>>>>>>>>>> And the library in this case could set store
>>>>>>>>>>>>>>>>>>>>>>>>>> "state1" 's
>>>>>>>>>>>>>>>>>>>>>>>>>> changelog
>>>>>>>>>>>>>>>>>>>>>>>>>> topic
>>>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>> be "topic1", and applying the filter on the fly
>> while
>>>>>>>>>>>>>>>>>>>>>>>>>> (re-)storing
>>>>>>>>>>>>>>>>>>>>>>>>>> its
>>>>>>>>>>>>>>>>>>>>>>>>>> state by reading from this topic, instead of
>> creating a
>>>>>>>>>>>>>>>>>>>>>>>>>> second
>>>>>>>>>>>>>>>>>>>>>>>>>> changelog
>>>>>>>>>>>>>>>>>>>>>>>> topic like "appID-state1-changelog" which is a
>>>>>>>>>>>>>>>>>>>>>>>> semi-duplicate
>>>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>>> "topic1".
>>>>>>>>>>>>>>>>>>>>>>>>> Detailed:
>>>>>>>>>>>>>>>>>>>>>>>>>> 1. I'm +1 with Michael regarding "#toStream";
>>>>>>>>>>>>>>>>>>>>>>>>>> actually I was

Reply via email to