Bumping this discussion as we need to re-vote before the KIP deadline.

On Fri, Sep 13, 2019 at 10:29 AM Bill Bejeck <bbej...@gmail.com> wrote:
> Hi All,
>
> While working on the implementation of KIP-479, some issues came to light showing that the KIP as written won't work. I have updated the KIP with a solution I believe will solve the original problem as well as address the impediment to the initial approach.
>
> This update is a significant change, so please review the updated KIP https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join and provide feedback. After we conclude the discussion, there will be a re-vote.
>
> Thanks!
> Bill
>
> On Wed, Jul 17, 2019 at 7:01 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hi Bill, thanks for your explanations. I'm on board with your decision too.
>>
>> Guozhang
>>
>> On Wed, Jul 17, 2019 at 10:20 AM Bill Bejeck <bbej...@gmail.com> wrote:
>>
>>> Thanks for the response, John.
>>>
>>>> If I can offer my thoughts, it seems better to just document on the Stream join javadoc for the Materialized parameter that it will not make the join result queriable. I'm not opposed to the queriable flag in general, but introducing it is a much larger consideration that has previously derailed this KIP discussion. In the interest of just closing the gap and keeping the API change small, it seems better to just go with documentation for now.
>>>
>>> I agree with your statement here. IMHO the most important goal of this KIP is to not break existing users and to gain some consistency in the API.
>>>
>>> I'll update the KIP accordingly.
>>>
>>> -Bill
>>>
>>> On Tue, Jul 16, 2019 at 11:55 AM John Roesler <j...@confluent.io> wrote:
>>>
>>>> Hi Bill,
>>>>
>>>> Thanks for driving this KIP toward a conclusion. I'm on board with your decision.
>>>>
>>>> You didn't mention whether you're still proposing to add the "queriable" flag to the Materialized config object, or just document that a Stream join is never queriable.
>>>> Both options have come up earlier in the discussion, so it would be good to pin this down.
>>>>
>>>> If I can offer my thoughts, it seems better to just document on the Stream join javadoc for the Materialized parameter that it will not make the join result queriable. I'm not opposed to the queriable flag in general, but introducing it is a much larger consideration that has previously derailed this KIP discussion. In the interest of just closing the gap and keeping the API change small, it seems better to just go with documentation for now.
>>>>
>>>> Thanks again,
>>>> -John
>>>>
>>>> On Thu, Jul 11, 2019 at 2:45 PM Bill Bejeck <bbej...@gmail.com> wrote:
>>>>>
>>>>> Thanks all for the great discussion so far.
>>>>>
>>>>> Everyone has made excellent points, and I appreciate the detail everyone has put into their arguments.
>>>>>
>>>>> However, after carefully evaluating all the points made so far, creating an overload with Materialized is still my #1 option.
>>>>> My reasoning for saying so is two-fold:
>>>>>
>>>>>   1. It's a small change, and IMHO since it's consistent with our current API concerning state store usage, the cognitive load on users will be minimal.
>>>>>   2. It achieves the most important goal of this KIP, namely to close the gap of naming state stores independently of the join operator name.
>>>>>
>>>>> Additionally, I agree with the points made by Matthias earlier (I realize there is some overlap here).
>>>>>
>>>>>> - the main purpose of this KIP is to close the naming gap, which we achieve
>>>>>> - we can allow people to use the new in-memory store
>>>>>> - we allow people to enable/disable caching
>>>>>> - we unify the API
>>>>>> - we decouple querying from naming
>>>>>> - it's a small API change
>>>>>
>>>>> Although it's not a perfect solution, IMHO the positives of using Materialized far outweigh the negatives, and from what we've discussed so far, anything we implement seems to involve an additional change down the road.
>>>>>
>>>>> If others are still strongly opposed to using Materialized, my other preferences would be:
>>>>>
>>>>>   1. Add a "withStoreName" to Joined, although I agree with Guozhang that having a parameter that only applies to one use-case would be clumsy.
>>>>>   2. Add a String overload for naming the store, but this would be my least favorite option as IMHO this seems to be a step backward from why we introduced configuration objects in the first place.
>>>>>
>>>>> Thanks,
>>>>> Bill
>>>>>
>>>>> On Thu, Jun 27, 2019 at 4:45 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>
>>>>>> Thanks for the KIP Bill!
>>>>>>
>>>>>> Great discussion so far.
>>>>>>
>>>>>> About John's idea of querying upstream stores and not materializing a store: I agree with Bill that this seems to be an orthogonal question, and it might be better to treat it as an independent optimization and exclude it from this KIP.
>>>>>>
>>>>>>> What should be the behavior if there is no store configured (e.g., if Materialized with only serdes) and querying is enabled?
>>>>>>
>>>>>> IMHO, this could be an error case.
>>>>>> If one wants to query a store, they need to provide a name -- if you don't know the name, how would you actually query the store (even if it would be possible to get the name from the `TopologyDescription`, it seems clumsy)?
>>>>>>
>>>>>> If we don't want to throw an error, materializing seems to be the right option, to exclude "query optimization" from this KIP. I would be ok with this option, even if it's clumsy to get the name from `TopologyDescription`; hence, I would prefer to treat it as an error.
>>>>>>
>>>>>>> To get back to the current behavior, users would have to add a "bytes store supplier" to the Materialized to indicate that, yes, they really want a state store there.
>>>>>>
>>>>>> This sounds like quite a subtle semantic difference in how to use the API. It might be hard to explain to users. I would prefer not to introduce it.
>>>>>>
>>>>>> About Guozhang's points:
>>>>>>
>>>>>> 1a) That is actually a good point. However, I believe we cannot get around this issue easily, and it seems ok to me to expose the actual store type we are using. (More thoughts later.)
>>>>>>
>>>>>> 1b) I don't see an issue with allowing users to query all stores. What is the rationale behind it? What do we gain by not allowing it?
>>>>>>
>>>>>> 2) While I understand what you are saying, we also want/need to have a way in the PAPI to allow users to add "internal/private" non-queryable stores to a topology. That's possible via `Materialized#withQueryingDisabled()`. We could also update `Topology#addStateStore(StoreBuilder, boolean isQueryable, String...)` to address this.
>>>>>> Again, I agree with Bill that the current API is built in a certain way, and if we want to change it, it should be a separate KIP, as it seems to be an orthogonal concern.
>>>>>>
>>>>>>> Instead, we just restrict KIP-307 to NOT use the Joined.name for state store names and always use internal names as well, which admittedly indeed leaves a hole of not being able to cover all internal names here
>>>>>>
>>>>>> I think it's important to close this gap. Naming entities seems to be a binary feature: if there is a gap, the feature is more or less useless, rendering KIP-307 void.
>>>>>>
>>>>>> I like John's detailed list of required features and what Materialized/WindowBytesStoreSuppliers offer. My take is that adding Materialized, including the required run-time checks, is the best option we have, for the following reasons:
>>>>>>
>>>>>> - the main purpose of this KIP is to close the naming gap, which we achieve
>>>>>> - we can allow people to use the new in-memory store
>>>>>> - we allow people to enable/disable caching
>>>>>> - we unify the API
>>>>>> - we decouple querying from naming
>>>>>> - it's a small API change
>>>>>>
>>>>>> Adding an overload and only passing in a name would address the main purpose of the KIP. However, it falls short on all the other "goodies". As you mentioned, passing in `Materialized` might not be perfect and maybe we need to deprecate it at some point; but this is also true for passing in just a name.
>>>>>>
>>>>>> I am also not convinced that a `StreamJoinStore` would resolve all the issues.
>>>>>> In the end, as long as we are using a `WindowedStore` internally, we need to expose this "implementation detail" to users to allow them to plug in a custom store. Adding `Materialized` seems to be the best short-term fix from my point of view.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 6/27/19 9:56 AM, Guozhang Wang wrote:
>>>>>>> Hi John,
>>>>>>>
>>>>>>> I actually feel better about a new interface, but I'm not sure if we would need the full configuration of store / log / cache, now or ever in the future, for stream-stream join.
>>>>>>>
>>>>>>> Right now I feel that 1) we want to improve our implementation of stream-stream join, and potentially also allow users to customize this implementation but with a more suitable interface than the current WindowStore interface; how to do that is less clear, and execution-wise it's (arguably..) not urgent; 2) we want to close the last gap (stream-stream join) of allowing users to specify all internal names to help with backward compatibility, which is urgent.
>>>>>>>
>>>>>>> Therefore if we want to unblock 2) from 1) in the near term, I feel slightly inclined to just add overload functions that take in a store name for stream-stream joins only -- and admittedly, in the future this function may be deprecated -- i.e. if we have to do something that we "may regret" in the future, I'd like to pick the least intrusive option.
>>>>>>>
>>>>>>> About `Joined#withStoreName`: since the Joined class itself is also used in other join types, I feel less comfortable having a `Joined#withStoreName` which is only going to be used by stream-stream join. Or maybe I'm missing something here about the "latter" case that you are referring to?
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>> On Mon, Jun 24, 2019 at 12:16 PM John Roesler <j...@confluent.io> wrote:
>>>>>>>
>>>>>>>> Thanks Guozhang,
>>>>>>>>
>>>>>>>> Yep. Maybe we can consider just exactly what the join needs:
>>>>>>>>
>>>>>>>>> the WindowStore<Bytes, byte[]> itself is fine, if overly broad, since the only two methods we need are `window.put(key, value, context().timestamp())` and `WindowStoreIterator<V2> iter = window.fetch(key, timeFrom, timeTo)`.
>>>>>>>>
>>>>>>>> One "middle ground" would be to extract _this_ into a new store interface, which only supports these API calls, like StreamJoinStore<K, V>. This would give us the latitude we need to efficiently support the exact operation without concerning ourselves with all the other things a WindowStore can do (which are unreachable for the join use case). It would also let us drop "store duplicates" from the main WindowStore interface, since it only exists to support the join use case.
>>>>>>>>
>>>>>>>> If we were to add a new StreamJoinStore interface, then it'd be straightforward how we could also add `Materialized.as(StreamJoinBytesStoreSupplier)` and use Materialized, or alternatively add the ability to set the bytes store on Joined.
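John's "middle ground" can be made concrete with a minimal, hypothetical sketch of a narrow `StreamJoinStore<K, V>`. The interface and the `InMemoryStreamJoinStore` class are illustrations only and are not Kafka Streams APIs. The `purgeBefore` method is an addition that reflects the "efficient purges of old data" requirement from John's earlier analysis. Beyond that, the store supports only the two operations quoted above: a duplicate-keeping `put` and a time-bounded `fetch`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical narrow store interface (NOT a Kafka Streams API): only what a
// stream-stream join needs, plus an explicit purge for expired records.
interface StreamJoinStore<K, V> {
    // Append a record; several values for the same key and timestamp are all kept.
    void put(K key, V value, long timestamp);

    // All values for `key` with timeFrom <= timestamp <= timeTo, in timestamp order.
    List<V> fetch(K key, long timeFrom, long timeTo);

    // Hypothetical extra: drop every record with timestamp < minTimestamp.
    void purgeBefore(long minTimestamp);
}

// Illustrative in-memory implementation: key -> (timestamp -> values).
final class InMemoryStreamJoinStore<K, V> implements StreamJoinStore<K, V> {
    private final Map<K, NavigableMap<Long, List<V>>> data = new HashMap<>();

    @Override
    public void put(K key, V value, long timestamp) {
        data.computeIfAbsent(key, k -> new TreeMap<>())
            .computeIfAbsent(timestamp, t -> new ArrayList<>())
            .add(value);
    }

    @Override
    public List<V> fetch(K key, long timeFrom, long timeTo) {
        List<V> result = new ArrayList<>();
        NavigableMap<Long, List<V>> byTime = data.get(key);
        if (byTime == null || timeFrom > timeTo) {
            return result;
        }
        // subMap is a view of the inclusive time range, iterated in ascending timestamp order.
        for (List<V> values : byTime.subMap(timeFrom, true, timeTo, true).values()) {
            result.addAll(values);
        }
        return result;
    }

    @Override
    public void purgeBefore(long minTimestamp) {
        // headMap(min, false) is a view of strictly older entries; clearing it removes them.
        for (NavigableMap<Long, List<V>> byTime : data.values()) {
            byTime.headMap(minTimestamp, false).clear();
        }
        data.values().removeIf(NavigableMap::isEmpty);
    }
}
```

The store keeps a list per (key, timestamp), which gives it multimap semantics: duplicates are retained. That is the behavior the thread says "store duplicates" exists for on the main WindowStore interface.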
>>>>>>>>
>>>>>>>> Personally, I'm kind of leaning toward the latter (and also doing `Joined#withStoreName`), since adding the new interface to Materialized then also pollutes the interface for its _actual_ use case of materializing a table view. Of course, to solve the immediate problem, all we need is the store name, but we might feel better about adding the store name to Joined if we _also_ feel like in the future, we would add store/log/cache configuration to Joined as well.
>>>>>>>>
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Mon, Jun 24, 2019 at 12:56 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hello John,
>>>>>>>>>
>>>>>>>>> My main concern is exactly the first point at the bottom of your analysis here: "* configure the bytes store". I'm not sure if using a window bytes store would be ideal for stream-stream windowed join; e.g. we could consider a two-dimensional list sorted by timestamps and then by keys to do the join, whereas a windowed bytes store is basically sorted by key first, then by timestamp. If we expose the Materialized to let users pass in a windowed bytes store, then we would need to change that if we want to replace it with a different implementation interface.
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Mon, Jun 24, 2019 at 8:59 AM John Roesler <j...@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>>> Hey Guozhang and Bill,
>>>>>>>>>>
>>>>>>>>>> For what it's worth, I agree with you both!
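Guozhang's ordering concern can be illustrated with two comparators over a simplified, hypothetical `BufferedRecord` type:

- **Key-first order** mirrors how he describes a windowed bytes store.
- **Time-first order** is the alternative layout he suggests.

This sketch only demonstrates the trade-off. It is not a store implementation, and it makes no claim about how Kafka Streams lays out its bytes internally.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// A buffered join record reduced to what matters for ordering (illustrative only).
final class BufferedRecord {
    final String key;
    final long timestamp;

    BufferedRecord(String key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return key + "@" + timestamp;
    }
}

final class JoinBufferLayouts {
    // Key first, then timestamp: all records of one key are adjacent,
    // which suits a per-key lookup such as fetch(key, timeFrom, timeTo).
    static final Comparator<BufferedRecord> KEY_THEN_TIME =
        Comparator.comparing((BufferedRecord r) -> r.key)
                  .thenComparingLong(r -> r.timestamp);

    // Timestamp first, then key: expired records form a prefix,
    // which suits dropping everything older than a cutoff.
    static final Comparator<BufferedRecord> TIME_THEN_KEY =
        Comparator.comparingLong((BufferedRecord r) -> r.timestamp)
                  .thenComparing(r -> r.key);

    // Returns the records rendered as "key@timestamp" in the given layout order.
    static List<String> order(List<BufferedRecord> records, Comparator<BufferedRecord> layout) {
        List<BufferedRecord> copy = new ArrayList<>(records);
        copy.sort(layout);
        List<String> out = new ArrayList<>();
        for (BufferedRecord r : copy) {
            out.add(r.toString());
        }
        return out;
    }
}
```

Take the records `a@30`, `b@10`, `a@10`, `b@20` and a cutoff of 25. The expired records `a@10`, `b@10` and `b@20` are the first three entries in time-first order but sit at positions 0, 2 and 3 in key-first order. Purging is therefore a contiguous delete in one layout and a scattered one in the other, while per-key lookups favor the opposite layout.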
>>>>>>>>>>
>>>>>>>>>> I think it might help the discussion to look concretely at what Materialized does:
>>>>>>>>>> * set a WindowBytesStoreSupplier
>>>>>>>>>> * set a name
>>>>>>>>>> * set the key/value serdes
>>>>>>>>>> * disable/enable/configure change-logging
>>>>>>>>>> * disable/enable caching
>>>>>>>>>> * configure retention
>>>>>>>>>>
>>>>>>>>>> Further, looking into the WindowBytesStoreSupplier, the interface lets you:
>>>>>>>>>> * get the segment interval
>>>>>>>>>> * get the window size
>>>>>>>>>> * get whether "duplicates" are enabled
>>>>>>>>>> * get the retention period
>>>>>>>>>> * (obviously) get a WindowStore<Bytes, byte[]>
>>>>>>>>>>
>>>>>>>>>> We know that Materialized isn't exactly what we need for stream joins, but we can see how close Materialized is to what we need. If it is close, maybe we can use it and document the gaps, and if it is not close, then maybe we should just add what we need to Joined.
>>>>>>>>>>
>>>>>>>>>> Stream Join's requirements for its stores:
>>>>>>>>>> * a multimap store (i.e., it keeps duplicates) for storing general (not windowed) keyed records associated with their insertion time, and allows efficient time-bounded lookups and also efficient purges of old data.
>>>>>>>>>> ** Note, a properly configured WindowBytesStoreSupplier satisfies this requirement, and the interface supports the queries we need to verify the configuration at run-time
>>>>>>>>>> * set a name for the store
>>>>>>>>>> * do _not_ set the serdes (they are already set in Joined)
>>>>>>>>>> * logging could be configurable (set to enabled now)
>>>>>>>>>> * caching could be configurable (set to enabled now)
>>>>>>>>>> * do _not_ configure retention (determined by JoinWindows)
>>>>>>>>>>
>>>>>>>>>> So, out of six capabilities for Materialized, there are two we don't want (serdes and retention). These would become run-time checks if we use it.
>>>>>>>>>>
>>>>>>>>>> A third questionable capability is to provide a WindowBytesStoreSupplier. Looking at whether the WindowBytesStoreSupplier is the right interface for Stream Join:
>>>>>>>>>> * configuring segment interval is fine
>>>>>>>>>> * should _not_ configure window size (it's determined by JoinWindows)
>>>>>>>>>> * duplicates _must_ be enabled
>>>>>>>>>> * retention should be _at least_ windowSize + gracePeriod, but note that (unlike for Table window stores) there is no utility in having a longer retention time.
>>>>>>>>>> * the WindowStore<Bytes, byte[]> itself is fine, if overly broad, since the only two methods we need are `window.put(key, value, context().timestamp())` and `WindowStoreIterator<V2> iter = window.fetch(key, timeFrom, timeTo)`.
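The run-time checks John mentions for reusing a `WindowBytesStoreSupplier` in a stream join could look roughly like the sketch below. It makes several assumptions:

- `WindowSupplierSettings` is a hypothetical, Kafka-free stand-in for the values the supplier interface exposes.
- `joinWindowSizeMs` and `graceMs` are assumed to be derived from `JoinWindows`.
- Checking the supplier's window size against the join window is one possible reading of "should _not_ configure window size".

The checks Kafka Streams actually performs may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Kafka-free stand-in for what a WindowBytesStoreSupplier exposes:
// window size, retention period, segment interval, and whether duplicates are retained.
final class WindowSupplierSettings {
    final long windowSizeMs;
    final long retentionPeriodMs;
    final long segmentIntervalMs;
    final boolean retainDuplicates;

    WindowSupplierSettings(long windowSizeMs, long retentionPeriodMs,
                           long segmentIntervalMs, boolean retainDuplicates) {
        this.windowSizeMs = windowSizeMs;
        this.retentionPeriodMs = retentionPeriodMs;
        this.segmentIntervalMs = segmentIntervalMs;
        this.retainDuplicates = retainDuplicates;
    }
}

final class StreamJoinStoreChecks {
    // Returns one message per violated requirement; an empty list means the settings are usable.
    // joinWindowSizeMs and graceMs stand in for values assumed to come from JoinWindows.
    static List<String> validate(WindowSupplierSettings s, long joinWindowSizeMs, long graceMs) {
        List<String> errors = new ArrayList<>();
        if (!s.retainDuplicates) {
            errors.add("duplicates must be enabled");
        }
        if (s.windowSizeMs != joinWindowSizeMs) {
            errors.add("window size is determined by JoinWindows");
        }
        if (s.retentionPeriodMs < joinWindowSizeMs + graceMs) {
            errors.add("retention must be at least windowSize + gracePeriod");
        }
        // The segment interval is deliberately not checked: configuring it is fine for joins.
        return errors;
    }
}
```

Collecting all violations instead of failing on the first one is a design choice. It lets a single error message tell the user everything that is wrong with the supplied store.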
>>>>>>>>>>
>>>>>>>>>> Thus, flattening out the overlap between WindowBytesStoreSupplier and Materialized, we have 9 capabilities in total (note retention is duplicated), of which there are 4 that we don't want:
>>>>>>>>>> * do _not_ set the serdes (they are already set in Joined)
>>>>>>>>>> * do _not_ configure retention (determined by JoinWindows)
>>>>>>>>>> * should _not_ configure window size (it's determined by JoinWindows)
>>>>>>>>>> * duplicates _must_ be enabled
>>>>>>>>>>
>>>>>>>>>> These gaps would have to be covered with run-time checks if we re-use both Materialized and WindowBytesStoreSupplier. Maybe this sounds bad, but consider the other side: we get 5 new capabilities we don't require, but that are still pretty nice:
>>>>>>>>>> * configure the bytes store
>>>>>>>>>> * set a name for the store
>>>>>>>>>> * configure caching
>>>>>>>>>> * configure logging
>>>>>>>>>> * configure segment interval
>>>>>>>>>>
>>>>>>>>>> Not sure where this nets us out, but it's food for thought.
>>>>>>>>>> -John
>>>>>>>>>>
>>>>>>>>>> On Sun, Jun 23, 2019 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi Bill,
>>>>>>>>>>>
>>>>>>>>>>> I think by giving a Materialized param into stream-stream join, it's okay (though still not ideal) to say "we still would not expose the store for queries", but it would sound a bit awkward to say "we would also ignore whatever store supplier is passed in and just use our default ones" -- again the concern is that, if in the future we'd want to change the default implementation to a join algorithm that no longer relies on a window store with deduping enabled, then we need to change this API again by changing the store supplier type.
>>>>>>>>>>>
>>>>>>>>>>> If we do want to fill this hole for stream-stream join, I feel just adding a String typed store-name would be even less future-intrusive if we expect this parameter to be modified later.
>>>>>>>>>>>
>>>>>>>>>>> Does that make sense?
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Jun 22, 2019 at 12:51 PM Bill Bejeck <bbej...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the comments John and Guozhang, I'll address each one of your comments in turn.
>>>>>>>>>>>>
>>>>>>>>>>>> John,
>>>>>>>>>>>>
>>>>>>>>>>>>> I'm wondering about a missing quadrant from the truth table involving whether a Materialized is stored or not and querying is enabled/disabled... What should be the behavior if there is no store configured (e.g., if Materialized with only serdes) and querying is enabled?
>>>>>>>>>>>>>
>>>>>>>>>>>>> It seems we have two choices:
>>>>>>>>>>>>> 1. we can force creation of a state store in this case, so the store can be used to serve the queries
>>>>>>>>>>>>> 2. we can provide just a queriable view, basically letting IQ query into the "KTableValueGetter", which would transparently construct the query response by applying the operator logic to the upstream state if the operator state isn't already stored.
>>>>>>>>>>>>
>>>>>>>>>>>> I agree with your assertion about a missing quadrant from the truth table. Additionally, I too like the concept of a queriable view. But I think that goes a bit beyond the scope of this KIP, and I would like to pursue that feature as follow-on work. Also, thinking about this KIP some more, I think the changes to Materialized might be a reach as well. Separating the naming from a store and its queryable state seems like a complex issue in and of itself and should be treated accordingly.
>>>>>>>>>>>>
>>>>>>>>>>>> So here's what I'm thinking now.
>>>>>>>>>>>> We add Materialized to Join, but for now, we internally disable querying. I know this breaks our current semantic approach, but I think it's crucial that we do two things in this KIP:
>>>>>>>>>>>>
>>>>>>>>>>>>   1. Break the naming of the state stores from Joined to Materialized, so the naming of state stores follows our current pattern and enables upgrades from 2.3 to 2.4
>>>>>>>>>>>>   2. Offer the ability to configure the state stores of the join, even providing a different implementation (i.e. in-memory) if desired.
>>>>>>>>>>>>
>>>>>>>>>>>> With that in mind I'm considering changing the KIP to remove the changes to Materialized, and we document very clearly that providing a Materialized object with a name is only for naming the state store, hence the changelog topics and any possible configurations of the store, but this store *will not be available for IQ.*
>>>>>>>>>>>>
>>>>>>>>>>>> WDYT?
>>>>>>>>>>>>
>>>>>>>>>>>> Guozhang,
>>>>>>>>>>>>
>>>>>>>>>>>>> 1. About not breaking compatibility of stream-stream join materialized stores: I think this is a valid issue to tackle, but after thinking about it once more I'm not sure if exposing Materialized would be a good solution here.
>>>>>>>>>>>>> My rationale:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1.a For stream-stream join, our current usage of window-store is not ideal, and we want to modify it in the near future to be more efficient. Not allowing users to override such a state store backend gives us that freedom (which was also considered in the original DSL design), whereas getting a Materialized<WindowStore> basically kicks that freedom out of the window.
>>>>>>>>>>>>> 1.b For stream-stream join, in our original design we intend to "never" want users to query the state, since it is just for buffering the upcoming records from the stream. Now I know that some users may indeed want to query it from the debugging perspective, but I am still concerned about whether leveraging IQ for debugging purposes would be the right solution here. And adding a Materialized object opens the door to let users query it (unless we did something intentionally to still forbid it), which also restricts us in the future.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2. About the coupling between Materialized.name() and queryable: again I think this is a valid issue. But I'm not sure if the current "withQueryingDisabled / Enabled" at Materialized is the best approach.
>>>>>>>>>>>>> Here I think I agree with John, that generally speaking it's better to be a control function on the `KTable` itself, rather than on `Materialized`, so fixing it via adding functions through `Materialized` does not seem a natural approach either.
>>>>>>>>>>>>
>>>>>>>>>>>> I understand your thoughts here, and up to a point, I agree with you. But concerning not providing Materialized as it may restrict us in the future for delivering different implementations, I'm wondering if we are doing some premature optimization here.
>>>>>>>>>>>> My rationale for saying so:
>>>>>>>>>>>>
>>>>>>>>>>>>   1. I think the cost of not allowing the naming of state stores for joins is too big of a gap to leave. IMHO for joins to follow the current pattern of using Materialized for naming state stores would be what most users would expect to use. As I said in my comments above, I think we should *not include* the changes to Materialized and enforce named stores for joins as unavailable for IQ.
>>>>>>>>>>>>   2. We'll still have the join methods available without a Materialized, allowing us to do something different internally if a Materialized is not provided.
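Bill's naming argument becomes concrete once you look at changelog topic names. These follow the Kafka Streams convention `<application.id>-<storeName>-changelog`.

The sketch makes two assumptions:

- The generated-name format is modeled on auto-generated store names such as `KSTREAM-JOINTHIS-0000000004-store`, but it is illustrative rather than Kafka's actual code.
- `my-app` and `orders-join` are hypothetical values.

An auto-generated name embeds an operator index. Adding an upstream operator shifts that index, and with it the store's changelog topic. A user-provided store name stays stable across such changes.

```java
// Illustrative only: mimics the shape of auto-generated store names
// (a prefix plus a zero-padded operator index); this is not Kafka's actual code.
final class StoreNaming {
    static String generatedStoreName(String prefix, int operatorIndex) {
        return String.format("%s-%010d-store", prefix, operatorIndex);
    }

    // Changelog topics are named <application.id>-<storeName>-changelog.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }
}
```

Here is how it plays out. `changelogTopic("my-app", generatedStoreName("KSTREAM-JOINTHIS", 4))` yields `my-app-KSTREAM-JOINTHIS-0000000004-store-changelog`. After one more operator is inserted upstream, the index may become 5 and the topic name changes. `changelogTopic("my-app", "orders-join")` stays `my-app-orders-join-changelog` no matter how the topology around it evolves.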
>>>>>>>>>>>>
>>>>>>>>>>>>> Overall, I'm thinking maybe we should still use two stones rather than one to kill these two birds, and probably for this KIP we just focus on 1) above. And for that I'd like to not expose the Materialized either, for the rationales that I've listed above. Instead, we just restrict KIP-307 to NOT use the Joined.name for state store names and always use internal names as well, which admittedly indeed leaves a hole of not being able to cover all internal names here, but now I feel this `hole` may better be filled by, e.g., not creating changelog topics but just using the upstream to re-bootstrap the materialized store; more concretely, when materializing the store, try to piggy-back the changelog topic on an existing topic, e.g.
>>>>>>>>>>>>> a) if the stream is coming directly from some source topic (including a repartition topic), make that the changelog topic, and if it is a repartition topic, change the retention / data purging policy as necessary as well; b) if the stream is coming from some stateless operators, delegate that stateless operator to the parent stream, similar to a); if the stream is coming from a stream-stream join, which is the only stateful operator that can result in a stream, consider merging the join into multi-way joins (yes, this is a very hand-wavy thought, but the point here is that we do not try to tackle it now but leave it for a better solution :).
>>>>>>>>>>>>
>>>>>>>>>>>> I really like this idea! I agree with you that this approach is too much to add in this KIP, but we could pick it up later and leverage the Optimization framework to accomplish this re-use.
>>>>>>>>>>>> Again, while I agree we should break the naming of join state stores from KIP-307, IMHO it's something we should fix now, as it will be the last piece we can provide to give users the ability to make their topologies completely "upgrade proof" when adding additional operations.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks again to both of you for your comments, and I look forward to hearing back from you.
Regards,
Bill

On Thu, Jun 20, 2019 at 2:33 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Bill,
>
> Thanks for the KIP. Glad to see that we can likely kill two birds with
> one stone. I have some concerns, though, about those "two birds"
> themselves:
>
> 1. About not breaking compatibility of stream-stream join materialized
> stores: I think this is a valid issue to tackle, but after thinking
> about it once more I'm not sure exposing Materialized would be a good
> solution here. My rationales:
>
> 1.a For stream-stream join, our current usage of the window store is not
> ideal, and we want to modify it in the near future to be more
> efficient. Not allowing users to override the state store backend gives
> us that freedom (which was also considered in the original DSL design),
> whereas exposing a Materialized<WindowStore> basically throws that
> freedom out of the window.
> 1.b For stream-stream join, our original design intended that users
> would "never" query the state, since it is just for buffering the
> upcoming records from the stream. Now I know that some users may indeed
> want to query it for debugging, but I'm still concerned about whether
> leveraging IQ for debugging purposes would be the right solution here.
> And adding a Materialized object opens the door for users to query it
> (unless we intentionally do something to still forbid it), which also
> restricts us in the future.
>
> 2. About the coupling between Materialized.name() and queryable: again,
> I think this is a valid issue. But I'm not sure the current
> "withQuerryingDisabled / Enabled" on Materialized is the best approach.
> Here I agree with John that, generally speaking, it's better as a
> control function on the `KTable` itself rather than on `Materialized`,
> so fixing it by adding functions to `Materialized` does not seem a
> natural approach either.
>
>
> Overall, I'm thinking maybe we should still use two stones rather than
> one to kill these two birds, and for this KIP probably just focus on 1)
> above. For that, I'd prefer not to expose Materialized either, for the
> rationales I've listed above. Instead, we just restrict KIP-307 to NOT
> use Joined.name for state store names and always use internal names as
> well. That admittedly leaves a hole, since we cannot cover all internal
> names here, but now I feel this `hole` may be better filled by, e.g., not
> creating changelog topics and instead using the upstream to re-bootstrap
> the materialized store. More concretely: when materializing the store,
> try to piggy-back the changelog topic on an existing topic, e.g.:
>
> a) if the stream is coming directly from some source topic (including a
> repartition topic), make that the changelog topic, and if it is a
> repartition topic, change its retention / data purging policy as
> necessary;
> b) if the stream is coming from some stateless operators, delegate that
> stateless operator to the parent stream, similar to a);
> c) if the stream is coming from a stream-stream join, which is the only
> stateful operator that can result in a stream, consider merging the join
> into multi-way joins (yes, this is a very hand-wavy thought, but the
> point here is that we do not try to tackle it now and instead leave it
> for a better solution :).
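The piggy-backing idea comes down to rebuilding a store by replaying a topic that already holds the data, rather than restoring from a dedicated changelog. The Java sketch below, which uses only the standard library, models this for the join buffer described in 1.b:

- an in-memory list stands in for the upstream topic;
- stream time is taken as the largest timestamp replayed;
- only records still inside the join window are kept.

`UpstreamRecord` and `SourceTopicRestore` are hypothetical names. Real restoration (offsets, grace periods, per-partition state) is considerably more involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical record as it sits in the upstream (source or repartition) topic.
class UpstreamRecord {
    final String key;
    final String value;
    final long timestamp;

    UpstreamRecord(String key, String value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical restore path: rebuild a stream-stream join buffer by replaying
// the upstream topic instead of reading a dedicated changelog topic.
class SourceTopicRestore {
    static List<UpstreamRecord> restoreJoinBuffer(List<UpstreamRecord> upstreamTopic,
                                                  int endOffset,
                                                  long windowSizeMs) {
        // Stream time is the largest timestamp seen up to the restore point.
        long streamTime = Long.MIN_VALUE;
        for (int offset = 0; offset < endOffset; offset++) {
            streamTime = Math.max(streamTime, upstreamTopic.get(offset).timestamp);
        }
        // Keep only records that could still join, i.e. those inside the window.
        List<UpstreamRecord> buffer = new ArrayList<>();
        for (int offset = 0; offset < endOffset; offset++) {
            UpstreamRecord record = upstreamTopic.get(offset);
            if (record.timestamp >= streamTime - windowSizeMs) {
                buffer.add(record);
            }
        }
        return buffer;
    }
}
```

This only works while the upstream topic still retains at least `windowSizeMs` worth of records. That is why the proposal notes that a repartition topic's retention / purging policy would have to change if the topic doubles as the changelog.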
>
> Guozhang
>
> On Wed, Jun 19, 2019 at 11:41 AM John Roesler <j...@confluent.io> wrote:
>
> > Hi Bill,
> >
> > Thanks for the KIP! Awesome job catching this unexpected consequence of
> > the prior KIPs before it was released.
> >
> > The proposal looks good to me. On top of just fixing the problem, it
> > seems to address two other pain points:
> > * that naming a state store automatically causes it to become
> > queryable.
> > * that there's currently no way to configure the bytes store for join
> > windows.
> >
> > It's awesome that we can fix this issue and two others with one
> > feature.
> >
> > I'm wondering about a missing quadrant from the truth table involving
> > whether a Materialized is stored or not and whether querying is
> > enabled or disabled... What should the behavior be if there is no
> > store configured (e.g., a Materialized with only serdes) and querying
> > is enabled?
> >
> > It seems we have two choices:
> > 1. we can force creation of a state store in this case, so the store
> > can be used to serve the queries
> > 2. we can provide just a queryable view, basically letting IQ query
> > into the "KTableValueGetter", which would transparently construct the
> > query response by applying the operator logic to the upstream state if
> > the operator state isn't already stored.
> >
> > Offhand, it seems like the second is actually a pretty awesome
> > capability. But it might have an awkward interaction with the current
> > semantics. Presently, if I provide a Materialized.withName, it implies
> > that querying should be enabled AND that the view should actually be
> > stored in a state store. Under option 2 above, this behavior would
> > change to NOT provision a state store and instead just consult the
> > ValueGetter. To get back to the current behavior, users would have to
> > add a "bytes store supplier" to the Materialized to indicate that,
> > yes, they really want a state store there.
> >
> > Behavior changes are always kind of scary, but I think in this case it
> > might actually be preferable. When only the name is provided, it means
> > that people just wanted to make the operation result queryable. If we
> > automatically convert this to a non-stored view, then simply upgrading
> > results in the same observable behavior and semantics, but with a
> > linear reduction in local storage requirements and disk I/O, as well
> > as a corresponding linear reduction in memory usage, both on and off
> > heap.
> >
> > What do you think?
> > -John
> >
> > On Tue, Jun 18, 2019 at 9:21 PM Bill Bejeck <bbej...@gmail.com> wrote:
> >
> > > All,
> > >
> > > I'd like to start a discussion about adding a Materialized
> > > configuration object to KStream.join for naming the state stores
> > > involved in joins.
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+Materialized+to+Join
> > >
> > > Your comments and suggestions are welcome.
> > >
> > > Thanks,
> > > Bill
>
>
> --
> -- Guozhang
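John's option 2 can be illustrated with a Java model that uses only the standard library. It is a derived table that owns no store and answers lookups by applying the operator logic to the upstream state on demand. `ValueGetter`, `StoredTable`, and `ComputedView` are hypothetical names. They loosely mirror the role `KTableValueGetter` plays inside Kafka Streams but do not reproduce its real interface, and an in-memory map stands in for a state store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical read interface shared by stored tables and computed views.
// Loosely inspired by the role of KTableValueGetter; NOT its real signature.
interface ValueGetter<K, V> {
    V get(K key);
}

// An upstream table whose values live in an actual (here: in-memory) store.
class StoredTable<K, V> implements ValueGetter<K, V> {
    private final Map<K, V> store = new HashMap<>();

    void put(K key, V value) {
        store.put(key, value);
    }

    @Override
    public V get(K key) {
        return store.get(key);
    }
}

// The result of a stateless operator (e.g. a mapValues-style step) exposed as a
// queryable view: it owns no store and applies the operator logic at query time.
class ComputedView<K, V, R> implements ValueGetter<K, R> {
    private final ValueGetter<K, V> upstream;
    private final Function<V, R> operator;

    ComputedView(ValueGetter<K, V> upstream, Function<V, R> operator) {
        this.upstream = upstream;
        this.operator = operator;
    }

    @Override
    public R get(K key) {
        V upstreamValue = upstream.get(key);
        // A key missing upstream is also missing from the view.
        return upstreamValue == null ? null : operator.apply(upstreamValue);
    }
}
```

An update to the upstream value is immediately visible through the view, with no second copy of the data. That is the storage, disk I/O, and memory saving John describes. The cost is that every query re-applies the operator and depends on the upstream store being available. In John's proposal, a user who wants the result physically stored would opt in by adding a bytes store supplier to the Materialized.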