Thanks everyone! I think it's time to do a V2 on the KIP so I'll do that and we can see how it looks and continue the discussion from there. Stay tuned.
Thanks
Eno

> On 30 Jan 2017, at 17:23, Matthias J. Sax <matth...@confluent.io> wrote:
>
> Hi,
>
> I think Eno's separation is very clear and helpful. In order to
> streamline this discussion, I would suggest we focus back on point (1)
> only, as this is the original KIP question.
>
> Even if I started the DSL design discussion somehow, because I thought it
> might be helpful to resolve both in a single shot, I feel that we have
> too many options about DSL design and we should split it up in two
> steps. This will have the disadvantage that we will change the API
> twice, but still, I think it will be a more focused discussion.
>
> I just had another look at the KIP, and it proposes 3 changes:
>
> 1. add .materialized() -> IIRC it was suggested to name this
>    .materialize() though (can you maybe update the KIP, Eno?)
> 2. remove print(), writeAsText(), and foreach()
> 3. rename toStream() to toKStream()
>
>
> I completely agree with (2) -- not sure about (3) though because
> KStreamBuilder also has .stream() and .table() as methods.
>
> However, we might want to introduce a KStream#toTable() -- this was
> requested multiple times -- might also be part of a different KIP.
>
>
> Thus, we end up with (1). I would suggest to take a step back here and,
> instead of discussing how to express the changes in the DSL (new
> overload, new methods...), discuss what the actual change
> should be. Like (1) materialize all KTables all the time, (2) allow the
> user to force a materialization to enable querying the KTable, (3) allow
> for queryable non-materialized KTables.
>
> One more question is whether we want to allow a user-forced
> materialization only as a local store without changelog, or both
> (together / independently)? We got some requests like this already.
>
>
> -Matthias
>
>
> On 1/30/17 3:50 AM, Jan Filipiak wrote:
>> Hi Eno,
>>
>> thanks for putting this into different points. I want to put a few remarks
>> inline.
>> >> Best Jan >> >> On 30.01.2017 12:19, Eno Thereska wrote: >>> So I think there are several important discussion threads that are >>> emerging here. Let me try to tease them apart: >>> >>> 1. inconsistency in what is materialized and what is not, what is >>> queryable and what is not. I think we all agree there is some >>> inconsistency there and this will be addressed with any of the >>> proposed approaches. Addressing the inconsistency is the point of the >>> original KIP. >>> >>> 2. the exact API for materializing a KTable. We can specify 1) a >>> "store name" (as we do today) or 2) have a ".materialize[d]" call or >>> 3) get a handle from a KTable ".getQueryHandle" or 4) have a builder >>> construct. So we have discussed 4 options. It is important to remember >>> in this discussion that IQ is not designed for just local queries, but >>> also for distributed queries. In all cases an identifying name/id is >>> needed for the store that the user is interested in querying. So we >>> end up with a discussion on who provides the name, the user (as done >>> today) or if it is generated automatically (as Jan suggests, as I >>> understand it). If it is generated automatically we need a way to >>> expose these auto-generated names to the users and link them to the >>> KTables they care to query. >> Hi, the last sentence is what I currently arguing against. The user >> would never see a stringtype indentifier name or anything. All he gets >> is the queryHandle if he executes a get(K) that will be an interactive >> query get. with all the finding the right servers that currently have a >> copy of this underlying store stuff going on. The nice part is that if >> someone retrieves a queryHandle, you know that you have to materialized >> (if you are not already) as queries will be coming. Taking away the >> confusion mentioned in point 1 IMO. >>> >>> 3. 
The exact boundary between the DSL, that is the processing >>> language, and the storage/IQ queries, and how we jump from one to the >>> other. This is mostly for how we get a handle on a store (so it's >>> related to point 2), rather than for how we query the store. I think >>> we all agree that we don't want to limit ways one can query a store >>> (e.g., using gets or range queries etc) and the query APIs are not in >>> the scope of the DSL. >> Does the IQ work with range currently? The range would have to be >> started on all stores and then merged by maybe the client. Range force a >> flush to RocksDB currently so I am sure you would get a performance hit >> right there. Time-windows might be okay, but I am not sure if the first >> version should offer the user range access. >>> >>> 4. The nature of the DSL and whether its declarative enough, or >>> flexible enough. Damian made the point that he likes the builder >>> pattern since users can specify, per KTable, things like caching and >>> logging needs. His observation (as I understand it) is that the >>> processor API (PAPI) is flexible but doesn't provide any help at all >>> to users. The current DSL provides declarative abstractions, but it's >>> not fine-grained enough. This point is much broader than the KIP, but >>> discussing it in this KIPs context is ok, since we don't want to make >>> small piecemeal changes and then realise we're not in the spot we want >>> to be. >> This is indeed much broader. My guess here is that's why both API's >> exists and helping the users to switch back and forth might be a thing. >>> >>> Feel free to pitch in if I have misinterpreted something. >>> >>> Thanks >>> Eno >>> >>> >>>> On 30 Jan 2017, at 10:22, Jan Filipiak <jan.filip...@trivago.com> wrote: >>>> >>>> Hi Eno, >>>> >>>> I have a really hard time understanding why we can't. From my point >>>> of view everything could be super elegant DSL only + public api for >>>> the PAPI-people as already exist. 
>>>>
>>>> The above approach of implementing a .get(K) on KTable is foolish in my
>>>> opinion, as it would be too late to know that materialisation would be
>>>> required.
>>>> But having an API that allows one to indicate "I want to query this table"
>>>> and then wrapping, say, the table's processor name can work out really
>>>> really nicely. The only obstacle I see is people not willing to spend
>>>> the additional time on implementation and just wanting a quick-shot
>>>> option to make it work.
>>>>
>>>> For me it would look like this:
>>>>
>>>> table = builder.table()
>>>> filteredTable = table.filter()
>>>> rawHandle = table.getQueryHandle() // Does the materialisation;
>>>> really any name is possible, but I'd rather hide the implication that it
>>>> materializes
>>>> filteredHandle = filteredTable.getQueryHandle() // this would
>>>> _not_ materialize again of course, the source or the aggregator would
>>>> stay the only materialized processors
>>>> streams = new streams(builder)
>>>>
>>>> This middle part is highly flexible. I could imagine forcing the user
>>>> to do something like this. This implies to the user that his streams
>>>> need to be running,
>>>> instead of propagating the missing initialisation back by exceptions.
>>>> Also, whether the user is forced to pass the appropriate streams instance
>>>> back can change.
>>>> I think it's possible to build multiple streams out of one topology,
>>>> so it would be easiest to implement as well. This is just what I would
>>>> maybe have liked the most:
>>>>
>>>> streams.start();
>>>> rawHandle.prepare(streams)
>>>> filteredHandle.prepare(streams)
>>>>
>>>> Later the user can do
>>>>
>>>> V value = rawHandle.get(K)
>>>> V value = filteredHandle.get(K)
>>>>
>>>> This could free DSL users from anything like store names and how and
>>>> what to materialize. Can someone indicate what the problem would be
>>>> with implementing it like this?
>>>> Yes I am aware that the current IQ API will not support querying by >>>> KTableProcessorName instread of statestoreName. But I think that had >>>> to change if you want it to be intuitive >>>> IMO you gotta apply the filter read time >>>> >>>> Looking forward to your opinions >>>> >>>> Best Jan >>>> >>>> >>>> #DeathToIQMoreAndBetterConnectors >>>> >>>> >>>> >>>> On 30.01.2017 10:42, Eno Thereska wrote: >>>>> Hi there, >>>>> >>>>> The inconsistency will be resolved, whether with materialize or >>>>> overloaded methods. >>>>> >>>>> With the discussion on the DSL & stores I feel we've gone in a >>>>> slightly different tangent, which is worth discussing nonetheless. >>>>> We have entered into an argument around the scope of the DSL. The >>>>> DSL has been designed primarily for processing. The DSL does not >>>>> dictate ways to access state stores or what hind of queries to >>>>> perform on them. Hence, I see the mechanism for accessing storage as >>>>> decoupled from the DSL. >>>>> >>>>> We could think of ways to get store handles from part of the DSL, >>>>> like the KTable abstraction. However, subsequent queries will be >>>>> store-dependent and not rely on the DSL, hence I'm not sure we get >>>>> any grand-convergence DSL-Store here. So I am arguing that the >>>>> current way of getting a handle on state stores is fine. >>>>> >>>>> Thanks >>>>> Eno >>>>> >>>>>> On 30 Jan 2017, at 03:56, Guozhang Wang <wangg...@gmail.com> wrote: >>>>>> >>>>>> Thinking loud here about the API options (materialize v.s. overloaded >>>>>> functions) and its impact on IQ: >>>>>> >>>>>> 1. The first issue of the current DSL is that, there is >>>>>> inconsistency upon >>>>>> whether / how KTables should be materialized: >>>>>> >>>>>> a) in many cases the library HAS TO materialize KTables no >>>>>> matter what, >>>>>> e.g. 
KStream / KTable aggregation resulted KTables, and hence we >>>>>> enforce >>>>>> users to provide store names and throw RTE if it is null; >>>>>> b) in some other cases, the KTable can be materialized or not; for >>>>>> example in KStreamBuilder.table(), store names can be nullable and >>>>>> in which >>>>>> case the KTable would not be materialized; >>>>>> c) in some other cases, the KTable will never be materialized, for >>>>>> example KTable.filter() resulted KTables, and users have no options to >>>>>> enforce them to be materialized; >>>>>> d) this is related to a), where some KTables are required to be >>>>>> materialized, but we do not enforce users to provide a state store >>>>>> name, >>>>>> e.g. KTables involved in joins; a RTE will be thrown not >>>>>> immediately but >>>>>> later in this case. >>>>>> >>>>>> 2. The second issue is related to IQ, where state stores are >>>>>> accessed by >>>>>> their state stores; so only those KTable's that have user-specified >>>>>> state >>>>>> stores will be queryable. But because of 1) above, many stores may >>>>>> not be >>>>>> interested to users for IQ but they still need to provide a >>>>>> (dummy?) state >>>>>> store name for them; while on the other hand users cannot query >>>>>> some state >>>>>> stores, e.g. the ones generated by KTable.filter() as there is no >>>>>> APIs for >>>>>> them to specify a state store name. >>>>>> >>>>>> 3. We are aware from user feedbacks that such backend details would be >>>>>> better be abstracted away from the DSL layer, where app developers >>>>>> should >>>>>> just focus on processing logic, while state stores along with their >>>>>> changelogs etc would better be in a different mechanism; same >>>>>> arguments >>>>>> have been discussed for serdes / windowing triggers as well. 
For >>>>>> serdes >>>>>> specifically, we had a very long discussion about it and concluded >>>>>> that, at >>>>>> least in Java7, we cannot completely abstract serde away in the >>>>>> DSL, so we >>>>>> choose the other extreme to enforce users to be completely aware of >>>>>> the >>>>>> serde requirements when some KTables may need to be materialized vis >>>>>> overloaded API functions. While for the state store names, I feel >>>>>> it is a >>>>>> different argument than serdes (details below). >>>>>> >>>>>> >>>>>> So to me, for either materialize() v.s. overloaded functions >>>>>> directions, >>>>>> the first thing I'd like to resolve is the inconsistency issue >>>>>> mentioned >>>>>> above. So in either case: KTable materialization will not be affect >>>>>> by user >>>>>> providing state store name or not, but will only be decided by the >>>>>> library >>>>>> when it is necessary. More specifically, only join operator and >>>>>> builder.table() resulted KTables are not always materialized, but >>>>>> are still >>>>>> likely to be materialized lazily (e.g. when participated in a join >>>>>> operator). >>>>>> >>>>>> >>>>>> For overloaded functions that would mean: >>>>>> >>>>>> a) we have an overloaded function for ALL operators that could >>>>>> result >>>>>> in a KTable, and allow it to be null (i.e. for the function without >>>>>> this >>>>>> param it is null by default); >>>>>> b) null-state-store-name do not indicate that a KTable would >>>>>> not be >>>>>> materialized, but that it will not be used for IQ at all (internal >>>>>> state >>>>>> store names will be generated when necessary). >>>>>> >>>>>> >>>>>> For materialize() that would mean: >>>>>> >>>>>> a) we will remove state store names from ALL operators that could >>>>>> result in a KTable. 
>>>>>> b) KTables that not calling materialized do not indicate that a >>>>>> KTable >>>>>> would not be materialized, but that it will not be used for IQ at all >>>>>> (internal state store names will be generated when necessary). >>>>>> >>>>>> >>>>>> Again, in either ways the API itself does not "hint" about anything >>>>>> for >>>>>> materializing a KTable or not at all; it is still purely determined >>>>>> by the >>>>>> library when parsing the DSL for now. >>>>>> >>>>>> Following these thoughts, I feel that 1) we should probably change >>>>>> the name >>>>>> "materialize" since it may be misleading to users as what actually >>>>>> happened >>>>>> behind the scene, to e.g. Damian suggested "queryableStore(String >>>>>> storeName)", >>>>>> which returns a QueryableStateStore, and can replace the >>>>>> `KafkaStreams.store` function; 2) comparing those two options >>>>>> assuming we >>>>>> get rid of the misleading function name, I personally favor not >>>>>> adding more >>>>>> overloading functions as it keeps the API simpler. >>>>>> >>>>>> >>>>>> >>>>>> Guozhang >>>>>> >>>>>> >>>>>> On Sat, Jan 28, 2017 at 2:32 PM, Jan Filipiak >>>>>> <jan.filip...@trivago.com> >>>>>> wrote: >>>>>> >>>>>>> Hi, >>>>>>> >>>>>>> thanks for your mail, felt like this can clarify some things! The >>>>>>> thread >>>>>>> unfortunately split but as all branches close in on what my >>>>>>> suggestion was >>>>>>> about Ill pick this to continue >>>>>>> >>>>>>> Of course only the table the user wants to query would be >>>>>>> materialized. >>>>>>> (retrieving the queryhandle implies materialisation). So In the >>>>>>> example of >>>>>>> KTable::filter if you call >>>>>>> getIQHandle on both tables only the one source that is there would >>>>>>> materialize and the QueryHandleabstraction would make sure it gets >>>>>>> mapped >>>>>>> and filtered and what not uppon read as usual. 
>>>>>>> >>>>>>> Of Course the Object you would retrieve would maybe only wrap the >>>>>>> storeName / table unique identifier and a way to access the streams >>>>>>> instance and then basically uses the same mechanism that is >>>>>>> currently used. >>>>>>> From my point of view this is the least confusing way for DSL >>>>>>> users. If >>>>>>> its to tricky to get a hand on the streams instance one could ask >>>>>>> the user >>>>>>> to pass it in before executing queries, therefore making sure the >>>>>>> streams >>>>>>> instance has been build. >>>>>>> >>>>>>> The effort to implement this is indeed some orders of magnitude >>>>>>> higher >>>>>>> than the overloaded materialized call. As long as I could help >>>>>>> getting a >>>>>>> different view I am happy. >>>>>>> >>>>>>> Best Jan >>>>>>> >>>>>>> >>>>>>> On 28.01.2017 09:36, Eno Thereska wrote: >>>>>>> >>>>>>>> Hi Jan, >>>>>>>> >>>>>>>> I understand your concern. One implication of not passing any >>>>>>>> store name >>>>>>>> and just getting an IQ handle is that all KTables would need to be >>>>>>>> materialised. Currently the store name (or proposed >>>>>>>> .materialize() call) >>>>>>>> act as hints on whether to materialise the KTable or not. >>>>>>>> Materialising >>>>>>>> every KTable can be expensive, although there are some tricks one >>>>>>>> can play, >>>>>>>> e.g., have a virtual store rather than one backed by a Kafka topic. >>>>>>>> >>>>>>>> However, even with the above, after getting an IQ handle, the >>>>>>>> user would >>>>>>>> still need to use IQ APIs to query the state. As such, we would >>>>>>>> still >>>>>>>> continue to be outside the original DSL so this wouldn't address >>>>>>>> your >>>>>>>> original concern. >>>>>>>> >>>>>>>> So I read this suggestion as simplifying the APIs by removing the >>>>>>>> store >>>>>>>> name, at the cost of having to materialise every KTable. It's >>>>>>>> definitely an >>>>>>>> option we'll consider as part of this KIP. 
>>>>>>>> >>>>>>>> Thanks >>>>>>>> Eno >>>>>>>> >>>>>>>> >>>>>>>> On 28 Jan 2017, at 06:49, Jan Filipiak <jan.filip...@trivago.com> >>>>>>>> wrote: >>>>>>>>> Hi Exactly >>>>>>>>> >>>>>>>>> I know it works from the Processor API, but my suggestion would >>>>>>>>> prevent >>>>>>>>> DSL users dealing with storenames what so ever. >>>>>>>>> >>>>>>>>> In general I am pro switching between DSL and Processor API >>>>>>>>> easily. (In >>>>>>>>> my Stream applications I do this a lot with reflection and >>>>>>>>> instanciating >>>>>>>>> KTableImpl) Concerning this KIP all I say is that there should >>>>>>>>> be a DSL >>>>>>>>> concept of "I want to expose this __KTable__. This can be a >>>>>>>>> Method like >>>>>>>>> KTable::retrieveIQHandle():InteractiveQueryHandle, the table >>>>>>>>> would know >>>>>>>>> to materialize, and the user had a reference to the "store and the >>>>>>>>> distributed query mechanism by the Interactive Query Handle" >>>>>>>>> under the hood >>>>>>>>> it can use the same mechanism as the PIP people again. >>>>>>>>> >>>>>>>>> I hope you see my point J >>>>>>>>> >>>>>>>>> Best Jan >>>>>>>>> >>>>>>>>> >>>>>>>>> #DeathToIQMoreAndBetterConnectors :) >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On 27.01.2017 21:59, Matthias J. Sax wrote: >>>>>>>>> >>>>>>>>>> Jan, >>>>>>>>>> >>>>>>>>>> the IQ feature is not limited to Streams DSL but can also be >>>>>>>>>> used for >>>>>>>>>> Stores used in PAPI. Thus, we need a mechanism that does work >>>>>>>>>> for PAPI >>>>>>>>>> and DSL. >>>>>>>>>> >>>>>>>>>> Nevertheless I see your point and I think we could provide a >>>>>>>>>> better API >>>>>>>>>> for KTable stores including the discovery of remote shards of >>>>>>>>>> the same >>>>>>>>>> KTable. >>>>>>>>>> >>>>>>>>>> @Michael: Yes, right now we do have a lot of overloads and I am >>>>>>>>>> not a >>>>>>>>>> big fan of those -- I would rather prefer a builder pattern. 
>>>>>>>>>> But that might be a different discussion (nevertheless, if we
>>>>>>>>>> would aim for an API rework, we should get the changes with
>>>>>>>>>> regard to stores right from the beginning, in order to avoid a
>>>>>>>>>> redesign later on.)
>>>>>>>>>>
>>>>>>>>>> something like:
>>>>>>>>>>
>>>>>>>>>> stream.groupByKey()
>>>>>>>>>>       .window(TimeWindow.of(5000))
>>>>>>>>>>       .aggregate(...)
>>>>>>>>>>       .withAggValueSerde(new CustomTypeSerde())
>>>>>>>>>>       .withStoreName("storeName");
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> (This would also reduce JavaDoc redundancy -- maybe a personal
>>>>>>>>>> pain point right now :))
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 1/27/17 11:10 AM, Jan Filipiak wrote:
>>>>>>>>>>
>>>>>>>>>>> Yeah,
>>>>>>>>>>>
>>>>>>>>>>> Maybe my bad that I refuse to look into IQ as I don't find them
>>>>>>>>>>> anywhere close to being interesting. The problem IMO is that
>>>>>>>>>>> people need to know the store name, so we are working on
>>>>>>>>>>> different levels to achieve a single goal.
>>>>>>>>>>>
>>>>>>>>>>> What is your people's opinion on having a method on KTable that
>>>>>>>>>>> returns them something like a key-value store? There are of
>>>>>>>>>>> course problems like "it can't be used before the stream threads
>>>>>>>>>>> are going and group membership is established..." but the benefit
>>>>>>>>>>> would be that for the user there is a consistent way of saying
>>>>>>>>>>> "Hey, I need it materialized as queries are gonna be coming" +
>>>>>>>>>>> already getting a thing that he can execute the queries on, in
>>>>>>>>>>> 1 step.
>>>>>>>>>>> What I think is unintuitive here is you need to say >>>>>>>>>>> materialize on this >>>>>>>>>>> Ktable and then you go somewhere else and find its store name >>>>>>>>>>> and then >>>>>>>>>>> you go to the kafkastreams instance and ask for the store with >>>>>>>>>>> this >>>>>>>>>>> name. >>>>>>>>>>> >>>>>>>>>>> So one could the user help to stay in DSL land and therefore >>>>>>>>>>> maybe >>>>>>>>>>> confuse him less. >>>>>>>>>>> >>>>>>>>>>> Best Jan >>>>>>>>>>> >>>>>>>>>>> #DeathToIQMoreAndBetterConnectors :) >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On 27.01.2017 16:51, Damian Guy wrote: >>>>>>>>>>> >>>>>>>>>>>> I think Jan is saying that they don't always need to be >>>>>>>>>>>> materialized, >>>>>>>>>>>> i.e., >>>>>>>>>>>> filter just needs to apply the ValueGetter, it doesn't need yet >>>>>>>>>>>> another >>>>>>>>>>>> physical state store. >>>>>>>>>>>> >>>>>>>>>>>> On Fri, 27 Jan 2017 at 15:49 Michael Noll <mich...@confluent.io> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>> Like Damian, and for the same reasons, I am more in favor of >>>>>>>>>>>>> overloading >>>>>>>>>>>>> methods rather than introducing `materialize()`. >>>>>>>>>>>>> FWIW, we already have a similar API setup for e.g. >>>>>>>>>>>>> `KTable#through(topicName, stateStoreName)`. >>>>>>>>>>>>> >>>>>>>>>>>>> A related but slightly different question is what e.g. Jan >>>>>>>>>>>>> Filipiak >>>>>>>>>>>>> mentioned earlier in this thread: >>>>>>>>>>>>> I think we need to explain more clearly why KIP-114 doesn't >>>>>>>>>>>>> propose >>>>>>>>>>>>> the >>>>>>>>>>>>> seemingly simpler solution of always materializing tables/state >>>>>>>>>>>>> stores. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On Fri, Jan 27, 2017 at 4:38 PM, Jan Filipiak < >>>>>>>>>>>>> jan.filip...@trivago.com> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>> Hi, >>>>>>>>>>>>>> Yeah its confusing, Why shoudn't it be querable by IQ? 
If >>>>>>>>>>>>>> you uses >>>>>>>>>>>>>> the >>>>>>>>>>>>>> ValueGetter of Filter it will apply the filter and should be >>>>>>>>>>>>>> completely >>>>>>>>>>>>>> transparent as to if another processor or IQ is accessing >>>>>>>>>>>>>> it? How >>>>>>>>>>>>>> can >>>>>>>>>>>>>> >>>>>>>>>>>>> this >>>>>>>>>>>>> >>>>>>>>>>>>>> new method help? >>>>>>>>>>>>>> >>>>>>>>>>>>>> I cannot see the reason for the additional materialize >>>>>>>>>>>>>> method being >>>>>>>>>>>>>> required! Hence I suggest leave it alone. >>>>>>>>>>>>>> regarding removing the others I dont have strong opinions >>>>>>>>>>>>>> and it >>>>>>>>>>>>>> seems to >>>>>>>>>>>>>> be unrelated. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Best Jan >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On 26.01.2017 20:48, Eno Thereska wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> Forwarding this thread to the users list too in case people >>>>>>>>>>>>>> would >>>>>>>>>>>>>>> like >>>>>>>>>>>>>>> >>>>>>>>>>>>>> to >>>>>>>>>>>>>> comment. It is also on the dev list. >>>>>>>>>>>>>>> Thanks >>>>>>>>>>>>>>> Eno >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Begin forwarded message: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> From: "Matthias J. Sax" <matth...@confluent.io> >>>>>>>>>>>>>>>> Subject: Re: [DISCUSS] KIP-114: KTable materialization and >>>>>>>>>>>>>>>> improved >>>>>>>>>>>>>>>> semantics >>>>>>>>>>>>>>>> Date: 24 January 2017 at 19:30:10 GMT >>>>>>>>>>>>>>>> To: d...@kafka.apache.org >>>>>>>>>>>>>>>> Reply-To: d...@kafka.apache.org >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> That not what I meant by "huge impact". 
>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I refer to the actions related to materialize a KTable: >>>>>>>>>>>>>>>> creating a >>>>>>>>>>>>>>>> RocksDB store and a changelog topic -- users should be >>>>>>>>>>>>>>>> aware about >>>>>>>>>>>>>>>> runtime implication and this is better expressed by an >>>>>>>>>>>>>>>> explicit >>>>>>>>>>>>>>>> method >>>>>>>>>>>>>>>> call, rather than implicitly triggered by using a different >>>>>>>>>>>>>>>> overload of >>>>>>>>>>>>>>>> a method. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On 1/24/17 1:35 AM, Damian Guy wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I think your definition of a huge impact and mine are rather >>>>>>>>>>>>>>>>> different >>>>>>>>>>>>>>>>> ;-P >>>>>>>>>>>>>>>>> Overloading a few methods is not really a huge impact >>>>>>>>>>>>>>>>> IMO. It is >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> also a >>>>>>>>>>>>>> sacrifice worth making for readability, usability of the API. >>>>>>>>>>>>>>>>> On Mon, 23 Jan 2017 at 17:55 Matthias J. Sax < >>>>>>>>>>>>>>>>> matth...@confluent.io> >>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I understand your argument, but do not agree with it. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Your first version (even if the "flow" is not as nice) >>>>>>>>>>>>>>>>>> is more >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> explicit >>>>>>>>>>>>>> than the second version. Adding a stateStoreName parameter >>>>>>>>>>>>>> is quite >>>>>>>>>>>>>>>>>> implicit but has a huge impact -- thus, I prefer the >>>>>>>>>>>>>>>>>> rather more >>>>>>>>>>>>>>>>>> verbose >>>>>>>>>>>>>>>>>> but explicit version. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On 1/23/17 1:39 AM, Damian Guy wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I'm not a fan of materialize. 
I think it interrupts the >>>>>>>>>>>>>>>>>> flow, >>>>>>>>>>>>>>>>>>> i.e, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> table.mapValue(..).materialize().join(..).materialize() >>>>>>>>>>>>>>>>>>> compared to: >>>>>>>>>>>>>>>>>>> table.mapValues(..).join(..) >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I know which one i prefer. >>>>>>>>>>>>>>>>>>> My preference is stil to provide overloaded methods where >>>>>>>>>>>>>>>>>>> people can >>>>>>>>>>>>>>>>>>> specify the store names if they want, otherwise we just >>>>>>>>>>>>>>>>>>> generate >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> them. >>>>>>>>>>>>>> On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax >>>>>>>>>>>>>>>>>>> <matth...@confluent.io >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>>>>> thanks for the KIP Eno! Here are my 2 cents: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> 1) I like Guozhang's proposal about removing store >>>>>>>>>>>>>>>>>>>> name from >>>>>>>>>>>>>>>>>>>> all >>>>>>>>>>>>>>>>>>>> KTable >>>>>>>>>>>>>>>>>>>> methods and generate internal names (however, I would >>>>>>>>>>>>>>>>>>>> do this >>>>>>>>>>>>>>>>>>>> as >>>>>>>>>>>>>>>>>>>> overloads). Furthermore, I would not force users to call >>>>>>>>>>>>>>>>>>>> .materialize() >>>>>>>>>>>>>>>>>>>> if they want to query a store, but add one more method >>>>>>>>>>>>>>>>>>>> .stateStoreName() >>>>>>>>>>>>>>>>>>>> that returns the store name if the KTable is >>>>>>>>>>>>>>>>>>>> materialized. >>>>>>>>>>>>>>>>>>>> Thus, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> also >>>>>>>>>>>>>> .materialize() must not necessarily have a parameter storeName >>>>>>>>>>>>>>>>>>>> (ie, >>>>>>>>>>>>>>>>>>>> we >>>>>>>>>>>>>>>>>>>> should have some overloads here). >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I would also not allow to provide a null store name (to >>>>>>>>>>>>>>>>>>>> indicate no >>>>>>>>>>>>>>>>>>>> materialization if not necessary) but throw an >>>>>>>>>>>>>>>>>>>> exception. 
>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> This yields some simplification (see below). >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> 2) I also like Guozhang's proposal about >>>>>>>>>>>>>>>>>>>> KStream#toTable() >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> 3) >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> 3. What will happen when you call materialize on >>>>>>>>>>>>>>>>>>>> KTable >>>>>>>>>>>>>>>>>>>>> that is >>>>>>>>>>>>>>>>>>>>> already >>>>>>>>>>>>>>>>>>>>> materialized? Will it create another StateStore >>>>>>>>>>>>>>>>>>>>> (providing >>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> name >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> is >>>>>>>>>>>>>>>>>>>> different), throw an Exception? >>>>>>>>>>>>>>>>>>>> Currently an exception is thrown, but see below. >>>>>>>>>>>>>>>>>>>>> If we follow approach (1) from Guozhang, there is no >>>>>>>>>>>>>>>>>>>>> need to >>>>>>>>>>>>>>>>>>>>> worry >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> about >>>>>>>>>>>>>>>>>>>> a second materialization and also no exception must be >>>>>>>>>>>>>>>>>>>> throws. A >>>>>>>>>>>>>>>>>>>> call to >>>>>>>>>>>>>>>>>>>> .materialize() basically sets a "materialized flag" (ie, >>>>>>>>>>>>>>>>>>>> idempotent >>>>>>>>>>>>>>>>>>>> operation) and sets a new name. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> 4) >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Rename toStream() to toKStream() for consistency. >>>>>>>>>>>>>>>>>>>>> Not sure whether that is really required. We also use >>>>>>>>>>>>>>>>>>>>> `KStreamBuilder#stream()` and >>>>>>>>>>>>>>>>>>>>> `KStreamBuilder#table()`, for >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> example, >>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>> don't care about the "K" prefix. 
>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Eno's reply: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I think changing it to `toKStream` would make it >>>>>>>>>>>>>>>>>>>> absolutely >>>>>>>>>>>>>>>>>>>>> clear >>>>>>>>>>>>>>>>>>>>> what >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> we are converting it to. >>>>>>>>>>>>>>>>>>>> I'd say we should probably change the KStreamBuilder >>>>>>>>>>>>>>>>>>>> methods >>>>>>>>>>>>>>>>>>>>> (but >>>>>>>>>>>>>>>>>>>>> not >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> in >>>>>>>>>>>>>>>>>>> this KIP). >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I would keep #toStream(). (see below) >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> 5) We should not remove any methods but only >>>>>>>>>>>>>>>>>>>> deprecate them. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> A general note: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I do not understand your comments "Rejected >>>>>>>>>>>>>>>>>>>> Alternatives". You >>>>>>>>>>>>>>>>>>>> say >>>>>>>>>>>>>>>>>>>> "Have >>>>>>>>>>>>>>>>>>>> the KTable be the materialized view" was rejected. >>>>>>>>>>>>>>>>>>>> But your >>>>>>>>>>>>>>>>>>>> KIP >>>>>>>>>>>>>>>>>>>> actually >>>>>>>>>>>>>>>>>>>> does exactly this -- the changelog abstraction of >>>>>>>>>>>>>>>>>>>> KTable is >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> secondary >>>>>>>>>>>>>> after those changes and the "view" abstraction is what a >>>>>>>>>>>>>>>>>>>> KTable is. >>>>>>>>>>>>>>>>>>>> And >>>>>>>>>>>>>>>>>>>> just to be clear, I like this a lot: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> - it aligns with the name KTable >>>>>>>>>>>>>>>>>>>> - is aligns with stream-table-duality >>>>>>>>>>>>>>>>>>>> - it aligns with IQ >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I would say that a KTable is a "view abstraction" (as >>>>>>>>>>>>>>>>>>>> materialization is >>>>>>>>>>>>>>>>>>>> optional). 
>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On 1/22/17 5:05 PM, Guozhang Wang wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Thanks for the KIP Eno, I have a few meta comments >>>>>>>>>>>>>>>>>>>> and a few >>>>>>>>>>>>>>>>>>>>> detailed >>>>>>>>>>>>>>>>>>>>> comments: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> 1. I like the materialize() function in general, but >>>>>>>>>>>>>>>>>>>>> I would >>>>>>>>>>>>>>>>>>>>> like >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>> see >>>>>>>>>>>>>>>>>>> how other KTable functions should be updated >>>>>>>>>>>>>>>>>>> accordingly. For >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> example, >>>>>>>>>>>>>> 1) >>>>>>>>>>>>>>>>>>> KStreamBuilder.table(..) has a state store name >>>>>>>>>>>>>>>>>>> parameter, and >>>>>>>>>>>>>>>>>>> we >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> will >>>>>>>>>>>>>> always materialize the KTable unless its state store name >>>>>>>>>>>>>> is set >>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>> null; >>>>>>>>>>>>>>>>>>> 2) KTable.agg requires the result KTable to be >>>>>>>>>>>>>>>>>>> materialized, >>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> hence >>>>>>>>>>>>>> it >>>>>>>>>>>>>>>>>>> also have a state store name; 3) KTable.join requires the >>>>>>>>>>>>>>>>>>> joining >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> table >>>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>> be materialized. And today we do not actually have a >>>>>>>>>>>>>>>>>>>>> mechanism to >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> enforce >>>>>>>>>>>>>>>>>>> that, but will only throw an exception at runtime if >>>>>>>>>>>>>>>>>>> it is not >>>>>>>>>>>>>>>>>>> (e.g. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> if >>>>>>>>>>>>>>>>>>>>> you >>>>>>>>>>>>>>>>>>>> have "builder.table("topic", null).join()" a RTE will be >>>>>>>>>>>>>>>>>>>>> thrown). 
>>
>> I'd make an extended proposal just to kick off the discussion here: let's
>> remove all the state store params in other KTable functions, and if in
>> some cases a KTable has to be materialized (e.g. a KTable resulting from
>> KXX.agg) and users do not call materialize(), then we treat it as "users
>> are not interested in querying it at all" and hence use an internally
>> generated name for the materialized KTable; i.e. although it is
>> materialized, the state store is not exposed to users. And if users call
>> materialize() afterwards but we have already decided to materialize it, we
>> can replace the internal name with the user-provided name. Then from a
>> user's point of view, if they ever want to query a KTable, they have to
>> call materialize() with a given state store name.
>> This approach has one awkwardness though: the serdes and state store name
>> params are not separated and could overlap (see detailed comment #2
>> below).
>>
>> 2. This step does not need to be included in this KIP, but just as a
>> reference / future work: as we have discussed before, we may enforce
>> materialization of KTables resulting from KTable.join as well in the
>> future. If we do that, then:
>>
>> a) KTables resulting from KXX.agg are always materialized;
>> b) KTable.agg requires the aggregating KTable to always be materialized
>>    (otherwise we would not know the old value);
>> c) KTables resulting from KTable.join are always materialized, and so are
>>    the joining KTables.
>> d) whether KTables resulting from KTable.filter/mapValues are materialized
>>    depends on their parent's materialization.
>>
>> By recursive induction all KTables are actually always materialized, and
>> then the effect of "materialize()" is just to specify the state store
>> names. In this scenario, we do not need to send Change<V> in repartition
>> topics within joins any more, but only in repartition topics within
>> aggregations. Instead, we can just send a "tombstone" without the old
>> value, and we do not need to calculate joins twice (one more time when the
>> old value is received).
>>
>> 3. I'm wondering if it is worthwhile to add a "KStream#toTable()" function
>> which is interpreted as a dummy aggregation where the new value always
>> replaces the old value.
>> I have seen a couple of use cases of this; for example, users want to read
>> a changelog topic, apply some filters, and then materialize it into a
>> KTable with state stores without creating duplicated changelog topics.
>> With materialize() and toTable I'd imagine users can specify something
>> like:
>>
>> "
>> KStream stream = builder.stream("topic1").filter(..);
>> KTable table = stream.toTable(..);
>> table.materialize("state1");
>> "
>>
>> And the library in this case could set store "state1"'s changelog topic to
>> be "topic1", and apply the filter on the fly while (re-)storing its state
>> by reading from this topic, instead of creating a second changelog topic
>> like "appID-state1-changelog" which is a semi-duplicate of "topic1".
>>
>> Detailed:
>>
>> 1. I'm +1 with Michael regarding "#toStream"; actually I was thinking
>> about renaming to "#toChangeLog" but after thinking a bit more I think
>> #toStream is still better, and we can just mention in the javaDoc that it
>> is transforming its underlying changelog stream to a normal stream.
>> 2. As Damian mentioned, there are scenarios where the serdes are already
>> specified in a previous operation and others where they are not known
>> before calling materialize, for example:
>> stream.groupByKey.agg(serde).materialize(serde) vs.
>> table.mapValues(/*no serde specified*/).materialize(serde). We need to
>> specify what the handling logic is here.
>> 3. We can remove the "KTable#to" call as well, and require users to call
>> "KTable.toStream.to" to be more clear.
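To make the "dummy aggregation" reading of the proposed KStream#toTable() in meta comment 3 concrete, here is a minimal sketch in plain Java with no Kafka dependency. It is not the Kafka Streams API: the class ToTableSketch, the Rec type and the toTable method are hypothetical names used only for illustration, and treating a null value as a delete is an assumption borrowed from changelog-topic conventions rather than something the thread specifies.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class ToTableSketch {

    /** One record of the input stream; a null value is treated as a delete (assumption). */
    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Hypothetical stand-in for stream.filter(..).toTable(..): keeps the records that
     * pass the filter and folds them into a table where the newest value per key wins.
     */
    static Map<String, String> toTable(List<Rec> stream, Predicate<Rec> filter) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Rec r : stream) {
            if (!filter.test(r)) {
                continue;                    // filtered out on the fly
            }
            if (r.value == null) {
                table.remove(r.key);         // delete marker removes the key
            } else {
                table.put(r.key, r.value);   // dummy aggregation: new value replaces old
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Rec> topic1 = new ArrayList<>();
        topic1.add(new Rec("a", "1"));
        topic1.add(new Rec("b", "2"));
        topic1.add(new Rec("a", "3"));      // replaces the earlier value for "a"
        topic1.add(new Rec("tmp", "x"));    // dropped by the filter below
        topic1.add(new Rec("b", null));     // deletes "b"

        Map<String, String> state1 = toTable(topic1, r -> !r.key.equals("tmp"));
        System.out.println(state1);
    }
}
```

In this sketch, replaying the same input through the same filter always rebuilds the same map, which is the property that would let a store be restored from "topic1" directly rather than from a separate changelog topic.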
>>
>> Guozhang
>>
>> On Wed, Jan 18, 2017 at 3:22 AM, Eno Thereska <eno.there...@gmail.com>
>> wrote:
>>
>>> I think changing it to `toKStream` would make it absolutely clear what we
>>> are converting it to.
>>>
>>> I'd say we should probably change the KStreamBuilder methods (but not in
>>> this KIP).
>>>
>>> Thanks
>>> Eno
>>>
>>> On 17 Jan 2017, at 13:59, Michael Noll <mich...@confluent.io> wrote:
>>>
>>>> Rename toStream() to toKStream() for consistency.
>>>>
>>>> Not sure whether that is really required. We also use
>>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example, and
>>>> don't care about the "K" prefix.
>>>>
>>>> On Tue, Jan 17, 2017 at 10:55 AM, Eno Thereska <eno.there...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Damian, answers inline:
>>>>>
>>>>> On 16 Jan 2017, at 17:17, Damian Guy <damian....@gmail.com> wrote:
>>>>>
>>>>>> Hi Eno,
>>>>>>
>>>>>> Thanks for the KIP. Some comments:
>>>>>>
>>>>>> 1. I'd probably rename materialized to materialize.
>>>>>
>>>>> Ok.
>>>>>
>>>>>> 2. I don't think the addition of the new Log compaction mechanism is
>>>>>> necessary for this KIP, i.e., the KIP is useful without it. Maybe that
>>>>>> should be a different KIP?
>>>>>
>>>>> Agreed, already removed. Will do a separate KIP for that.
>>>>>
>>>>>> 3. What will happen when you call materialize on a KTable that is
>>>>>> already materialized? Will it create another StateStore (providing the
>>>>>> name is different), throw an Exception?
>>>>>
>>>>> Currently an exception is thrown, but see below.
>>>>>
>>>>>> 4. Have you considered overloading the existing KTable operations to
>>>>>> add a state store name? So if a state store name is provided, then
>>>>>> materialize a state store? This would be my preferred approach as I
>>>>>> don't think materialize is always a valid operation.
>>>>>
>>>>> Ok I can see your point. This will increase the KIP size since I'll
>>>>> need to enumerate all overloaded methods, but it's not a problem.
>>>>>
>>>>>> 5. The materialize method will need a value Serde as some operations,
>>>>>> i.e., mapValues, join etc. can change the value types.
>>>>>> 6. https://issues.apache.org/jira/browse/KAFKA-4609 - might mean that
>>>>>> we always need to materialize the StateStore for KTable-KTable joins.
>>>>>> If that is the case, then the KTable Join operators will also need
>>>>>> Serde information.
>>>>>
>>>>> I'll update the KIP with the serdes.
>>>>>
>>>>> Thanks
>>>>> Eno
>>>>>
>>>>>> Cheers,
>>>>>> Damian
>>>>>>
>>>>>> On Mon, 16 Jan 2017 at 16:44 Eno Thereska <eno.there...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> We created "KIP-114: KTable materialization and improved semantics"
>>>>>>> to solidify the KTable semantics in Kafka Streams:
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+materialization+and+improved+semantics
>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-114:+KTable+materialization+and+improved+semantics>
>>>>>>>
>>>>>>> Your feedback is appreciated.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Eno

--
-- Guozhang
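
Guozhang's meta comment 2 earlier in this thread argues by induction that, under rules (a) to (d), every KTable ends up materialized. The following is a rough sketch of that argument in plain Java, not Kafka Streams code: MaterializationSketch, Kind and Node are hypothetical names, and the base case (a source table always has a store) is an assumption taken from the remark that KStreamBuilder.table(..) materializes unless the store name is null.

```java
import java.util.Arrays;
import java.util.List;

public class MaterializationSketch {

    enum Kind { SOURCE, AGG, JOIN, FILTER, MAP_VALUES }

    /** Hypothetical node in a KTable lineage; not a Kafka Streams class. */
    static final class Node {
        final Kind kind;
        final List<Node> parents;

        Node(Kind kind, Node... parents) {
            this.kind = kind;
            this.parents = Arrays.asList(parents);
        }

        boolean isMaterialized() {
            switch (kind) {
                case SOURCE:   // assumption: source tables always get a store (base case)
                case AGG:      // rule (a): aggregation results are always materialized
                case JOIN:     // rule (c): join results are always materialized
                    return true;
                default:       // rule (d): FILTER / MAP_VALUES follow their parent
                    return parents.get(0).isMaterialized();
            }
        }
    }

    public static void main(String[] args) {
        Node source = new Node(Kind.SOURCE);
        Node filtered = new Node(Kind.FILTER, source);
        Node mapped = new Node(Kind.MAP_VALUES, filtered);
        Node joined = new Node(Kind.JOIN, mapped, source);
        Node view = new Node(Kind.FILTER, joined);

        for (Node n : Arrays.asList(source, filtered, mapped, joined, view)) {
            System.out.println(n.kind + " materialized: " + n.isMaterialized());
        }
    }
}
```

Because every chain of filter/mapValues nodes eventually reaches a source, aggregation or join, the recursion always returns true. Rule (b) and the second half of rule (c), which put requirements on the inputs of an operation, are then satisfied automatically in this model.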