Hey Guozhang and Bill,

For what it's worth, I agree with you both!

I think it might help the discussion to look concretely at what
Materialized does:
* set a WindowBytesStoreSupplier
* set a name
* set the key/value serdes
* disable/enable/configure change-logging
* disable/enable caching
* configure retention

Further, looking into the WindowBytesStoreSupplier, the interface lets you:
* get the segment interval
* get the window size
* get whether "duplicates" are enabled
* get the retention period
* (obviously) get a WindowStore<Bytes, byte[]>

We know that Materialized isn't exactly what we need for stream joins,
but we can see how close Materialized is to what we need. If it is
close, maybe we can use it and document the gaps, and if it is not
close, then maybe we should just add what we need to Joined.

Stream Join's requirements for its stores:
* a multimap store (i.e., it keeps duplicates) for storing general
(not windowed) keyed records associated with their insertion time, and
allows efficient time-bounded lookups and also efficient purges of old
data.
** Note, a properly configured WindowBytesStoreSupplier satisfies this
requirement, and the interface supports the queries we need to verify
the configuration at run-time
* set a name for the store
* do _not_ set the serdes (they are already set in Joined)
* logging could be configurable (set to enabled now)
* caching could be configurable (set to enabled now)
* do _not_ configure retention (determined by JoinWindows)

So, out of six capabilities for Materialized, there are two we don't
want (serdes and retention). These would become run-time checks if we
use it. A third questionable capability is to provide a
WindowBytesStoreSupplier.

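
To make the multimap requirement in the list above a bit more concrete,
here is a rough in-memory sketch of the contract the join needs from its
store: duplicates are kept, lookups are bounded by time, and old data
can be dropped. `JoinBufferSketch` and its method names are made up for
illustration and are not Streams classes. The sketch also makes no
attempt at the efficient (segment-based) purge a real window store
provides; it only shows the semantics.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only: not a Kafka Streams class.
final class JoinBufferSketch<K, V> {
    // Per key: insertion timestamp -> every value put at that timestamp.
    // Using a list per timestamp is what keeps duplicates.
    private final Map<K, TreeMap<Long, List<V>>> data = new HashMap<>();

    void put(K key, V value, long timestamp) {
        data.computeIfAbsent(key, k -> new TreeMap<>())
            .computeIfAbsent(timestamp, t -> new ArrayList<>())
            .add(value);
    }

    // Time-bounded lookup: values for key with timeFrom <= timestamp <= timeTo,
    // in timestamp order.
    List<V> fetch(K key, long timeFrom, long timeTo) {
        List<V> result = new ArrayList<>();
        TreeMap<Long, List<V>> byTime = data.get(key);
        if (byTime != null && timeFrom <= timeTo) {
            for (List<V> values : byTime.subMap(timeFrom, true, timeTo, true).values()) {
                result.addAll(values);
            }
        }
        return result;
    }

    // Purge: drop every record strictly older than minTimestamp.
    void purgeOlderThan(long minTimestamp) {
        for (TreeMap<Long, List<V>> byTime : data.values()) {
            byTime.headMap(minTimestamp, false).clear();
        }
        data.values().removeIf(TreeMap::isEmpty);
    }
}
```

The `put` and `fetch` shapes mirror the only two store calls the join
actually makes, which is why a narrower interface than the full
WindowStore would be enough in principle.
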
Looking at whether the WindowBytesStoreSupplier is the right interface
for Stream Join:
* configuring segment interval is fine
* should _not_ configure window size (it's determined by JoinWindows)
* duplicates _must_ be enabled
* retention should be _at least_ windowSize + gracePeriod, but note
that (unlike for Table window stores) there is no utility in having a
longer retention time.
* the WindowStore<Bytes, byte[]> itself is fine, if overly broad,
since the only two methods we need are `window.put(key, value,
context().timestamp())` and `WindowStoreIterator<V2> iter =
window.fetch(key, timeFrom, timeTo)`.

Thus, flattening out the overlap for WindowBytesStoreSupplier onto the
overlap for Materialized, we have 9 capabilities total (note retention
is duplicated), of which 4 are ones we don't want:
* do _not_ set the serdes (they are already set in Joined)
* do _not_ configure retention (determined by JoinWindows)
* should _not_ configure window size (it's determined by JoinWindows)
* duplicates _must_ be enabled

These gaps would have to be covered with run-time checks if we re-use
both Materialized and WindowBytesStoreSupplier.

Maybe this sounds bad, but consider the other side, that we get 5 new
capabilities we don't require, but are still pretty nice:
* configure the bytes store
* set a name for the store
* configure caching
* configure logging
* configure segment interval

Not sure where this nets us out, but it's food for thought.

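
For a sense of what those four run-time checks might look like if we
did re-use both classes, here is a rough sketch. `SuppliedStoreView` is
a made-up stand-in for the getters on WindowBytesStoreSupplier (window
size, retention period, duplicates), and `JoinStoreChecks.validate` and
its parameters are hypothetical, not existing Streams code. I'm
assuming all durations are in milliseconds, and that a mismatched
window size is rejected outright rather than silently overridden.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the getters we'd consult on a user-supplied
// WindowBytesStoreSupplier. Not the real interface.
final class SuppliedStoreView {
    final long windowSizeMs;
    final long retentionPeriodMs;
    final boolean retainDuplicates;

    SuppliedStoreView(long windowSizeMs, long retentionPeriodMs, boolean retainDuplicates) {
        this.windowSizeMs = windowSizeMs;
        this.retentionPeriodMs = retentionPeriodMs;
        this.retainDuplicates = retainDuplicates;
    }
}

// Hypothetical helper: collects every violated constraint instead of
// failing on the first, so a user sees all problems at once.
final class JoinStoreChecks {
    private JoinStoreChecks() { }

    static List<String> validate(SuppliedStoreView store,
                                 long joinWindowSizeMs,
                                 long gracePeriodMs,
                                 boolean serdesSetOnMaterialized) {
        List<String> violations = new ArrayList<>();
        if (serdesSetOnMaterialized) {
            violations.add("serdes must come from Joined, not Materialized");
        }
        if (store.windowSizeMs != joinWindowSizeMs) {
            violations.add("window size must match the one determined by JoinWindows");
        }
        if (!store.retainDuplicates) {
            violations.add("duplicates must be enabled");
        }
        if (store.retentionPeriodMs < joinWindowSizeMs + gracePeriodMs) {
            violations.add("retention must be at least windowSize + gracePeriod");
        }
        return violations;
    }
}
```

An empty list means the supplied store is acceptable; anything else
would presumably surface as an exception when the topology is built.
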
-John

On Sun, Jun 23, 2019 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> Hi Bill,
>
> I think by giving a Materialized param into stream-stream join, it's okay
> (though still ideal) to say "we still would not expose the store for
> queries", but it would sound a bit awkward to say "we would also ignore
> whatever the passed in store supplier but just use our default ones" --
> again the concern is that, if in the future we'd want to change the default
> implementation of join algorithm which no longer rely on a window store
> with deduping enabled, then we need to change this API again by changing
> the store supplier type.
>
> If we do want to fill this hole for stream-stream join, I feel just adding
> a String typed store-name would even be less future-intrusive if we expect
> this parameter to be modified later.
>
> Does that make sense?
>
>
> Guozhang
>
> On Sat, Jun 22, 2019 at 12:51 PM Bill Bejeck <bbej...@gmail.com> wrote:
>
> > Thanks for the comments John and Guozhang, I'll address each one of your
> > comments in turn.
> >
> > John,
> >
> > > I'm wondering about a missing quadrant from the truth table involving
> > > whether a Materialized is stored or not and querying is
> > > enabled/disabled... What should be the behavior if there is no store
> > > configured (e.g., if Materialized with only serdes) and querying is
> > > enabled?
> > >
> > > It seems we have two choices:
> > > 1. we can force creation of a state store in this case, so the store
> > > can be used to serve the queries
> > > 2. we can provide just a queriable view, basically letting IQ query
> > > into the "KTableValueGetter", which would transparently construct the
> > > query response by applying the operator logic to the upstream state if
> > > the operator state isn't already stored.
> >
> > I agree with your assertion about a missing quadrant from the truth table.
> > Additionally, I too like the concept of a queriable view. But I think that
> > goes a bit beyond the scope of this KIP and would like to pursue that
> > feature as follow-on work. Also thinking about this KIP some more, I'm
> > thinking the changes to Materialized might be a reach as well.
> > Separating the naming from a store and its queryable state seems like a
> > complex issue in and of itself and should be treated accordingly.
> >
> > So here's what I'm thinking now. We add Materialized to Join, but for now,
> > we internally disable querying. I know this breaks our current semantic
> > approach, but I think it's crucial that we do two things in this KIP
> >
> > 1. Break the naming of the state stores from Joined to Materialized, so
> > the naming of state stores follows our current pattern and enables
> > upgrades from 2.3 to 2.4
> > 2. Offer the ability to configure the state stores of the join, even
> > providing a different implementation (i.e. in-memory) if desired.
> >
> > With that in mind I'm considering changing the KIP to remove the changes to
> > Materialized, and we document very clearly that providing a Materialized
> > object with a name is only for naming the state store, hence the changelog
> > topics and any possible configurations of the store, but this store *will
> > not be available for IQ.*
> >
> > WDYT?
> >
> > Guozhang,
> >
> > > 1. About not breaking compatibility of stream-stream join materialized
> > > stores: I think this is a valid issue to tackle, but after thinking about
> > > it once more I'm not sure if exposing Materialized would be a good
> > > solution here. My rationales:
> > >
> > > 1.a For stream-stream join, our current usage of window-store is not
> > > ideal, and we want to modify it in the near future to be more efficient.
> > > Not allowing users to override such state store backend gives us such
> > > freedom (which was also considered in the original DSL design), whereas
> > > getting a Materialized<WindowStore> basically kicks that freedom out of
> > > the window.
> > > 1.b For stream-stream join, in our original design we intend to "never"
> > > want users to query the state, since it is just for buffering the
> > > upcoming records from the stream. Now I know that some users may indeed
> > > want to query it from the debugging perspective, but still I am concerned
> > > about whether leveraging IQ for debugging purposes would be the right
> > > solution here. And adding Materialized object opens the door to let users
> > > query about it (unless we did something intentionally to still forbid
> > > it), which also restricts us in the future.
> > >
> > > 2. About the coupling between Materialized.name() and queryable: again I
> > > think this is a valid issue. But I'm not sure if the current
> > > "withQuerryingDisabled / Enabled" at Materialized is the best approach.
> > > Here I think I agree with John, that generally speaking it's better be a
> > > control function on the `KTable` itself, rather than on `Materialized`,
> > > so fixing it via adding functions through `Materialized` seems not a
> > > natural approach either.
> >
> > I understand your thoughts here, and up to a point, I agree with you.
> > But concerning not providing Materialized as it may restrict us in the
> > future for delivering different implementations, I'm wondering if we are
> > doing some premature optimization here.
> > My rationale for saying so
> >
> > 1. I think the cost of not allowing the naming of state stores for joins
> > is too big of a gap to leave. IMHO for joins to follow the current
> > pattern of using Materialized for naming state stores would be what most
> > users would expect to use. As I said in my comments above, I think we
> > should *not include* the changes to Materialized and enforce named
> > stores for joins as unavailable for IQ.
> > 2. We'll still have the join methods available without a Materialized
> > allowing us to do something different internally if a Materialized is
> > not provided.
> >
> > > Overall, I'm thinking maybe we should still use two stones rather than
> > > one to kill these two birds, and probably for this KIP we just focus on
> > > 1) above. And for that I'd like to not expose the Materialized either for
> > > rationales that I've listed above. Instead, we just restrict KIP-307 to
> > > NOT use the Joined.name for state store names and always use internal
> > > names as well, which admittedly indeed leaves a hole of not being able to
> > > cover all internal names here, but now I feel this `hole` may better be
> > > filled by, e.g. not creating changelog topics but just use the upstream
> > > to re-bootstrap the materialized store, more concretely: when
> > > materializing the store, try to piggy-back the changelog topic on an
> > > existing topic, e.g.
> > > a) if the stream is coming directly from some source topic (including
> > > repartition topic), make that as changelog topic and if it is repartition
> > > topic change the retention / data purging policy necessarily as well; b)
> > > if the stream is coming from some stateless operators, delegate that
> > > stateless operator to the parent stream similar as a); if the stream is
> > > coming from a stream-stream join which is the only stateful operator that
> > > can result in a stream, consider merging the join into multi-way joins
> > > (yes, this is a very hand-wavy thought, but the point here is that we do
> > > not try to tackle it now but leave it for a better solution :).
> >
> > I really like this idea! I agree with you in that this approach is too
> > much for adding in this KIP, but we could pick it up later and leverage the
> > Optimization framework to accomplish this re-use.
> > Again, while I agree we should break the naming of join state stores from
> > KIP-307, IMHO it's something we should fix now as it will be the last piece
> > we can provide to give users the ability to completely make their
> > topologies "upgrade proof" when adding additional operations.
> >
> > Thanks again to both of you for comments and I look forward to hearing back
> > from you.
> >
> > Regards,
> > Bill
> >
> > On Thu, Jun 20, 2019 at 2:33 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello Bill,
> > >
> > > Thanks for the KIP. Glad to see that we can likely shoot two birds with
> > > one stone. I have some concerns though about those "two birds"
> > > themselves:
> > >
> > > 1. About not breaking compatibility of stream-stream join materialized
> > > stores: I think this is a valid issue to tackle, but after thinking about
> > > it once more I'm not sure if exposing Materialized would be a good
> > > solution here. My rationales:
> > >
> > > 1.a For stream-stream join, our current usage of window-store is not
> > > ideal, and we want to modify it in the near future to be more efficient.
> > > Not allowing users to override such state store backend gives us such
> > > freedom (which was also considered in the original DSL design), whereas
> > > getting a Materialized<WindowStore> basically kicks that freedom out of
> > > the window.
> > > 1.b For stream-stream join, in our original design we intend to "never"
> > > want users to query the state, since it is just for buffering the
> > > upcoming records from the stream. Now I know that some users may indeed
> > > want to query it from the debugging perspective, but still I am concerned
> > > about whether leveraging IQ for debugging purposes would be the right
> > > solution here. And adding Materialized object opens the door to let users
> > > query about it (unless we did something intentionally to still forbid
> > > it), which also restricts us in the future.
> > >
> > > 2. About the coupling between Materialized.name() and queryable: again I
> > > think this is a valid issue. But I'm not sure if the current
> > > "withQuerryingDisabled / Enabled" at Materialized is the best approach.
> > > Here I think I agree with John, that generally speaking it's better be a
> > > control function on the `KTable` itself, rather than on `Materialized`,
> > > so fixing it via adding functions through `Materialized` seems not a
> > > natural approach either.
> > >
> > >
> > > Overall, I'm thinking maybe we should still use two stones rather than
> > > one to kill these two birds, and probably for this KIP we just focus on
> > > 1) above. And for that I'd like to not expose the Materialized either for
> > > rationales that I've listed above. Instead, we just restrict KIP-307 to
> > > NOT use the Joined.name for state store names and always use internal
> > > names as well, which admittedly indeed leaves a hole of not being able to
> > > cover all internal names here, but now I feel this `hole` may better be
> > > filled by, e.g. not creating changelog topics but just use the upstream
> > > to re-bootstrap the materialized store, more concretely: when
> > > materializing the store, try to piggy-back the changelog topic on an
> > > existing topic, e.g.
> > > a) if the stream is coming directly from some source topic (including
> > > repartition topic), make that as changelog topic and if it is repartition
> > > topic change the retention / data purging policy necessarily as well; b)
> > > if the stream is coming from some stateless operators, delegate that
> > > stateless operator to the parent stream similar as a); if the stream is
> > > coming from a stream-stream join which is the only stateful operator that
> > > can result in a stream, consider merging the join into multi-way joins
> > > (yes, this is a very hand-wavy thought, but the point here is that we do
> > > not try to tackle it now but leave it for a better solution :).
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Jun 19, 2019 at 11:41 AM John Roesler <j...@confluent.io> wrote:
> > >
> > > > Hi Bill,
> > > >
> > > > Thanks for the KIP! Awesome job catching this unexpected consequence
> > > > of the prior KIPs before it was released.
> > > >
> > > > The proposal looks good to me. On top of just fixing the problem, it
> > > > seems to address two other pain points:
> > > > * that naming a state store automatically causes it to become
> > > > queriable.
> > > > * that there's currently no way to configure the bytes store for join
> > > > windows.
> > > >
> > > > It's awesome that we can fix this issue and two others with one
> > > > feature.
> > > >
> > > > I'm wondering about a missing quadrant from the truth table involving
> > > > whether a Materialized is stored or not and querying is
> > > > enabled/disabled... What should be the behavior if there is no store
> > > > configured (e.g., if Materialized with only serdes) and querying is
> > > > enabled?
> > > >
> > > > It seems we have two choices:
> > > > 1. we can force creation of a state store in this case, so the store
> > > > can be used to serve the queries
> > > > 2. we can provide just a queriable view, basically letting IQ query
> > > > into the "KTableValueGetter", which would transparently construct the
> > > > query response by applying the operator logic to the upstream state if
> > > > the operator state isn't already stored.
> > > >
> > > > Offhand, it seems like the second is actually a pretty awesome
> > > > capability. But it might have an awkward interaction with the current
> > > > semantics. Presently, if I provide a Materialized.withName, it implies
> > > > that querying should be enabled AND that the view should actually be
> > > > stored in a state store. Under option 2 above, this behavior would
> > > > change to NOT provision a state store and instead just consult the
> > > > ValueGetter. To get back to the current behavior, users would have to
> > > > add a "bytes store supplier" to the Materialized to indicate that,
> > > > yes, they really want a state store there.
> > > >
> > > > Behavior changes are always kind of scary, but I think in this case,
> > > > it might actually be preferable. In the event where only the name is
> > > > provided, it means that people just wanted to make the operation
> > > > result queriable. If we automatically convert this to a non-stored
> > > > view, then simply upgrading results in the same observable behavior
> > > > and semantics, but a linear reduction in local storage requirements
> > > > and disk i/o, as well as a corresponding linear reduction in memory
> > > > usage both on and off heap.
> > > >
> > > > What do you think?
> > > > -John
> > > >
> > > > On Tue, Jun 18, 2019 at 9:21 PM Bill Bejeck <bbej...@gmail.com> wrote:
> > > > >
> > > > > All,
> > > > >
> > > > > I'd like to start a discussion for adding a Materialized
> > > > > configuration object to KStream.join for naming state stores
> > > > > involved in joins.
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+Materialized+to+Join
> > > > >
> > > > > Your comments and suggestions are welcome.
> > > > >
> > > > > Thanks,
> > > > > Bill
> > >
> > > --
> > > -- Guozhang
>
> --
> -- Guozhang