Thanks everyone! I think it's time to do a V2 on the KIP so I'll do that and we 
can see how it looks and continue the discussion from there. Stay tuned.

Thanks
Eno

> On 30 Jan 2017, at 17:23, Matthias J. Sax <matth...@confluent.io> wrote:
> 
> Hi,
> 
> I think Eno's separation is very clear and helpful. In order to
> streamline this discussion, I would suggest we focus back on point (1)
> only, as this is the original KIP question.
> 
> Even if I started to DSL design discussion somehow, because I thought it
> might be helpful to resolve both in a single shot, I feel that we have
> too many options about DSL design and we should split it up in two
> steps. This will have the disadvantage that we will change the API
> twice, but still, I think it will be a more focused discussion.
> 
> I just had another look at the KIP, an it proposes 3 changes:
> 
> 1. add .materialized() -> IIRC it was suggested to name this
> .materialize() though (can you maybe update the KIP Eno?)
> 2. remove print(), writeAsText(), and foreach()
> 3. rename toStream() to toKStream()
> 
> 
> I completely agree with (2) -- not sure about (3) though because
> KStreamBuilder also hast .stream() and .table() as methods.
> 
> However, we might want to introduce a KStream#toTable() -- this was
> requested multiple times -- might also be part of a different KIP.
> 
> 
> Thus, we end up with (1). I would suggest to do a step backward here and
> instead of a discussion how to express the changes in the DSL (new
> overload, new methods...) we should discuss what the actual change
> should be. Like (1) materialize all KTable all the time (2) all the user
> to force a materialization to enable querying the KTable (3) allow for
> queryable non-materialized KTable.
> 
> On more question is, if we want to allow a user-forced materialization
> only as as local store without changelog, or both (together /
> independently)? We got some request like this already.
> 
> 
> -Matthias
> 
> 
> On 1/30/17 3:50 AM, Jan Filipiak wrote:
>> Hi Eno,
>> 
>> thanks for putting into different points. I want to put a few remarks
>> inline.
>> 
>> Best Jan
>> 
>> On 30.01.2017 12:19, Eno Thereska wrote:
>>> So I think there are several important discussion threads that are
>>> emerging here. Let me try to tease them apart:
>>> 
>>> 1. inconsistency in what is materialized and what is not, what is
>>> queryable and what is not. I think we all agree there is some
>>> inconsistency there and this will be addressed with any of the
>>> proposed approaches. Addressing the inconsistency is the point of the
>>> original KIP.
>>> 
>>> 2. the exact API for materializing a KTable. We can specify 1) a
>>> "store name" (as we do today) or 2) have a ".materialize[d]" call or
>>> 3) get a handle from a KTable ".getQueryHandle" or 4) have a builder
>>> construct. So we have discussed 4 options. It is important to remember
>>> in this discussion that IQ is not designed for just local queries, but
>>> also for distributed queries. In all cases an identifying name/id is
>>> needed for the store that the user is interested in querying. So we
>>> end up with a discussion on who provides the name, the user (as done
>>> today) or if it is generated automatically (as Jan suggests, as I
>>> understand it). If it is generated automatically we need a way to
>>> expose these auto-generated names to the users and link them to the
>>> KTables they care to query.
>> Hi, the last sentence is what I currently arguing against. The user
>> would never see a stringtype indentifier name or anything. All he gets
>> is the queryHandle if he executes a get(K) that will be an interactive
>> query get. with all the finding the right servers that currently have a
>> copy of this underlying store stuff going on. The nice part is that if
>> someone retrieves a queryHandle, you know that you have to materialized
>> (if you are not already) as queries will be coming. Taking away the
>> confusion mentioned in point 1 IMO.
>>> 
>>> 3. The exact boundary between the DSL, that is the processing
>>> language, and the storage/IQ queries, and how we jump from one to the
>>> other. This is mostly for how we get a handle on a store (so it's
>>> related to point 2), rather than for how we query the store. I think
>>> we all agree that we don't want to limit ways one can query a store
>>> (e.g., using gets or range queries etc) and the query APIs are not in
>>> the scope of the DSL.
>> Does the IQ work with range currently? The range would have to be
>> started on all stores and then merged by maybe the client. Range force a
>> flush to RocksDB currently so I am sure you would get a performance hit
>> right there. Time-windows might be okay, but I am not sure if the first
>> version should offer the user range access.
>>> 
>>> 4. The nature of the DSL and whether its declarative enough, or
>>> flexible enough. Damian made the point that he likes the builder
>>> pattern since users can specify, per KTable, things like caching and
>>> logging needs. His observation (as I understand it) is that the
>>> processor API (PAPI) is flexible but doesn't provide any help at all
>>> to users. The current DSL provides declarative abstractions, but it's
>>> not fine-grained enough. This point is much broader than the KIP, but
>>> discussing it in this KIPs context is ok, since we don't want to make
>>> small piecemeal changes and then realise we're not in the spot we want
>>> to be.
>> This is indeed much broader. My guess here is that's why both API's
>> exists and helping the users to switch back and forth might be a thing.
>>> 
>>> Feel free to pitch in if I have misinterpreted something.
>>> 
>>> Thanks
>>> Eno
>>> 
>>> 
>>>> On 30 Jan 2017, at 10:22, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>> 
>>>> Hi Eno,
>>>> 
>>>> I have a really hard time understanding why we can't. From my point
>>>> of view everything could be super elegant DSL only + public api for
>>>> the PAPI-people as already exist.
>>>> 
>>>> The above aproach implementing a .get(K) on KTable is foolisch in my
>>>> opinion as it would be to late to know that materialisation would be
>>>> required.
>>>> But having an API that allows to indicate I want to query this table
>>>> and then wrapping the say table's processorname can work out really
>>>> really nice. The only obstacle I see is people not willing to spend
>>>> the additional time in implementation and just want a quick shot
>>>> option to make it work.
>>>> 
>>>> For me it would look like this:
>>>> 
>>>> table =  builder.table()
>>>> filteredTable = table.filter()
>>>> rawHandle = table.getQueryHandle() // Does the materialisation,
>>>> really all names possible but id rather hide the implication of it
>>>> materializes
>>>> filteredTableHandle = filteredTable.getQueryHandle() // this would
>>>> _not_ materialize again of course, the source or the aggregator would
>>>> stay the only materialized processors
>>>> streams = new streams(builder)
>>>> 
>>>> This middle part is highly flexible I could imagin to force the user
>>>> todo something like this. This implies to the user that his streams
>>>> need to be running
>>>> instead of propagating the missing initialisation back by exceptions.
>>>> Also if the users is forced to pass the appropriate streams instance
>>>> back can change.
>>>> I think its possible to build multiple streams out of  one topology
>>>> so it would be easiest to implement aswell. This is just what I maybe
>>>> had liked the most
>>>> 
>>>> streams.start();
>>>> rawHandle.prepare(streams)
>>>> filteredHandle.prepare(streams)
>>>> 
>>>> later the users can do
>>>> 
>>>> V value = rawHandle.get(K)
>>>> V value = filteredHandle.get(K)
>>>> 
>>>> This could free DSL users from anything like storenames and how and
>>>> what to materialize. Can someone indicate what the problem would be
>>>> implementing it like this.
>>>> Yes I am aware that the current IQ API will not support querying by
>>>> KTableProcessorName instread of statestoreName. But I think that had
>>>> to change if you want it to be intuitive
>>>> IMO you gotta apply the filter read time
>>>> 
>>>> Looking forward to your opinions
>>>> 
>>>> Best Jan
>>>> 
>>>> 
>>>> #DeathToIQMoreAndBetterConnectors
>>>> 
>>>> 
>>>> 
>>>> On 30.01.2017 10:42, Eno Thereska wrote:
>>>>> Hi there,
>>>>> 
>>>>> The inconsistency will be resolved, whether with materialize or
>>>>> overloaded methods.
>>>>> 
>>>>> With the discussion on the DSL & stores I feel we've gone in a
>>>>> slightly different tangent, which is worth discussing nonetheless.
>>>>> We have entered into an argument around the scope of the DSL. The
>>>>> DSL has been designed primarily for processing. The DSL does not
>>>>> dictate ways to access state stores or what hind of queries to
>>>>> perform on them. Hence, I see the mechanism for accessing storage as
>>>>> decoupled from the DSL.
>>>>> 
>>>>> We could think of ways to get store handles from part of the DSL,
>>>>> like the KTable abstraction. However, subsequent queries will be
>>>>> store-dependent and not rely on the DSL, hence I'm not sure we get
>>>>> any grand-convergence DSL-Store here. So I am arguing that the
>>>>> current way of getting a handle on state stores is fine.
>>>>> 
>>>>> Thanks
>>>>> Eno
>>>>> 
>>>>>> On 30 Jan 2017, at 03:56, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>> 
>>>>>> Thinking loud here about the API options (materialize v.s. overloaded
>>>>>> functions) and its impact on IQ:
>>>>>> 
>>>>>> 1. The first issue of the current DSL is that, there is
>>>>>> inconsistency upon
>>>>>> whether / how KTables should be materialized:
>>>>>> 
>>>>>>    a) in many cases the library HAS TO materialize KTables no
>>>>>> matter what,
>>>>>> e.g. KStream / KTable aggregation resulted KTables, and hence we
>>>>>> enforce
>>>>>> users to provide store names and throw RTE if it is null;
>>>>>>    b) in some other cases, the KTable can be materialized or not; for
>>>>>> example in KStreamBuilder.table(), store names can be nullable and
>>>>>> in which
>>>>>> case the KTable would not be materialized;
>>>>>>    c) in some other cases, the KTable will never be materialized, for
>>>>>> example KTable.filter() resulted KTables, and users have no options to
>>>>>> enforce them to be materialized;
>>>>>>    d) this is related to a), where some KTables are required to be
>>>>>> materialized, but we do not enforce users to provide a state store
>>>>>> name,
>>>>>> e.g. KTables involved in joins; a RTE will be thrown not
>>>>>> immediately but
>>>>>> later in this case.
>>>>>> 
>>>>>> 2. The second issue is related to IQ, where state stores are
>>>>>> accessed by
>>>>>> their state stores; so only those KTable's that have user-specified
>>>>>> state
>>>>>> stores will be queryable. But because of 1) above, many stores may
>>>>>> not be
>>>>>> interested to users for IQ but they still need to provide a
>>>>>> (dummy?) state
>>>>>> store name for them; while on the other hand users cannot query
>>>>>> some state
>>>>>> stores, e.g. the ones generated by KTable.filter() as there is no
>>>>>> APIs for
>>>>>> them to specify a state store name.
>>>>>> 
>>>>>> 3. We are aware from user feedbacks that such backend details would be
>>>>>> better be abstracted away from the DSL layer, where app developers
>>>>>> should
>>>>>> just focus on processing logic, while state stores along with their
>>>>>> changelogs etc would better be in a different mechanism; same
>>>>>> arguments
>>>>>> have been discussed for serdes / windowing triggers as well. For
>>>>>> serdes
>>>>>> specifically, we had a very long discussion about it and concluded
>>>>>> that, at
>>>>>> least in Java7, we cannot completely abstract serde away in the
>>>>>> DSL, so we
>>>>>> choose the other extreme to enforce users to be completely aware of
>>>>>> the
>>>>>> serde requirements when some KTables may need to be materialized vis
>>>>>> overloaded API functions. While for the state store names, I feel
>>>>>> it is a
>>>>>> different argument than serdes (details below).
>>>>>> 
>>>>>> 
>>>>>> So to me, for either materialize() v.s. overloaded functions
>>>>>> directions,
>>>>>> the first thing I'd like to resolve is the inconsistency issue
>>>>>> mentioned
>>>>>> above. So in either case: KTable materialization will not be affect
>>>>>> by user
>>>>>> providing state store name or not, but will only be decided by the
>>>>>> library
>>>>>> when it is necessary. More specifically, only join operator and
>>>>>> builder.table() resulted KTables are not always materialized, but
>>>>>> are still
>>>>>> likely to be materialized lazily (e.g. when participated in a join
>>>>>> operator).
>>>>>> 
>>>>>> 
>>>>>> For overloaded functions that would mean:
>>>>>> 
>>>>>>    a) we have an overloaded function for ALL operators that could
>>>>>> result
>>>>>> in a KTable, and allow it to be null (i.e. for the function without
>>>>>> this
>>>>>> param it is null by default);
>>>>>>    b) null-state-store-name do not indicate that a KTable would
>>>>>> not be
>>>>>> materialized, but that it will not be used for IQ at all (internal
>>>>>> state
>>>>>> store names will be generated when necessary).
>>>>>> 
>>>>>> 
>>>>>> For materialize() that would mean:
>>>>>> 
>>>>>>    a) we will remove state store names from ALL operators that could
>>>>>> result in a KTable.
>>>>>>    b) KTables that not calling materialized do not indicate that a
>>>>>> KTable
>>>>>> would not be materialized, but that it will not be used for IQ at all
>>>>>> (internal state store names will be generated when necessary).
>>>>>> 
>>>>>> 
>>>>>> Again, in either ways the API itself does not "hint" about anything
>>>>>> for
>>>>>> materializing a KTable or not at all; it is still purely determined
>>>>>> by the
>>>>>> library when parsing the DSL for now.
>>>>>> 
>>>>>> Following these thoughts, I feel that 1) we should probably change
>>>>>> the name
>>>>>> "materialize" since it may be misleading to users as what actually
>>>>>> happened
>>>>>> behind the scene, to e.g. Damian suggested "queryableStore(String
>>>>>> storeName)",
>>>>>> which returns a QueryableStateStore, and can replace the
>>>>>> `KafkaStreams.store` function; 2) comparing those two options
>>>>>> assuming we
>>>>>> get rid of the misleading function name, I personally favor not
>>>>>> adding more
>>>>>> overloading functions as it keeps the API simpler.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> Guozhang
>>>>>> 
>>>>>> 
>>>>>> On Sat, Jan 28, 2017 at 2:32 PM, Jan Filipiak
>>>>>> <jan.filip...@trivago.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Hi,
>>>>>>> 
>>>>>>> thanks for your mail, felt like this can clarify some things! The
>>>>>>> thread
>>>>>>> unfortunately split but as all branches close in on what my
>>>>>>> suggestion was
>>>>>>> about Ill pick this to continue
>>>>>>> 
>>>>>>> Of course only the table the user wants to query would be
>>>>>>> materialized.
>>>>>>> (retrieving the queryhandle implies materialisation). So In the
>>>>>>> example of
>>>>>>> KTable::filter if you call
>>>>>>> getIQHandle on both tables only the one source that is there would
>>>>>>> materialize and the QueryHandleabstraction would make sure it gets
>>>>>>> mapped
>>>>>>> and filtered and what not uppon read as usual.
>>>>>>> 
>>>>>>> Of Course the Object you would retrieve would maybe only wrap the
>>>>>>> storeName / table unique identifier and a way to access the streams
>>>>>>> instance and then basically uses the same mechanism that is
>>>>>>> currently used.
>>>>>>> From my point of view this is the least confusing way for DSL
>>>>>>> users. If
>>>>>>> its to tricky to get a hand on the streams instance one could ask
>>>>>>> the user
>>>>>>> to pass it in before executing queries, therefore making sure the
>>>>>>> streams
>>>>>>> instance has been build.
>>>>>>> 
>>>>>>> The effort to implement this is indeed some orders of magnitude
>>>>>>> higher
>>>>>>> than the overloaded materialized call. As long as I could help
>>>>>>> getting a
>>>>>>> different view I am happy.
>>>>>>> 
>>>>>>> Best Jan
>>>>>>> 
>>>>>>> 
>>>>>>> On 28.01.2017 09:36, Eno Thereska wrote:
>>>>>>> 
>>>>>>>> Hi Jan,
>>>>>>>> 
>>>>>>>> I understand your concern. One implication of not passing any
>>>>>>>> store name
>>>>>>>> and just getting an IQ handle is that all KTables would need to be
>>>>>>>> materialised. Currently the store name (or proposed
>>>>>>>> .materialize() call)
>>>>>>>> act as hints on whether to materialise the KTable or not.
>>>>>>>> Materialising
>>>>>>>> every KTable can be expensive, although there are some tricks one
>>>>>>>> can play,
>>>>>>>> e.g., have a virtual store rather than one backed by a Kafka topic.
>>>>>>>> 
>>>>>>>> However, even with the above, after getting an IQ handle, the
>>>>>>>> user would
>>>>>>>> still need to use IQ APIs to query the state. As such, we would
>>>>>>>> still
>>>>>>>> continue to be outside the original DSL so this wouldn't address
>>>>>>>> your
>>>>>>>> original concern.
>>>>>>>> 
>>>>>>>> So I read this suggestion as simplifying the APIs by removing the
>>>>>>>> store
>>>>>>>> name, at the cost of having to materialise every KTable. It's
>>>>>>>> definitely an
>>>>>>>> option we'll consider as part of this KIP.
>>>>>>>> 
>>>>>>>> Thanks
>>>>>>>> Eno
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On 28 Jan 2017, at 06:49, Jan Filipiak <jan.filip...@trivago.com>
>>>>>>>> wrote:
>>>>>>>>> Hi Exactly
>>>>>>>>> 
>>>>>>>>> I know it works from the Processor API, but my suggestion would
>>>>>>>>> prevent
>>>>>>>>> DSL users dealing with storenames what so ever.
>>>>>>>>> 
>>>>>>>>> In general I am pro switching between DSL and Processor API
>>>>>>>>> easily. (In
>>>>>>>>> my Stream applications I do this a lot with reflection and
>>>>>>>>> instanciating
>>>>>>>>> KTableImpl) Concerning this KIP all I say is that there should
>>>>>>>>> be a DSL
>>>>>>>>> concept of "I want to expose this __KTable__. This can be a
>>>>>>>>> Method like
>>>>>>>>> KTable::retrieveIQHandle():InteractiveQueryHandle, the table
>>>>>>>>> would know
>>>>>>>>> to materialize, and the user had a reference to the "store and the
>>>>>>>>> distributed query mechanism by the Interactive Query Handle"
>>>>>>>>> under the hood
>>>>>>>>> it can use the same mechanism as the PIP people again.
>>>>>>>>> 
>>>>>>>>> I hope you see my point J
>>>>>>>>> 
>>>>>>>>> Best Jan
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> #DeathToIQMoreAndBetterConnectors :)
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On 27.01.2017 21:59, Matthias J. Sax wrote:
>>>>>>>>> 
>>>>>>>>>> Jan,
>>>>>>>>>> 
>>>>>>>>>> the IQ feature is not limited to Streams DSL but can also be
>>>>>>>>>> used for
>>>>>>>>>> Stores used in PAPI. Thus, we need a mechanism that does work
>>>>>>>>>> for PAPI
>>>>>>>>>> and DSL.
>>>>>>>>>> 
>>>>>>>>>> Nevertheless I see your point and I think we could provide a
>>>>>>>>>> better API
>>>>>>>>>> for KTable stores including the discovery of remote shards of
>>>>>>>>>> the same
>>>>>>>>>> KTable.
>>>>>>>>>> 
>>>>>>>>>> @Michael: Yes, right now we do have a lot of overloads and I am
>>>>>>>>>> not a
>>>>>>>>>> big fan of those -- I would rather prefer a builder pattern.
>>>>>>>>>> But that
>>>>>>>>>> might be a different discussion (nevertheless, if we would aim
>>>>>>>>>> for a API
>>>>>>>>>> rework, we should get the changes with regard to stores right
>>>>>>>>>> from the
>>>>>>>>>> beginning on, in order to avoid a redesign later on.)
>>>>>>>>>> 
>>>>>>>>>> something like:
>>>>>>>>>> 
>>>>>>>>>> stream.groupyByKey()
>>>>>>>>>>       .window(TimeWindow.of(5000))
>>>>>>>>>>       .aggregate(...)
>>>>>>>>>>       .withAggValueSerde(new CustomTypeSerde())
>>>>>>>>>>       .withStoreName("storeName);
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> (This would also reduce JavaDoc redundancy -- maybe a personal
>>>>>>>>>> pain
>>>>>>>>>> point right now :))
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> -Matthias
>>>>>>>>>> 
>>>>>>>>>> On 1/27/17 11:10 AM, Jan Filipiak wrote:
>>>>>>>>>> 
>>>>>>>>>>> Yeah,
>>>>>>>>>>> 
>>>>>>>>>>> Maybe my bad that I refuse to look into IQ as i don't find them
>>>>>>>>>>> anywhere
>>>>>>>>>>> close to being interesting. The Problem IMO is that people
>>>>>>>>>>> need to know
>>>>>>>>>>> the Store name), so we are working on different levels to
>>>>>>>>>>> achieve a
>>>>>>>>>>> single goal.
>>>>>>>>>>> 
>>>>>>>>>>> What is your peoples opinion on having a method on KTABLE that
>>>>>>>>>>> returns
>>>>>>>>>>> them something like a Keyvalue store. There is of course
>>>>>>>>>>> problems like
>>>>>>>>>>> "it cant be used before the streamthreads are going and
>>>>>>>>>>> groupmembership
>>>>>>>>>>> is established..." but the benefit would be that for the user
>>>>>>>>>>> there is
>>>>>>>>>>> a
>>>>>>>>>>> consistent way of saying "Hey I need it materialized as
>>>>>>>>>>> querries gonna
>>>>>>>>>>> be comming" + already get a Thing that he can execute the
>>>>>>>>>>> querries on
>>>>>>>>>>> in
>>>>>>>>>>> 1 step.
>>>>>>>>>>> What I think is unintuitive here is you need to say
>>>>>>>>>>> materialize on this
>>>>>>>>>>> Ktable and then you go somewhere else and find its store name
>>>>>>>>>>> and then
>>>>>>>>>>> you go to the kafkastreams instance and ask for the store with
>>>>>>>>>>> this
>>>>>>>>>>> name.
>>>>>>>>>>> 
>>>>>>>>>>> So one could the user help to stay in DSL land and therefore
>>>>>>>>>>> maybe
>>>>>>>>>>> confuse him less.
>>>>>>>>>>> 
>>>>>>>>>>> Best Jan
>>>>>>>>>>> 
>>>>>>>>>>> #DeathToIQMoreAndBetterConnectors :)
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On 27.01.2017 16:51, Damian Guy wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> I think Jan is saying that they don't always need to be
>>>>>>>>>>>> materialized,
>>>>>>>>>>>> i.e.,
>>>>>>>>>>>> filter just needs to apply the ValueGetter, it doesn't need yet
>>>>>>>>>>>> another
>>>>>>>>>>>> physical state store.
>>>>>>>>>>>> 
>>>>>>>>>>>> On Fri, 27 Jan 2017 at 15:49 Michael Noll <mich...@confluent.io>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Like Damian, and for the same reasons, I am more in favor of
>>>>>>>>>>>>> overloading
>>>>>>>>>>>>> methods rather than introducing `materialize()`.
>>>>>>>>>>>>> FWIW, we already have a similar API setup for e.g.
>>>>>>>>>>>>> `KTable#through(topicName, stateStoreName)`.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> A related but slightly different question is what e.g. Jan
>>>>>>>>>>>>> Filipiak
>>>>>>>>>>>>> mentioned earlier in this thread:
>>>>>>>>>>>>> I think we need to explain more clearly why KIP-114 doesn't
>>>>>>>>>>>>> propose
>>>>>>>>>>>>> the
>>>>>>>>>>>>> seemingly simpler solution of always materializing tables/state
>>>>>>>>>>>>> stores.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Fri, Jan 27, 2017 at 4:38 PM, Jan Filipiak <
>>>>>>>>>>>>> jan.filip...@trivago.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> Yeah its confusing, Why shoudn't it be querable by IQ? If
>>>>>>>>>>>>>> you uses
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> ValueGetter of Filter it will apply the filter and should be
>>>>>>>>>>>>>> completely
>>>>>>>>>>>>>> transparent as to if another processor or IQ is accessing
>>>>>>>>>>>>>> it? How
>>>>>>>>>>>>>> can
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> this
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> new method help?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I cannot see the reason for the additional materialize
>>>>>>>>>>>>>> method being
>>>>>>>>>>>>>> required! Hence I suggest leave it alone.
>>>>>>>>>>>>>> regarding removing the others I dont have strong opinions
>>>>>>>>>>>>>> and it
>>>>>>>>>>>>>> seems to
>>>>>>>>>>>>>> be unrelated.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On 26.01.2017 20:48, Eno Thereska wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Forwarding this thread to the users list too in case people
>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>> comment. It is also on the dev list.
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Begin forwarded message:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> From: "Matthias J. Sax" <matth...@confluent.io>
>>>>>>>>>>>>>>>> Subject: Re: [DISCUSS] KIP-114: KTable materialization and
>>>>>>>>>>>>>>>> improved
>>>>>>>>>>>>>>>> semantics
>>>>>>>>>>>>>>>> Date: 24 January 2017 at 19:30:10 GMT
>>>>>>>>>>>>>>>> To: d...@kafka.apache.org
>>>>>>>>>>>>>>>> Reply-To: d...@kafka.apache.org
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> That not what I meant by "huge impact".
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I refer to the actions related to materialize a KTable:
>>>>>>>>>>>>>>>> creating a
>>>>>>>>>>>>>>>> RocksDB store and a changelog topic -- users should be
>>>>>>>>>>>>>>>> aware about
>>>>>>>>>>>>>>>> runtime implication and this is better expressed by an
>>>>>>>>>>>>>>>> explicit
>>>>>>>>>>>>>>>> method
>>>>>>>>>>>>>>>> call, rather than implicitly triggered by using a different
>>>>>>>>>>>>>>>> overload of
>>>>>>>>>>>>>>>> a method.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On 1/24/17 1:35 AM, Damian Guy wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I think your definition of a huge impact and mine are rather
>>>>>>>>>>>>>>>>> different
>>>>>>>>>>>>>>>>> ;-P
>>>>>>>>>>>>>>>>> Overloading a few methods  is not really a huge impact
>>>>>>>>>>>>>>>>> IMO. It is
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> also a
>>>>>>>>>>>>>> sacrifice worth making for readability, usability of the API.
>>>>>>>>>>>>>>>>> On Mon, 23 Jan 2017 at 17:55 Matthias J. Sax <
>>>>>>>>>>>>>>>>> matth...@confluent.io>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I understand your argument, but do not agree with it.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Your first version (even if the "flow" is not as nice)
>>>>>>>>>>>>>>>>>> is more
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> explicit
>>>>>>>>>>>>>> than the second version. Adding a stateStoreName parameter
>>>>>>>>>>>>>> is quite
>>>>>>>>>>>>>>>>>> implicit but has a huge impact -- thus, I prefer the
>>>>>>>>>>>>>>>>>> rather more
>>>>>>>>>>>>>>>>>> verbose
>>>>>>>>>>>>>>>>>> but explicit version.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On 1/23/17 1:39 AM, Damian Guy wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> I'm not a fan of materialize. I think it interrupts the
>>>>>>>>>>>>>>>>>> flow,
>>>>>>>>>>>>>>>>>>> i.e,
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> table.mapValue(..).materialize().join(..).materialize()
>>>>>>>>>>>>>>>>>>> compared to:
>>>>>>>>>>>>>>>>>>> table.mapValues(..).join(..)
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> I know which one i prefer.
>>>>>>>>>>>>>>>>>>> My preference is stil to provide overloaded methods where
>>>>>>>>>>>>>>>>>>> people can
>>>>>>>>>>>>>>>>>>> specify the store names if they want, otherwise we just
>>>>>>>>>>>>>>>>>>> generate
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> them.
>>>>>>>>>>>>>> On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax
>>>>>>>>>>>>>>>>>>> <matth...@confluent.io
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>> thanks for the KIP Eno! Here are my 2 cents:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 1) I like Guozhang's proposal about removing store
>>>>>>>>>>>>>>>>>>>> name from
>>>>>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>>> KTable
>>>>>>>>>>>>>>>>>>>> methods and generate internal names (however, I would
>>>>>>>>>>>>>>>>>>>> do this
>>>>>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>>> overloads). Furthermore, I would not force users to call
>>>>>>>>>>>>>>>>>>>> .materialize()
>>>>>>>>>>>>>>>>>>>> if they want to query a store, but add one more method
>>>>>>>>>>>>>>>>>>>> .stateStoreName()
>>>>>>>>>>>>>>>>>>>> that returns the store name if the KTable is
>>>>>>>>>>>>>>>>>>>> materialized.
>>>>>>>>>>>>>>>>>>>> Thus,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> also
>>>>>>>>>>>>>> .materialize() must not necessarily have a parameter storeName
>>>>>>>>>>>>>>>>>>>> (ie,
>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>> should have some overloads here).
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> I would also not allow to provide a null store name (to
>>>>>>>>>>>>>>>>>>>> indicate no
>>>>>>>>>>>>>>>>>>>> materialization if not necessary) but throw an
>>>>>>>>>>>>>>>>>>>> exception.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> This yields some simplification (see below).
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 2) I also like Guozhang's proposal about
>>>>>>>>>>>>>>>>>>>> KStream#toTable()
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 3)
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>    3. What will happen when you call materialize on
>>>>>>>>>>>>>>>>>>>> KTable
>>>>>>>>>>>>>>>>>>>>> that is
>>>>>>>>>>>>>>>>>>>>> already
>>>>>>>>>>>>>>>>>>>>>    materialized? Will it create another StateStore
>>>>>>>>>>>>>>>>>>>>> (providing
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> name
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>    different), throw an Exception?
>>>>>>>>>>>>>>>>>>>> Currently an exception is thrown, but see below.
>>>>>>>>>>>>>>>>>>>>> If we follow approach (1) from Guozhang, there is no
>>>>>>>>>>>>>>>>>>>>> need to
>>>>>>>>>>>>>>>>>>>>> worry
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>>>>>> a second materialization and also no exception must be
>>>>>>>>>>>>>>>>>>>> throws. A
>>>>>>>>>>>>>>>>>>>> call to
>>>>>>>>>>>>>>>>>>>> .materialize() basically sets a "materialized flag" (ie,
>>>>>>>>>>>>>>>>>>>> idempotent
>>>>>>>>>>>>>>>>>>>> operation) and sets a new name.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 4)
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Rename toStream() to toKStream() for consistency.
>>>>>>>>>>>>>>>>>>>>> Not sure whether that is really required. We also use
>>>>>>>>>>>>>>>>>>>>> `KStreamBuilder#stream()` and
>>>>>>>>>>>>>>>>>>>>> `KStreamBuilder#table()`, for
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> example,
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>> don't care about the "K" prefix.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Eno's reply:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> I think changing it to `toKStream` would make it
>>>>>>>>>>>>>>>>>>>> absolutely
>>>>>>>>>>>>>>>>>>>>> clear
>>>>>>>>>>>>>>>>>>>>> what
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> we are converting it to.
>>>>>>>>>>>>>>>>>>>> I'd say we should probably change the KStreamBuilder
>>>>>>>>>>>>>>>>>>>> methods
>>>>>>>>>>>>>>>>>>>>> (but
>>>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>> this KIP).
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> I would keep #toStream(). (see below)
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 5) We should not remove any methods but only
>>>>>>>>>>>>>>>>>>>> deprecate them.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> A general note:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> I do not understand your comments "Rejected
>>>>>>>>>>>>>>>>>>>> Alternatives". You
>>>>>>>>>>>>>>>>>>>> say
>>>>>>>>>>>>>>>>>>>> "Have
>>>>>>>>>>>>>>>>>>>> the KTable be the materialized view" was rejected.
>>>>>>>>>>>>>>>>>>>> But your
>>>>>>>>>>>>>>>>>>>> KIP
>>>>>>>>>>>>>>>>>>>> actually
>>>>>>>>>>>>>>>>>>>> does exactly this -- the changelog abstraction of
>>>>>>>>>>>>>>>>>>>> KTable is
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> secondary
>>>>>>>>>>>>>> after those changes and the "view" abstraction is what a
>>>>>>>>>>>>>>>>>>>> KTable is.
>>>>>>>>>>>>>>>>>>>> And
>>>>>>>>>>>>>>>>>>>> just to be clear, I like this a lot:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> - it aligns with the name KTable
>>>>>>>>>>>>>>>>>>>> - is aligns with stream-table-duality
>>>>>>>>>>>>>>>>>>>> - it aligns with IQ
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> I would say that a KTable is a "view abstraction" (as
>>>>>>>>>>>>>>>>>>>> materialization is
>>>>>>>>>>>>>>>>>>>> optional).
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On 1/22/17 5:05 PM, Guozhang Wang wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Thanks for the KIP Eno, I have a few meta comments
>>>>>>>>>>>>>>>>>>>> and a few
>>>>>>>>>>>>>>>>>>>>> detailed
>>>>>>>>>>>>>>>>>>>>> comments:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 1. I like the materialize() function in general, but
>>>>>>>>>>>>>>>>>>>>> I would
>>>>>>>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>> see
>>>>>>>>>>>>>>>>>>> how other KTable functions should be updated
>>>>>>>>>>>>>>>>>>> accordingly. For
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> example,
>>>>>>>>>>>>>> 1)
>>>>>>>>>>>>>>>>>>> KStreamBuilder.table(..) has a state store name
>>>>>>>>>>>>>>>>>>> parameter, and
>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>> always materialize the KTable unless its state store name
>>>>>>>>>>>>>> is set
>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>> null;
>>>>>>>>>>>>>>>>>>> 2) KTable.agg requires the result KTable to be
>>>>>>>>>>>>>>>>>>> materialized,
>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> hence
>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>> also have a state store name; 3) KTable.join requires the
>>>>>>>>>>>>>>>>>>> joining
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>> be materialized. And today we do not actually have a
>>>>>>>>>>>>>>>>>>>>> mechanism to
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> enforce
>>>>>>>>>>>>>>>>>>> that, but will only throw an exception at runtime if
>>>>>>>>>>>>>>>>>>> it is not
>>>>>>>>>>>>>>>>>>> (e.g.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>>>> have "builder.table("topic", null).join()" a RTE will be
>>>>>>>>>>>>>>>>>>>>> thrown).
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> I'd make an extended proposal just to kick off the
>>>>>>>>>>>>>>>>>>>>> discussion
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> here:
>>>>>>>>>>>>>> let's
>>>>>>>>>>>>>>>>>>> remove all the state store params in other KTable
>>>>>>>>>>>>>>>>>>> functions,
>>>>>>>>>>>>>>>>>>> and if
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>> cases KTable have to be materialized (e.g. KTable
>>>>>>>>>>>>>>>>>>>> resulted
>>>>>>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> KXX.agg)
>>>>>>>>>>>>>>>>>>> and users do not call materialize(), then we treat it
>>>>>>>>>>>>>>>>>>> as "users
>>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>> interested in querying it at all" and hence use an
>>>>>>>>>>>>>>>>>>>>> internal
>>>>>>>>>>>>>>>>>>>>> name
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> generated
>>>>>>>>>>>>>>>>>>>> for the materialized KTable; i.e. although it is
>>>>>>>>>>>>>>>>>>>> materialized
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> state
>>>>>>>>>>>>>>>>>>>>> store is not exposed to users. And if users call
>>>>>>>>>>>>>>>>>>>>> materialize()
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> afterwards
>>>>>>>>>>>>>>>>>>> but we have already decided to materialize it, we can
>>>>>>>>>>>>>>>>>>> replace
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> internal
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> name with the user's provided names. Then from a user's
>>>>>>>>>>>>>>>>>>>> point-view,
>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>>> they
>>>>>>>>>>>>>>>>>>>> ever want to query a KTable, they have to call
>>>>>>>>>>>>>>>>>>>> materialize()
>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>> given
>>>>>>>>>>>>>>>>>>> state store name. This approach has one awkwardness
>>>>>>>>>>>>>>>>>>> though,
>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> serdes
>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>> state store names param are not separated and could be
>>>>>>>>>>>>>>>>>>>>> overlapped
>>>>>>>>>>>>>>>>>>>>> (see
>>>>>>>>>>>>>>>>>>>>> detailed comment #2 below).
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 2. This step does not need to be included in this
>>>>>>>>>>>>>>>>>>>>> KIP, but
>>>>>>>>>>>>>>>>>>>>> just
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> as a
>>>>>>>>>>>>>> reference / future work: as we have discussed before, we may
>>>>>>>>>>>>>>>>>>>> enforce
>>>>>>>>>>>>>> materialize KTable.join resulted KTables as well in the
>>>>>>>>>>>>>>>>>>>>> future. If
>>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> do
>>>>>>>>>>>>>>>>>>> that, then:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> a) KXX.agg resulted KTables are always materialized;
>>>>>>>>>>>>>>>>>>>>> b) KTable.agg requires the aggregating KTable to
>>>>>>>>>>>>>>>>>>>>> always be
>>>>>>>>>>>>>>>>>>>>> materialized
>>>>>>>>>>>>>>>>>>>>> (otherwise we would not know the old value);
>>>>>>>>>>>>>>>>>>>>> c) KTable.join resulted KTables are always
>>>>>>>>>>>>>>>>>>>>> materialized, and
>>>>>>>>>>>>>>>>>>>>> so
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> joining KTables to always be materialized.
>>>>>>>>>>>>>>>>>>>>> d) KTable.filter/mapValues resulted KTables
>>>>>>>>>>>>>>>>>>>>> materialization
>>>>>>>>>>>>>>>>>>>>> depend
>>>>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> its
>>>>>>>>>>>>>>>>>>> parent's materialization;
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> By recursive induction all KTables are actually always
>>>>>>>>>>>>>>>>>>>> materialized,
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>> then the effect of the "materialize()" is just for
>>>>>>>>>>>>>>>>>>> specifying
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> state
>>>>>>>>>>>>>>>>>>>>> store names. In this scenario, we do not need to send
>>>>>>>>>>>>>>>>>>>>> Change<V> in
>>>>>>>>>>>>>>>>>>>>> repartition topics within joins any more, but only for
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> repartitions
>>>>>>>>>>>>>> topics
>>>>>>>>>>>>>>>>>>>> within aggregations. Instead, we can just send a
>>>>>>>>>>>>>>>>>>>> "tombstone"
>>>>>>>>>>>>>>>>>>>> without
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> old value and we do not need to calculate joins twice
>>>>>>>>>>>>>>>>>>> (one more
>>>>>>>>>>>>>>>>>>> time
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> old value is received).
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 3. I'm wondering if it is worth-while to add a
>>>>>>>>>>>>>>>>>>>>> "KStream#toTable()"
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> function
>>>>>>>>>>>>>>>>>>>> which is interpreted as a dummy-aggregation where the
>>>>>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>>>> value
>>>>>>>>>>>>>>>>>>>>> always
>>>>>>>>>>>>>>>>>>>>> replaces the old value. I have seen a couple of use
>>>>>>>>>>>>>>>>>>>>> cases of
>>>>>>>>>>>>>>>>>>>>> this,
>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>> example, users want to read a changelog topic, apply
>>>>>>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>>> filters,
>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>>>>>> materialize it into a KTable with state stores without
>>>>>>>>>>>>>>>>>>>>> creating
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> duplicated
>>>>>>>>>>>>>>>>>>>> changelog topics. With materialize() and toTable I'd
>>>>>>>>>>>>>>>>>>>> imagine
>>>>>>>>>>>>>>>>>>>>> users
>>>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>> specify sth. like:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> "
>>>>>>>>>>>>>>>>>>>>> KStream stream = builder.stream("topic1").filter(..);
>>>>>>>>>>>>>>>>>>>>> KTable table = stream.toTable(..);
>>>>>>>>>>>>>>>>>>>>> table.materialize("state1");
>>>>>>>>>>>>>>>>>>>>> "
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> And the library in this case could set store
>>>>>>>>>>>>>>>>>>>>> "state1" 's
>>>>>>>>>>>>>>>>>>>>> changelog
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> topic
>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> be "topic1", and applying the filter on the fly while
>>>>>>>>>>>>>>>>>>>>> (re-)storing
>>>>>>>>>>>>>>>>>>>>> its
>>>>>>>>>>>>>>>>>>>>> state by reading from this topic, instead of creating a
>>>>>>>>>>>>>>>>>>>>> second
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> changelog
>>>>>>>>>>>>>>>>>>> topic like "appID-state1-changelog" which is a
>>>>>>>>>>>>>>>>>>> semi-duplicate
>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> "topic1".
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Detailed:
>>>>>>>>>>>>>>>>>>>>> 1. I'm +1 with Michael regarding "#toStream";
>>>>>>>>>>>>>>>>>>>>> actually I was
>>>>>>>>>>>>>>>>>>>>> thinking
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>>>>>> renaming to "#toChangeLog" but after thinking a bit
>>>>>>>>>>>>>>>>>>>> more I
>>>>>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> #toStream
>>>>>>>>>>>>>>>>>>>> is still better, and we can just mention in the
>>>>>>>>>>>>>>>>>>>> javaDoc that
>>>>>>>>>>>>>>>>>>>>> it is
>>>>>>>>>>>>>>>>>>>>> transforming its underlying changelog stream to a
>>>>>>>>>>>>>>>>>>>>> normal
>>>>>>>>>>>>>>>>>>>>> stream.
>>>>>>>>>>>>>>>>>>>>> 2. As Damian mentioned, there are a few scenarios
>>>>>>>>>>>>>>>>>>>>> where the
>>>>>>>>>>>>>>>>>>>>> serdes
>>>>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>> already specified in a previous operation whereas it
>>>>>>>>>>>>>>>>>>>>> is not
>>>>>>>>>>>>>>>>>>>>> known
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> before
>>>>>>>>>>>>>>>>>>> calling materialize, for example:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> stream.groupByKey.agg(serde).materialize(serde) v.s.
>>>>>>>>>>>>>>>>>>>>> table.mapValues(/*no
>>>>>>>>>>>>>>>>>>> serde specified*/).materialize(serde). We need to specify
>>>>>>>>>>>>>>>>>>> what are
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> handling logic here.
>>>>>>>>>>>>>>>>>>>>> 3. We can remove "KTable#to" call as well, and
>>>>>>>>>>>>>>>>>>>>> enforce users
>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>> call "
>>>>>>>>>>>>>>>>>>>>> KTable.toStream.to" to be more clear.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> On Wed, Jan 18, 2017 at 3:22 AM, Eno Thereska <
>>>>>>>>>>>>>>>>>>>>> eno.there...@gmail.com>
>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> I think changing it to `toKStream` would make it
>>>>>>>>>>>>>>>>>>>>> absolutely
>>>>>>>>>>>>>>>>>>>>> clear
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> what
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>> are converting it to.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> I'd say we should probably change the
>>>>>>>>>>>>>>>>>>>>>> KStreamBuilder methods
>>>>>>>>>>>>>>>>>>>>>> (but
>>>>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>> this KIP).
>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> On 17 Jan 2017, at 13:59, Michael Noll <
>>>>>>>>>>>>>>>>>>>>>> mich...@confluent.io>
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Rename toStream() to toKStream() for consistency.
>>>>>>>>>>>>>>>>>>>>>>> Not sure whether that is really required. We also use
>>>>>>>>>>>>>>>>>>>>>>> `KStreamBuilder#stream()` and
>>>>>>>>>>>>>>>>>>>>>>> `KStreamBuilder#table()`, for
>>>>>>>>>>>>>>>>>>>>>>> example,
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>> don't care about the "K" prefix.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> On Tue, Jan 17, 2017 at 10:55 AM, Eno Thereska <
>>>>>>>>>>>>>>>>>>>>>>> eno.there...@gmail.com
>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>> Thanks Damian, answers inline:
>>>>>>>>>>>>>>>>>>>>>>>> On 16 Jan 2017, at 17:17, Damian Guy <
>>>>>>>>>>>>>>>>>>>>>>>> damian....@gmail.com>
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Hi Eno,
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP. Some comments:
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 1. I'd probably rename materialized to materialize.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Ok.
>>>>>>>>>>>>>>>>>>>>>>>> 2. I don't think the addition of the new Log
>>>>>>>>>>>>>>>>>>>>>>>> compaction
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> mechanism
>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>> necessary for this KIP, i.e, the KIP is useful
>>>>>>>>>>>>>>>>>>>>>>> without it.
>>>>>>>>>>>>>>>>>>> Maybe
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>> should be a different KIP?
>>>>>>>>>>>>>>>>>>>>>> Agreed, already removed. Will do a separate KIP for
>>>>>>>>>>>>>>>>>>>>>> that.
>>>>>>>>>>>>>>>>>>>>>>>> 3. What will happen when you call materialize on
>>>>>>>>>>>>>>>>>>>>>>>> KTable
>>>>>>>>>>>>>>>>>>>>>>>> that is
>>>>>>>>>>>>>>>>>>>>>>>> already
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> materialized? Will it create another StateStore
>>>>>>>>>>>>>>>>>>>>>>> (providing
>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> name
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>> different), throw an Exception?
>>>>>>>>>>>>>>>>>>>>>> Currently an exception is thrown, but see below.
>>>>>>>>>>>>>>>>>>>>>>>> 4. Have you considered overloading the existing
>>>>>>>>>>>>>>>>>>>>>>>> KTable
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> operations
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>> add
>>>>>>>>>>>>>>>>>>>> a state store name? So if a state store name is
>>>>>>>>>>>>>>>>>>>> provided,
>>>>>>>>>>>>>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> materialize
>>>>>>>>>>>>>>>>>>>>>>>> a state store? This would be my preferred
>>>>>>>>>>>>>>>>>>>>>>>> approach as i
>>>>>>>>>>>>>>>>>>>>>>>>> don't
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>>>>>>>> materialize is always a valid operation.
>>>>>>>>>>>>>>>>>>>> Ok I can see your point. This will increase the KIP size
>>>>>>>>>>>>>>>>>>>>>>>> since
>>>>>>>>>>>>>>>>>>>>>>>> I'll
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>>>>>>>> to enumerate all overloaded methods, but it's not a
>>>>>>>>>>>>>>>>>>>>>> problem.
>>>>>>>>>>>>>>>>>>>>>> 5. The materialize method will need ta value Serde
>>>>>>>>>>>>>>>>>>>>>> as some
>>>>>>>>>>>>>>>>>>>>>>>> operations,
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> i.e., mapValues, join etc can change the value types
>>>>>>>>>>>>>>>>>>>>>> 6. https://issues.apache.org/jira/browse/KAFKA-4609
>>>>>>>>>>>>>>>>>>>>>> - might
>>>>>>>>>>>>>>>>>>>>>>>>> mean
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>> always need to materialize the StateStore for
>>>>>>>>>>>>>>>>>>>>>> KTable-KTable
>>>>>>>>>>>>>>>>>>>>>>>>> joins.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> If
>>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>> is the case, then the KTable Join operators will
>>>>>>>>>>>>>>>>>>>>>> also need
>>>>>>>>>>>>>>>>>>>>>>>> Serde
>>>>>>>>>>>>>> information.
>>>>>>>>>>>>>>>>>>>>>>>>> I'll update the KIP with the serdes.
>>>>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, 16 Jan 2017 at 16:44 Eno Thereska <
>>>>>>>>>>>>>>>>>>>>>>>>> eno.there...@gmail.com>
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>>>>>> We created "KIP-114: KTable materialization and
>>>>>>>>>>>>>>>>>>>>>>>>>> improved
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> semantics"
>>>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>> solidify the KTable semantics in Kafka Streams:
>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>>>>>>>>>>>>>>>>>> 114%3A+KTable+materialization+and+improved+semantics
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 114:+KTable+materialization+and+improved+semantics
>>>>>>>>>>>>>>>>>>>>>>>>> Your feedback is appreciated.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>> -- 
>>>>>> -- Guozhang
>>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
> 

Reply via email to