About GlobalKTables, I suppose there is no reason why they cannot also use
this KIP for consistency. For example, today you have:

public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName)

For consistency with the KIP, you could also have an overload without the
store name, for people who want to construct a global KTable but don't care
about querying it directly:

public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valSerde, final String topic)

Damian, what do you think? I'm thinking of adding this to the KIP. Thanks to
Michael for bringing it up.

Eno

> On 11 Apr 2017, at 06:13, Eno Thereska <eno.there...@gmail.com> wrote:
>
> Hi Michael, comments inline:
>
>> On 11 Apr 2017, at 03:25, Michael Noll <mich...@confluent.io> wrote:
>>
>> Thanks for the updates, Eno!
>>
>> In addition to what has already been said: We should also explicitly
>> mention that this KIP is not touching GlobalKTable. I'm sure that some
>> users will throw KTable and GlobalKTable into one conceptual "it's all
>> tables!" bucket and then wonder how the KIP might affect global tables.
>
> Good point, I'll add.
>
>>
>> Damian wrote:
>>> I think if no store name is provided users would still be able to query the
>>> store, just the store name would be some internally generated name. They
>>> would be able to discover those names via the IQ API.
>>
>> I, too, think that users should be able to query a store even if its name
>> was internally generated. After all, the data is already there /
>> materialized.
>
> Yes, there is nothing that will prevent users from querying internally
> generated stores, but they cannot assume a store will necessarily be
> queryable. So if it's there, they can query it. If it's not there, and they
> didn't provide a queryable name, they cannot complain and say "hey, where is
> my store". If they must absolutely be certain that a store is queryable, then
> they must provide a queryable name.
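As an illustration only, the rule discussed above (a store name supplied by the
user guarantees queryability; without one, any name is internally generated)
can be sketched in plain Java. Everything here is hypothetical: the class
QueryableNameSketch, the TableSpec type, the "generated-store-" prefix and the
store() lookup are stand-ins invented for this sketch and are not Kafka Streams
APIs. The sketch also assumes a store is always created, which the real
library is free not to do.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch; none of these types are Kafka Streams classes.
final class QueryableNameSketch {

    /** Stand-in for a table definition (hypothetical). */
    static final class TableSpec {
        final String topic;
        final String storeName;   // always set: user-provided or generated
        final boolean userNamed;  // true only if the caller supplied the name

        TableSpec(String topic, String storeName, boolean userNamed) {
            this.topic = topic;
            this.storeName = storeName;
            this.userNamed = userNamed;
        }
    }

    private final Map<String, TableSpec> storesByName = new LinkedHashMap<>();
    private int generatedNames = 0;

    /** Existing shape: the caller may name the store; null means "no name given". */
    TableSpec globalTable(String topic, String storeName) {
        boolean userNamed = storeName != null;
        // Assumption of this sketch: a name is generated whenever none is given.
        String name = userNamed ? storeName : "generated-store-" + (generatedNames++);
        TableSpec spec = new TableSpec(topic, name, userNamed);
        storesByName.put(name, spec);
        return spec;
    }

    /** Proposed shape: no store name, for callers who do not plan to query. */
    TableSpec globalTable(String topic) {
        return globalTable(topic, null);
    }

    /** Lookup by name; empty when no store is registered under that name. */
    Optional<TableSpec> store(String name) {
        return Optional.ofNullable(storesByName.get(name));
    }

    public static void main(String[] args) {
        QueryableNameSketch builder = new QueryableNameSketch();
        TableSpec named = builder.globalTable("topic-a", "store-a");
        TableSpec unnamed = builder.globalTable("topic-b");

        System.out.println(builder.store(named.storeName).isPresent());
        System.out.println(unnamed.storeName);
        System.out.println(unnamed.userNamed);
    }
}
```

The shorter overload simply delegates with a null name, so both entry points
share one code path and only the caller-visible guarantee differs.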
>
>>
>> Damian wrote:
>>> I think for some stores it will make sense to not create a physical store,
>>> i.e., for things like `filter`, as this will save the RocksDB overhead. But
>>> I guess that is more of an implementation detail.
>>
>> I think it would help if the KIP would clarify what we'd do in such a
>> case. For example, if the user did not specify a store name for
>> `KTable#filter` -- would it be queryable? If so, would this imply we'd
>> always materialize the state store, or...?
>
> I'll clarify in the KIP with some more examples. Materialization will be an
> internal concept. A store can be queryable whether it's materialized or not
> (e.g., through advanced implementations that compute the value of a filter on
> the fly, rather than materialize the answer).
>
> Thanks,
> Eno
>
>>
>> -Michael
>>
>> On Tue, Apr 11, 2017 at 9:14 AM, Damian Guy <damian....@gmail.com> wrote:
>>
>>> Hi Eno,
>>>
>>> Thanks for the update. I agree with what Matthias said. I wonder if the KIP
>>> should talk less about materialization and more about querying? After all,
>>> that is what is being provided from an end-user's perspective.
>>>
>>> I think if no store name is provided users would still be able to query the
>>> store, just the store name would be some internally generated name. They
>>> would be able to discover those names via the IQ API.
>>>
>>> I think for some stores it will make sense to not create a physical store,
>>> i.e., for things like `filter`, as this will save the RocksDB overhead. But
>>> I guess that is more of an implementation detail.
>>>
>>> Cheers,
>>> Damian
>>>
>>> On Tue, 11 Apr 2017 at 00:36 Eno Thereska <eno.there...@gmail.com> wrote:
>>>
>>>> Hi Matthias,
>>>>
>>>>> However, this still forces users to provide a name for a store that we
>>>>> must materialize, even if users are not interested in querying the
>>>>> stores. Thus, for all currently existing methods that have a mandatory
>>>>> storeName parameter, I would like to have overloads that do not
>>>>> require the storeName parameter.
>>>>
>>>> Oh yeah, absolutely, this is part of the KIP. I guess I didn't make it
>>>> clear, I'll clarify.
>>>>
>>>> Thanks
>>>> Eno
>>>>
>>>>> On 10 Apr 2017, at 16:00, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>
>>>>> Thanks for pushing this KIP Eno.
>>>>>
>>>>> The update gives a very clear description of the scope, which is super
>>>>> helpful for the discussion!
>>>>>
>>>>> - To put it into my own words, the KIP's focus is on enabling queries on
>>>>> all KTables.
>>>>> ** The ability to query a store is determined by providing a name for
>>>>> the store.
>>>>> ** At the same time, providing a name -- and thus making a store
>>>>> queryable -- does not say anything about an actual materialization (i.e.,
>>>>> being queryable and being materialized are orthogonal).
>>>>>
>>>>> I like this overall a lot. However, I would go one step further. Right
>>>>> now, you suggest adding new overloaded methods that allow users to specify
>>>>> a storeName -- if `null` is provided and the store is not materialized,
>>>>> we ignore it completely -- if `null` is provided but the store must be
>>>>> materialized we generate an internal name. So far so good.
>>>>>
>>>>> However, this still forces users to provide a name for a store that we
>>>>> must materialize, even if users are not interested in querying the
>>>>> stores. Thus, for all currently existing methods that have a mandatory
>>>>> storeName parameter, I would like to have overloads that do not
>>>>> require the storeName parameter.
>>>>>
>>>>> Otherwise, we would still have some methods with an optional storeName
>>>>> parameter and other methods with a mandatory storeName parameter -- thus,
>>>>> still some inconsistency.
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 4/9/17 8:35 AM, Eno Thereska wrote:
>>>>>> Hi there,
>>>>>>
>>>>>> I've now done a V2 of the KIP, that hopefully addresses the feedback in
>>>>>> this discussion thread:
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+materialization+and+improved+semantics
>>>>>> Notable changes:
>>>>>>
>>>>>> - clearly outline what is in the scope of the KIP and what is not. We
>>>>>> ran into the issue where lots of useful, but somewhat tangential
>>>>>> discussions came up on interactive queries, declarative DSL etc. The
>>>>>> exact scope of this KIP is spelled out.
>>>>>> - decided to go with overloaded methods, not .materialize(), to stay
>>>>>> within the spirit of the current declarative DSL.
>>>>>> - clarified the deprecation plan
>>>>>> - listed part of the discussion we had under rejected alternatives
>>>>>>
>>>>>> If you have any further feedback on this, let's continue on this thread.
>>>>>>
>>>>>> Thank you
>>>>>> Eno
>>>>>>
>>>>>>> On 1 Feb 2017, at 09:04, Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>>>
>>>>>>> Thanks everyone! I think it's time to do a V2 on the KIP so I'll do
>>>>>>> that and we can see how it looks and continue the discussion from there.
>>>>>>> Stay tuned.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Eno
>>>>>>>
>>>>>>>> On 30 Jan 2017, at 17:23, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I think Eno's separation is very clear and helpful. In order to
>>>>>>>> streamline this discussion, I would suggest we focus back on point (1)
>>>>>>>> only, as this is the original KIP question.
>>>>>>>>
>>>>>>>> Even if I started the DSL design discussion somehow, because I thought it
>>>>>>>> might be helpful to resolve both in a single shot, I feel that we have
>>>>>>>> too many options about DSL design and we should split it up in two
>>>>>>>> steps. This will have the disadvantage that we will change the API
>>>>>>>> twice, but still, I think it will be a more focused discussion.
>>>>>>>>
>>>>>>>> I just had another look at the KIP, and it proposes 3 changes:
>>>>>>>>
>>>>>>>> 1. add .materialized() -> IIRC it was suggested to name this
>>>>>>>> .materialize() though (can you maybe update the KIP Eno?)
>>>>>>>> 2. remove print(), writeAsText(), and foreach()
>>>>>>>> 3. rename toStream() to toKStream()
>>>>>>>>
>>>>>>>> I completely agree with (2) -- not sure about (3) though because
>>>>>>>> KStreamBuilder also has .stream() and .table() as methods.
>>>>>>>>
>>>>>>>> However, we might want to introduce a KStream#toTable() -- this was
>>>>>>>> requested multiple times -- might also be part of a different KIP.
>>>>>>>>
>>>>>>>> Thus, we end up with (1). I would suggest to do a step backward here and
>>>>>>>> instead of a discussion how to express the changes in the DSL (new
>>>>>>>> overload, new methods...) we should discuss what the actual change
>>>>>>>> should be. Like (1) materialize all KTables all the time (2) allow the
>>>>>>>> user to force a materialization to enable querying the KTable (3) allow
>>>>>>>> for queryable non-materialized KTables.
>>>>>>>>
>>>>>>>> One more question is whether we want to allow a user-forced
>>>>>>>> materialization only as a local store without changelog, or both
>>>>>>>> (together / independently)? We got some requests like this already.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 1/30/17 3:50 AM, Jan Filipiak wrote:
>>>>>>>>> Hi Eno,
>>>>>>>>>
>>>>>>>>> thanks for putting into different points. I want to put a few remarks
>>>>>>>>> inline.
>>>>>>>>>
>>>>>>>>> Best Jan
>>>>>>>>>
>>>>>>>>> On 30.01.2017 12:19, Eno Thereska wrote:
>>>>>>>>>> So I think there are several important discussion threads that are
>>>>>>>>>> emerging here. Let me try to tease them apart:
>>>>>>>>>>
>>>>>>>>>> 1. inconsistency in what is materialized and what is not, what is
>>>>>>>>>> queryable and what is not. I think we all agree there is some
>>>>>>>>>> inconsistency there and this will be addressed with any of the
>>>>>>>>>> proposed approaches. Addressing the inconsistency is the point of the
>>>>>>>>>> original KIP.
>>>>>>>>>>
>>>>>>>>>> 2. the exact API for materializing a KTable. We can specify 1) a
>>>>>>>>>> "store name" (as we do today) or 2) have a ".materialize[d]" call or
>>>>>>>>>> 3) get a handle from a KTable ".getQueryHandle" or 4) have a builder
>>>>>>>>>> construct. So we have discussed 4 options. It is important to remember
>>>>>>>>>> in this discussion that IQ is not designed for just local queries, but
>>>>>>>>>> also for distributed queries. In all cases an identifying name/id is
>>>>>>>>>> needed for the store that the user is interested in querying. So we
>>>>>>>>>> end up with a discussion on who provides the name, the user (as done
>>>>>>>>>> today) or if it is generated automatically (as Jan suggests, as I
>>>>>>>>>> understand it). If it is generated automatically we need a way to
>>>>>>>>>> expose these auto-generated names to the users and link them to the
>>>>>>>>>> KTables they care to query.
>>>>>>>>> Hi, the last sentence is what I am currently arguing against. The user
>>>>>>>>> would never see a string-type identifier name or anything. All he gets
>>>>>>>>> is the queryHandle; if he executes a get(K), that will be an interactive
>>>>>>>>> query get, with all the finding of the right servers that currently have
>>>>>>>>> a copy of this underlying store going on. The nice part is that if
>>>>>>>>> someone retrieves a queryHandle, you know that you have to materialize
>>>>>>>>> (if you are not already) as queries will be coming. Taking away the
>>>>>>>>> confusion mentioned in point 1 IMO.
>>>>>>>>>>
>>>>>>>>>> 3. The exact boundary between the DSL, that is the processing
>>>>>>>>>> language, and the storage/IQ queries, and how we jump from one to the
>>>>>>>>>> other. This is mostly for how we get a handle on a store (so it's
>>>>>>>>>> related to point 2), rather than for how we query the store. I think
>>>>>>>>>> we all agree that we don't want to limit ways one can query a store
>>>>>>>>>> (e.g., using gets or range queries etc) and the query APIs are not in
>>>>>>>>>> the scope of the DSL.
>>>>>>>>> Does the IQ work with range currently? The range would have to be
>>>>>>>>> started on all stores and then merged by maybe the client. Range forces
>>>>>>>>> a flush to RocksDB currently so I am sure you would get a performance
>>>>>>>>> hit right there. Time-windows might be okay, but I am not sure if the
>>>>>>>>> first version should offer the user range access.
>>>>>>>>>>
>>>>>>>>>> 4. The nature of the DSL and whether it's declarative enough, or
>>>>>>>>>> flexible enough. Damian made the point that he likes the builder
>>>>>>>>>> pattern since users can specify, per KTable, things like caching and
>>>>>>>>>> logging needs. His observation (as I understand it) is that the
>>>>>>>>>> processor API (PAPI) is flexible but doesn't provide any help at all
>>>>>>>>>> to users. The current DSL provides declarative abstractions, but it's
>>>>>>>>>> not fine-grained enough. This point is much broader than the KIP, but
>>>>>>>>>> discussing it in this KIP's context is ok, since we don't want to make
>>>>>>>>>> small piecemeal changes and then realise we're not in the spot we want
>>>>>>>>>> to be.
>>>>>>>>> This is indeed much broader.
>>>>>>>>> My guess here is that's why both APIs
>>>>>>>>> exist, and helping the users to switch back and forth might be a thing.
>>>>>>>>>>
>>>>>>>>>> Feel free to pitch in if I have misinterpreted something.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Eno
>>>>>>>>>>
>>>>>>>>>>> On 30 Jan 2017, at 10:22, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi Eno,
>>>>>>>>>>>
>>>>>>>>>>> I have a really hard time understanding why we can't. From my point
>>>>>>>>>>> of view everything could be super elegant DSL only + public API for
>>>>>>>>>>> the PAPI people as already exists.
>>>>>>>>>>>
>>>>>>>>>>> The above approach of implementing a .get(K) on KTable is foolish in my
>>>>>>>>>>> opinion as it would be too late to know that materialisation would be
>>>>>>>>>>> required.
>>>>>>>>>>> But having an API that allows to indicate I want to query this table
>>>>>>>>>>> and then wrapping the say table's processor name can work out really
>>>>>>>>>>> really nice. The only obstacle I see is people not willing to spend
>>>>>>>>>>> the additional time in implementation and just wanting a quick shot
>>>>>>>>>>> option to make it work.
>>>>>>>>>>>
>>>>>>>>>>> For me it would look like this:
>>>>>>>>>>>
>>>>>>>>>>> table = builder.table()
>>>>>>>>>>> filteredTable = table.filter()
>>>>>>>>>>> rawHandle = table.getQueryHandle() // Does the materialisation,
>>>>>>>>>>> really all names possible but I'd rather hide the implication that it
>>>>>>>>>>> materializes
>>>>>>>>>>> filteredHandle = filteredTable.getQueryHandle() // this would
>>>>>>>>>>> _not_ materialize again of course, the source or the aggregator would
>>>>>>>>>>> stay the only materialized processors
>>>>>>>>>>> streams = new streams(builder)
>>>>>>>>>>>
>>>>>>>>>>> This middle part is highly flexible; I could imagine forcing the user
>>>>>>>>>>> to do something like this. This implies to the user that his streams
>>>>>>>>>>> need to be running
>>>>>>>>>>> instead of propagating the missing initialisation back by exceptions.
>>>>>>>>>>> Also, whether the user is forced to pass the appropriate streams
>>>>>>>>>>> instance back can change.
>>>>>>>>>>> I think it's possible to build multiple streams out of one topology
>>>>>>>>>>> so it would be easiest to implement as well. This is just what I maybe
>>>>>>>>>>> had liked the most
>>>>>>>>>>>
>>>>>>>>>>> streams.start();
>>>>>>>>>>> rawHandle.prepare(streams)
>>>>>>>>>>> filteredHandle.prepare(streams)
>>>>>>>>>>>
>>>>>>>>>>> later the users can do
>>>>>>>>>>>
>>>>>>>>>>> V value = rawHandle.get(K)
>>>>>>>>>>> V value = filteredHandle.get(K)
>>>>>>>>>>>
>>>>>>>>>>> This could free DSL users from anything like store names and how and
>>>>>>>>>>> what to materialize. Can someone indicate what the problem would be
>>>>>>>>>>> implementing it like this?
>>>>>>>>>>> Yes I am aware that the current IQ API will not support querying by
>>>>>>>>>>> KTableProcessorName instead of statestoreName. But I think that had
>>>>>>>>>>> to change if you want it to be intuitive
>>>>>>>>>>> IMO you gotta apply the filter at read time
>>>>>>>>>>>
>>>>>>>>>>> Looking forward to your opinions
>>>>>>>>>>>
>>>>>>>>>>> Best Jan
>>>>>>>>>>>
>>>>>>>>>>> #DeathToIQMoreAndBetterConnectors
>>>>>>>>>>>
>>>>>>>>>>> On 30.01.2017 10:42, Eno Thereska wrote:
>>>>>>>>>>>> Hi there,
>>>>>>>>>>>>
>>>>>>>>>>>> The inconsistency will be resolved, whether with materialize or
>>>>>>>>>>>> overloaded methods.
>>>>>>>>>>>>
>>>>>>>>>>>> With the discussion on the DSL & stores I feel we've gone in a
>>>>>>>>>>>> slightly different tangent, which is worth discussing nonetheless.
>>>>>>>>>>>> We have entered into an argument around the scope of the DSL. The
>>>>>>>>>>>> DSL has been designed primarily for processing.
>>>>>>>>>>>> The DSL does not
>>>>>>>>>>>> dictate ways to access state stores or what kind of queries to
>>>>>>>>>>>> perform on them. Hence, I see the mechanism for accessing storage as
>>>>>>>>>>>> decoupled from the DSL.
>>>>>>>>>>>>
>>>>>>>>>>>> We could think of ways to get store handles from part of the DSL,
>>>>>>>>>>>> like the KTable abstraction. However, subsequent queries will be
>>>>>>>>>>>> store-dependent and not rely on the DSL, hence I'm not sure we get
>>>>>>>>>>>> any grand-convergence DSL-Store here. So I am arguing that the
>>>>>>>>>>>> current way of getting a handle on state stores is fine.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Eno
>>>>>>>>>>>>
>>>>>>>>>>>>> On 30 Jan 2017, at 03:56, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thinking out loud here about the API options (materialize vs.
>>>>>>>>>>>>> overloaded functions) and their impact on IQ:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. The first issue of the current DSL is that there is inconsistency
>>>>>>>>>>>>> upon whether / how KTables should be materialized:
>>>>>>>>>>>>>
>>>>>>>>>>>>> a) in many cases the library HAS TO materialize KTables no matter
>>>>>>>>>>>>> what, e.g. KStream / KTable aggregation resulted KTables, and hence
>>>>>>>>>>>>> we enforce users to provide store names and throw RTE if it is null;
>>>>>>>>>>>>> b) in some other cases, the KTable can be materialized or not; for
>>>>>>>>>>>>> example in KStreamBuilder.table(), store names can be nullable and
>>>>>>>>>>>>> in which case the KTable would not be materialized;
>>>>>>>>>>>>> c) in some other cases, the KTable will never be materialized, for
>>>>>>>>>>>>> example KTable.filter() resulted KTables, and users have no options
>>>>>>>>>>>>> to enforce them to be materialized;
>>>>>>>>>>>>> d) this is related to a), where some KTables are required to be
>>>>>>>>>>>>> materialized, but we do not enforce users to provide a state store
>>>>>>>>>>>>> name, e.g. KTables involved in joins; a RTE will be thrown not
>>>>>>>>>>>>> immediately but later in this case.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2. The second issue is related to IQ, where state stores are
>>>>>>>>>>>>> accessed by their state store names; so only those KTables that have
>>>>>>>>>>>>> user-specified state stores will be queryable. But because of 1)
>>>>>>>>>>>>> above, many stores may not be interesting to users for IQ but they
>>>>>>>>>>>>> still need to provide a (dummy?) state store name for them; while on
>>>>>>>>>>>>> the other hand users cannot query some state stores, e.g. the ones
>>>>>>>>>>>>> generated by KTable.filter() as there are no APIs for them to
>>>>>>>>>>>>> specify a state store name.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 3. We are aware from user feedback that such backend details would
>>>>>>>>>>>>> be better abstracted away from the DSL layer, where app developers
>>>>>>>>>>>>> should just focus on processing logic, while state stores along with
>>>>>>>>>>>>> their changelogs etc would better be in a different mechanism; same
>>>>>>>>>>>>> arguments have been discussed for serdes / windowing triggers as
>>>>>>>>>>>>> well. For serdes specifically, we had a very long discussion about
>>>>>>>>>>>>> it and concluded that, at least in Java7, we cannot completely
>>>>>>>>>>>>> abstract serde away in the DSL, so we choose the other extreme to
>>>>>>>>>>>>> enforce users to be completely aware of the serde requirements when
>>>>>>>>>>>>> some KTables may need to be materialized via overloaded API
>>>>>>>>>>>>> functions. While for the state store names, I feel it is a different
>>>>>>>>>>>>> argument than serdes (details below).
>>>>>>>>>>>>>
>>>>>>>>>>>>> So to me, for either materialize() vs. overloaded functions
>>>>>>>>>>>>> directions, the first thing I'd like to resolve is the inconsistency
>>>>>>>>>>>>> issue mentioned above. So in either case: KTable materialization
>>>>>>>>>>>>> will not be affected by user providing state store name or not, but
>>>>>>>>>>>>> will only be decided by the library when it is necessary. More
>>>>>>>>>>>>> specifically, only join operator and builder.table() resulted
>>>>>>>>>>>>> KTables are not always materialized, but are still likely to be
>>>>>>>>>>>>> materialized lazily (e.g. when participated in a join operator).
>>>>>>>>>>>>>
>>>>>>>>>>>>> For overloaded functions that would mean:
>>>>>>>>>>>>>
>>>>>>>>>>>>> a) we have an overloaded function for ALL operators that could
>>>>>>>>>>>>> result in a KTable, and allow it to be null (i.e. for the function
>>>>>>>>>>>>> without this param it is null by default);
>>>>>>>>>>>>> b) null-state-store-name does not indicate that a KTable would not
>>>>>>>>>>>>> be materialized, but that it will not be used for IQ at all
>>>>>>>>>>>>> (internal state store names will be generated when necessary).
>>>>>>>>>>>>>
>>>>>>>>>>>>> For materialize() that would mean:
>>>>>>>>>>>>>
>>>>>>>>>>>>> a) we will remove state store names from ALL operators that could
>>>>>>>>>>>>> result in a KTable.
>>>>>>>>>>>>> b) KTables not calling materialize() do not indicate that a KTable
>>>>>>>>>>>>> would not be materialized, but that it will not be used for IQ at
>>>>>>>>>>>>> all (internal state store names will be generated when necessary).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Again, in either way the API itself does not "hint" about anything
>>>>>>>>>>>>> for materializing a KTable or not at all; it is still purely
>>>>>>>>>>>>> determined by the library when parsing the DSL for now.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Following these thoughts, I feel that 1) we should probably change
>>>>>>>>>>>>> the name "materialize" since it may be misleading to users as to
>>>>>>>>>>>>> what actually happens behind the scenes, to e.g. Damian suggested
>>>>>>>>>>>>> "queryableStore(String storeName)", which returns a
>>>>>>>>>>>>> QueryableStateStore, and can replace the `KafkaStreams.store`
>>>>>>>>>>>>> function; 2) comparing those two options assuming we get rid of the
>>>>>>>>>>>>> misleading function name, I personally favor not adding more
>>>>>>>>>>>>> overloading functions as it keeps the API simpler.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Jan 28, 2017 at 2:32 PM, Jan Filipiak
>>>>>>>>>>>>> <jan.filip...@trivago.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> thanks for your mail, felt like this can clarify some things! The
>>>>>>>>>>>>>> thread unfortunately split but as all branches close in on what my
>>>>>>>>>>>>>> suggestion was about I'll pick this to continue
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Of course only the table the user wants to query would be
>>>>>>>>>>>>>> materialized. (retrieving the queryhandle implies materialisation).
>>>>>>>>>>>>>> So in the example of KTable::filter if you call
>>>>>>>>>>>>>> getIQHandle on both tables only the one source that is there would
>>>>>>>>>>>>>> materialize and the QueryHandle abstraction would make sure it gets
>>>>>>>>>>>>>> mapped and filtered and what not upon read as usual.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Of course the object you would retrieve would maybe only wrap the
>>>>>>>>>>>>>> storeName / table unique identifier and a way to access the streams
>>>>>>>>>>>>>> instance and then basically use the same mechanism that is
>>>>>>>>>>>>>> currently used.
>>>>>>>>>>>>>> From my point of view this is the least confusing way for DSL
>>>>>>>>>>>>>> users.
>>>>>>>>>>>>>> If
>>>>>>>>>>>>>> it's too tricky to get a hand on the streams instance one could ask
>>>>>>>>>>>>>> the user to pass it in before executing queries, therefore making
>>>>>>>>>>>>>> sure the streams instance has been built.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The effort to implement this is indeed some orders of magnitude
>>>>>>>>>>>>>> higher than the overloaded materialized call. As long as I could
>>>>>>>>>>>>>> help getting a different view I am happy.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 28.01.2017 09:36, Eno Thereska wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Jan,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I understand your concern. One implication of not passing any
>>>>>>>>>>>>>>> store name and just getting an IQ handle is that all KTables would
>>>>>>>>>>>>>>> need to be materialised. Currently the store name (or proposed
>>>>>>>>>>>>>>> .materialize() call) acts as a hint on whether to materialise the
>>>>>>>>>>>>>>> KTable or not. Materialising every KTable can be expensive,
>>>>>>>>>>>>>>> although there are some tricks one can play, e.g., have a virtual
>>>>>>>>>>>>>>> store rather than one backed by a Kafka topic.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> However, even with the above, after getting an IQ handle, the user
>>>>>>>>>>>>>>> would still need to use IQ APIs to query the state. As such, we
>>>>>>>>>>>>>>> would still continue to be outside the original DSL so this
>>>>>>>>>>>>>>> wouldn't address your original concern.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So I read this suggestion as simplifying the APIs by removing the
>>>>>>>>>>>>>>> store name, at the cost of having to materialise every KTable.
>>>>>>>>>>>>>>> It's definitely an option we'll consider as part of this KIP.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 28 Jan 2017, at 06:49, Jan Filipiak <jan.filip...@trivago.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>> Hi Exactly
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I know it works from the Processor API, but my suggestion would
>>>>>>>>>>>>>>>> prevent DSL users dealing with store names whatsoever.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In general I am pro switching between DSL and Processor API
>>>>>>>>>>>>>>>> easily. (In my Stream applications I do this a lot with
>>>>>>>>>>>>>>>> reflection and instantiating KTableImpl) Concerning this KIP all
>>>>>>>>>>>>>>>> I say is that there should be a DSL concept of "I want to expose
>>>>>>>>>>>>>>>> this __KTable__". This can be a method like
>>>>>>>>>>>>>>>> KTable::retrieveIQHandle():InteractiveQueryHandle, the table
>>>>>>>>>>>>>>>> would know to materialize, and the user had a reference to the
>>>>>>>>>>>>>>>> "store and the distributed query mechanism by the Interactive
>>>>>>>>>>>>>>>> Query Handle"; under the hood it can use the same mechanism as
>>>>>>>>>>>>>>>> the PAPI people again.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I hope you see my point :)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> #DeathToIQMoreAndBetterConnectors :)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 27.01.2017 21:59, Matthias J. Sax wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Jan,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> the IQ feature is not limited to Streams DSL but can also be
>>>>>>>>>>>>>>>>> used for Stores used in PAPI. Thus, we need a mechanism that
>>>>>>>>>>>>>>>>> does work for PAPI and DSL.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Nevertheless I see your point and I think we could provide a
>>>>>>>>>>>>>>>>> better API for KTable stores including the discovery of remote
>>>>>>>>>>>>>>>>> shards of the same KTable.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> @Michael: Yes, right now we do have a lot of overloads and I am
>>>>>>>>>>>>>>>>> not a big fan of those -- I would rather prefer a builder
>>>>>>>>>>>>>>>>> pattern. But that might be a different discussion (nevertheless,
>>>>>>>>>>>>>>>>> if we would aim for an API rework, we should get the changes
>>>>>>>>>>>>>>>>> with regard to stores right from the beginning on, in order to
>>>>>>>>>>>>>>>>> avoid a redesign later on.)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> something like:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> stream.groupByKey()
>>>>>>>>>>>>>>>>>       .window(TimeWindow.of(5000))
>>>>>>>>>>>>>>>>>       .aggregate(...)
>>>>>>>>>>>>>>>>>       .withAggValueSerde(new CustomTypeSerde())
>>>>>>>>>>>>>>>>>       .withStoreName("storeName");
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> (This would also reduce JavaDoc redundancy -- maybe a personal
>>>>>>>>>>>>>>>>> pain point right now :))
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 1/27/17 11:10 AM, Jan Filipiak wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Yeah,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Maybe my bad that I refuse to look into IQ as I don't find
>>>>>>>>>>>>>>>>>> them anywhere close to being interesting. The problem IMO is
>>>>>>>>>>>>>>>>>> that people need to know the store name, so we are working on
>>>>>>>>>>>>>>>>>> different levels to achieve a single goal.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What is your people's opinion on having a method on KTable
>>>>>>>>>>>>>>>>>> that returns them something like a KeyValue store? There are
>>>>>>>>>>>>>>>>>> of course problems like "it can't be used before the stream
>>>>>>>>>>>>>>>>>> threads are going and group membership is established..." but
>>>>>>>>>>>>>>>>>> the benefit would be that for the user there is a consistent
>>>>>>>>>>>>>>>>>> way of saying "Hey I need it materialized as queries are gonna
>>>>>>>>>>>>>>>>>> be coming" + already get a thing that he can execute the
>>>>>>>>>>>>>>>>>> queries on in 1 step.
>>>>>>>>>>>>>>>>>> What I think is unintuitive here is you need to say
>>>>>>>>>>>>>>>>>> materialize on this KTable and then you go somewhere else and
>>>>>>>>>>>>>>>>>> find its store name and then you go to the KafkaStreams
>>>>>>>>>>>>>>>>>> instance and ask for the store with this name.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So one could help the user to stay in DSL land and therefore
>>>>>>>>>>>>>>>>>> maybe confuse him less.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> #DeathToIQMoreAndBetterConnectors :)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 27.01.2017 16:51, Damian Guy wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I think Jan is saying that they don't always need to be
>>>>>>>>>>>>>>>>>>> materialized, i.e., filter just needs to apply the
>>>>>>>>>>>>>>>>>>> ValueGetter, it doesn't need yet another physical state
>>>>>>>>>>>>>>>>>>> store.
>
> On Fri, 27 Jan 2017 at 15:49 Michael Noll <mich...@confluent.io> wrote:
>
> Like Damian, and for the same reasons, I am more in favor of overloading
> methods rather than introducing `materialize()`.
> FWIW, we already have a similar API setup for e.g.
> `KTable#through(topicName, stateStoreName)`.
>
> A related but slightly different question is what e.g. Jan Filipiak
> mentioned earlier in this thread:
> I think we need to explain more clearly why KIP-114 doesn't propose the
> seemingly simpler solution of always materializing tables/state stores.
>
> On Fri, Jan 27, 2017 at 4:38 PM, Jan Filipiak <jan.filip...@trivago.com>
> wrote:
>
> Hi,
> Yeah, it's confusing. Why shouldn't it be queryable by IQ? If you use the
> ValueGetter of Filter, it will apply the filter and should be completely
> transparent as to whether another processor or IQ is accessing it. How can
> this new method help?
>
> I cannot see the reason for the additional materialize method being
> required! Hence I suggest leaving it alone.
> Regarding removing the others, I don't have strong opinions and it seems
> to be unrelated.
>
> Best Jan
>
> On 26.01.2017 20:48, Eno Thereska wrote:
>
> Forwarding this thread to the users list too in case people would like to
> comment. It is also on the dev list.
>
> Thanks
> Eno
>
> Begin forwarded message:
>
> From: "Matthias J. Sax" <matth...@confluent.io>
> Subject: Re: [DISCUSS] KIP-114: KTable materialization and improved
> semantics
> Date: 24 January 2017 at 19:30:10 GMT
> To: dev@kafka.apache.org
> Reply-To: dev@kafka.apache.org
>
> That's not what I meant by "huge impact".
>
> I refer to the actions related to materializing a KTable: creating a
> RocksDB store and a changelog topic -- users should be aware of the
> runtime implications, and this is better expressed by an explicit method
> call than implicitly triggered by using a different overload of a method.
>
> -Matthias
>
> On 1/24/17 1:35 AM, Damian Guy wrote:
>
> I think your definition of a huge impact and mine are rather different ;-P
> Overloading a few methods is not really a huge impact IMO. It is also a
> sacrifice worth making for readability, usability of the API.
>
> On Mon, 23 Jan 2017 at 17:55 Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> I understand your argument, but do not agree with it.
>
> Your first version (even if the "flow" is not as nice) is more explicit
> than the second version. Adding a stateStoreName parameter is quite
> implicit but has a huge impact -- thus, I prefer the rather more verbose
> but explicit version.
>
> -Matthias
>
> On 1/23/17 1:39 AM, Damian Guy wrote:
>
> I'm not a fan of materialize. I think it interrupts the flow, i.e.,
>
>   table.mapValues(..).materialize().join(..).materialize()
>
> compared to:
>
>   table.mapValues(..).join(..)
>
> I know which one I prefer.
> My preference is still to provide overloaded methods where people can
> specify the store names if they want, otherwise we just generate them.
>
> On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> Hi,
> thanks for the KIP Eno! Here are my 2 cents:
>
> 1) I like Guozhang's proposal about removing the store name from all
> KTable methods and generating internal names (however, I would do this as
> overloads). Furthermore, I would not force users to call .materialize()
> if they want to query a store, but add one more method .stateStoreName()
> that returns the store name if the KTable is materialized. Thus,
> .materialize() also does not necessarily need a storeName parameter (ie,
> we should have some overloads here).
>
> I would also not allow providing a null store name (to indicate no
> materialization if not necessary) but throw an exception.
>
> This yields some simplification (see below).
>
> 2) I also like Guozhang's proposal about KStream#toTable()
>
> 3)
> > > 3. What will happen when you call materialize on KTable that is
> > > already materialized? Will it create another StateStore (providing
> > > the name is different), throw an Exception?
> > Currently an exception is thrown, but see below.
>
> If we follow approach (1) from Guozhang, there is no need to worry about a
> second materialization and also no exception must be thrown. A call to
> .materialize() basically sets a "materialized flag" (ie, idempotent
> operation) and sets a new name.
>
> 4)
> > > Rename toStream() to toKStream() for consistency.
> > Not sure whether that is really required. We also use
> > `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example,
> > and don't care about the "K" prefix.
>
> Eno's reply:
> > I think changing it to `toKStream` would make it absolutely clear what
> > we are converting it to.
> > I'd say we should probably change the KStreamBuilder methods (but not
> > in this KIP).
>
> I would keep #toStream(). (see below)
>
> 5) We should not remove any methods but only deprecate them.
>
> A general note:
>
> I do not understand your comments "Rejected Alternatives". You say "Have
> the KTable be the materialized view" was rejected.
> But your KIP actually does exactly this -- the changelog abstraction of
> KTable is secondary after those changes and the "view" abstraction is what
> a KTable is. And just to be clear, I like this a lot:
>
>  - it aligns with the name KTable
>  - it aligns with stream-table-duality
>  - it aligns with IQ
>
> I would say that a KTable is a "view abstraction" (as materialization is
> optional).
>
> -Matthias
>
> On 1/22/17 5:05 PM, Guozhang Wang wrote:
>
> Thanks for the KIP Eno, I have a few meta comments and a few detailed
> comments:
>
> 1. I like the materialize() function in general, but I would like to see
> how other KTable functions should be updated accordingly. For example,
> 1) KStreamBuilder.table(..)
> has a state store name parameter, and we will always materialize the
> KTable unless its state store name is set to null; 2) KTable.agg requires
> the result KTable to be materialized, and hence it also has a state store
> name; 3) KTable.join requires the joining table to be materialized. And
> today we do not actually have a mechanism to enforce that, but will only
> throw an exception at runtime if it is not (e.g. if you have
> "builder.table("topic", null).join()" a RTE will be thrown).
>
> I'd make an extended proposal just to kick off the discussion here: let's
> remove all the state store params in other KTable functions, and if in
> some cases a KTable has to be materialized (e.g.
> KTable resulted from KXX.agg) and users do not call materialize(), then we
> treat it as "users are not interested in querying it at all" and hence use
> an internal name generated for the materialized KTable; i.e. although it
> is materialized, the state store is not exposed to users. And if users
> call materialize() afterwards but we have already decided to materialize
> it, we can replace the internal name with the user-provided name. Then,
> from a user's point of view, if they ever want to query a KTable, they
> have to call materialize() with a given state store name.
> This approach has one awkwardness though, that the serdes and state store
> name params are not separated and could overlap (see detailed comment #2
> below).
>
> 2. This step does not need to be included in this KIP, but just as a
> reference / future work: as we have discussed before, we may enforce
> materializing KTable.join resulted KTables as well in the future. If we do
> that, then:
>
> a) KXX.agg resulted KTables are always materialized;
> b) KTable.agg requires the aggregating KTable to always be materialized
>    (otherwise we would not know the old value);
> c) KTable.join resulted KTables are always materialized, and so are the
>    joining KTables.
> d) KTable.filter/mapValues resulted KTables' materialization depends on
>    their parent's materialization;
>
> By recursive induction all KTables are actually always materialized, and
> then the effect of "materialize()" is just to specify the state store
> names. In this scenario, we do not need to send Change<V> in repartition
> topics within joins any more, but only for repartition topics within
> aggregations. Instead, we can just send a "tombstone" without the old
> value and we do not need to calculate joins twice (one more time when the
> old value is received).
>
> 3. I'm wondering if it is worthwhile to add a "KStream#toTable()" function
> which is interpreted as a dummy-aggregation where the new value always
> replaces the old value.
> I have seen a couple of use cases of this, for example, users want to read
> a changelog topic, apply some filters, and then materialize it into a
> KTable with state stores without creating duplicated changelog topics.
> With materialize() and toTable I'd imagine users can specify sth. like:
>
> "
> KStream stream = builder.stream("topic1").filter(..);
> KTable table = stream.toTable(..);
> table.materialize("state1");
> "
>
> And the library in this case could set store "state1" 's changelog topic
> to be "topic1", and applying the filter on the fly while (re-)storing its
> state by reading from this topic, instead of creating a second changelog
> topic like "appID-state1-changelog" which is a semi-duplicate of "topic1".
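
The dummy-aggregation reading of `KStream#toTable()` described above can be sketched in plain Java, with no Kafka dependency. This is only an illustration under stated assumptions: `ToTableSketch` and its `toTable` helper are hypothetical, not an existing API. The helper replays a list of records standing in for "topic1", applies the filter on the fly, and lets each new value replace the old one for its key, which is also how the store could be rebuilt from the source topic without a second changelog.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

public class ToTableSketch {

    /**
     * Hypothetical helper: folds a record stream into a table where the latest
     * value per key wins. Records that fail the filter are dropped before the
     * fold, mirroring a filter applied on the stream side.
     */
    static <K, V> Map<K, V> toTable(List<Map.Entry<K, V>> records, BiPredicate<K, V> filter) {
        Map<K, V> store = new LinkedHashMap<>(); // stands in for the state store "state1"
        for (Map.Entry<K, V> record : records) {
            if (filter.test(record.getKey(), record.getValue())) {
                store.put(record.getKey(), record.getValue()); // new value replaces old value
            }
        }
        return store;
    }

    public static void main(String[] args) {
        // Stand-in for the contents of "topic1", in offset order.
        List<Map.Entry<String, Integer>> topic1 = new ArrayList<>();
        topic1.add(new SimpleEntry<>("a", 1));
        topic1.add(new SimpleEntry<>("b", -2));
        topic1.add(new SimpleEntry<>("a", 3));

        Map<String, Integer> state1 = toTable(topic1, (k, v) -> v > 0);
        System.out.println(state1); // prints "{a=3}"
    }
}
```

Replaying the same input always yields the same table, which is why the source topic can double as the store's changelog in this model.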
>
> Detailed:
>
> 1. I'm +1 with Michael regarding "#toStream"; actually I was