Hi Bill,

I think that by passing a Materialized param into the stream-stream join, it's okay (though still not ideal) to say "we still would not expose the store for queries", but it would sound a bit awkward to say "we would also ignore whatever store supplier is passed in and just use our default ones". Again, the concern is that if in the future we want to change the default implementation of the join algorithm so that it no longer relies on a window store with deduping enabled, then we need to change this API again by changing the store supplier type.

If we do want to fill this hole for stream-stream join, I feel that just adding a String-typed store name would be even less future-intrusive if we expect this parameter to be modified later.

Does that make sense?

Guozhang

On Sat, Jun 22, 2019 at 12:51 PM Bill Bejeck <bbej...@gmail.com> wrote:

> Thanks for the comments John and Guozhang, I'll address each one of your
> comments in turn.
>
> John,
>
> > I'm wondering about a missing quadrant from the truth table involving
> > whether a Materialized is stored or not and querying is
> > enabled/disabled... What should be the behavior if there is no store
> > configured (e.g., if Materialized with only serdes) and querying is
> > enabled?
> >
> > It seems we have two choices:
> > 1. we can force creation of a state store in this case, so the store
> >    can be used to serve the queries
> > 2. we can provide just a queriable view, basically letting IQ query
> >    into the "KTableValueGetter", which would transparently construct the
> >    query response by applying the operator logic to the upstream state if
> >    the operator state isn't already stored.
>
> I agree with your assertion about a missing quadrant from the truth table.
> Additionally, I too like the concept of a queriable view. But I think that
> goes a bit beyond the scope of this KIP and would like to pursue that
> feature as follow-on work. Also, thinking about this KIP some more, I'm
> thinking that the changes to Materialized might be a reach as well.
> Separating the naming from a store and its queryable state seems like a
> complex issue in and of itself and should be treated accordingly.
>
> So here's what I'm thinking now. We add Materialized to Join, but for now,
> we internally disable querying. I know this breaks our current semantic
> approach, but I think it's crucial that we do two things in this KIP:
>
> 1.
>    Break the naming of the state stores from Joined to Materialized, so
>    the naming of state stores follows our current pattern and enables
>    upgrades from 2.3 to 2.4
> 2. Offer the ability to configure the state stores of the join, even
>    providing a different implementation (e.g. in-memory) if desired.
>
> With that in mind I'm considering changing the KIP to remove the changes to
> Materialized, and we document very clearly that providing a Materialized
> object with a name is only for naming the state store, hence the changelog
> topics and any possible configurations of the store, but this store *will
> not be available for IQ.*
>
> WDYT?
>
> Guozhang,
>
> > 1. About not breaking compatibility of stream-stream join materialized
> > stores: I think this is a valid issue to tackle, but after thinking about
> > it once more I'm not sure if exposing Materialized would be a good solution
> > here. My rationales:
> >
> > 1.a For stream-stream join, our current usage of window-store is not ideal,
> > and we want to modify it in the near future to be more efficient. Not
> > allowing users to override such state store backend gives us such freedom
> > (which was also considered in the original DSL design), whereas getting a
> > Materialized<WindowStore> basically kicks that freedom out of the
> > window.
> > 1.b For stream-stream join, in our original design we intend to "never"
> > want users to query the state, since it is just for buffering the upcoming
> > records from the stream. Now I know that some users may indeed want to
> > query it from the debugging perspective, but still I am concerned about
> > whether leveraging IQ for debugging purposes would be the right solution
> > here. And adding a Materialized object opens the door to let users query
> > it (unless we did something intentionally to still forbid it), which
> > also restricts us in the future.
> >
> > 2.
> > About the coupling between Materialized.name() and queryable: again I
> > think this is a valid issue. But I'm not sure if the current
> > "withQuerryingDisabled / Enabled" at Materialized is the best approach.
> > Here I think I agree with John, that generally speaking it's better to be a
> > control function on the `KTable` itself, rather than on `Materialized`, so
> > fixing it via adding functions through `Materialized` seems not a natural
> > approach either.
>
> I understand your thoughts here, and up to a point, I agree with you.
> But concerning not providing Materialized as it may restrict us in the
> future for delivering different implementations, I'm wondering if we are
> doing some premature optimization here.
> My rationale for saying so:
>
> 1. I think the cost of not allowing the naming of state stores for joins
>    is too big of a gap to leave. IMHO for joins to follow the current
>    pattern of using Materialized for naming state stores would be what most
>    users would expect to use. As I said in my comments above, I think we
>    should *not include* the changes to Materialized and enforce named
>    stores for joins as unavailable for IQ.
> 2. We'll still have the join methods available without a Materialized
>    allowing us to do something different internally if a Materialized is
>    not provided.
>
> > Overall, I'm thinking maybe we should still use two stones rather than one
> > to kill these two birds, and probably for this KIP we just focus on 1)
> > above. And for that I'd like to not expose the Materialized either for
> > rationales that I've listed above. Instead, we just restrict KIP-307 to NOT
> > use the Joined.name for state store names and always use internal names as
> > well, which admittedly indeed leaves a hole of not being able to cover all
> > internal names here, but now I feel this `hole` may better be filled by,
> > e.g.
> > not creating changelog topics but just using the upstream to
> > re-bootstrap the materialized store, more concretely: when materializing
> > the store, try to piggy-back the changelog topic on an existing topic, e.g.
> > a) if the stream is coming directly from some source topic (including
> > repartition topic), make that the changelog topic, and if it is a
> > repartition topic, change the retention / data purging policy as necessary
> > as well; b) if the stream is coming from some stateless operators, delegate
> > that stateless operator to the parent stream similar to a); c) if the
> > stream is coming from a stream-stream join, which is the only stateful
> > operator that can result in a stream, consider merging the join into
> > multi-way joins (yes, this is a very hand-wavy thought, but the point here
> > is that we do not try to tackle it now but leave it for a better
> > solution :).
>
> I really like this idea! I agree with you in that this approach is too
> much to add in this KIP, but we could pick it up later and leverage the
> Optimization framework to accomplish this re-use.
> Again, while I agree we should break the naming of join state stores from
> KIP-307, IMHO it's something we should fix now as it will be the last piece
> we can provide to give users the ability to completely make their
> topologies "upgrade proof" when adding additional operations.
>
> Thanks again to both of you for comments and I look forward to hearing back
> from you.
>
> Regards,
> Bill
>
> On Thu, Jun 20, 2019 at 2:33 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Bill,
> >
> > Thanks for the KIP. Glad to see that we can likely shoot two birds with
> > one stone. I have some concerns though about those "two birds" themselves:
> >
> > 1.
> > About not breaking compatibility of stream-stream join materialized
> > stores: I think this is a valid issue to tackle, but after thinking about
> > it once more I'm not sure if exposing Materialized would be a good solution
> > here. My rationales:
> >
> > 1.a For stream-stream join, our current usage of window-store is not ideal,
> > and we want to modify it in the near future to be more efficient. Not
> > allowing users to override such state store backend gives us such freedom
> > (which was also considered in the original DSL design), whereas getting a
> > Materialized<WindowStore> basically kicks that freedom out of the
> > window.
> > 1.b For stream-stream join, in our original design we intend to "never"
> > want users to query the state, since it is just for buffering the upcoming
> > records from the stream. Now I know that some users may indeed want to
> > query it from the debugging perspective, but still I am concerned about
> > whether leveraging IQ for debugging purposes would be the right solution
> > here. And adding a Materialized object opens the door to let users query
> > it (unless we did something intentionally to still forbid it), which
> > also restricts us in the future.
> >
> > 2. About the coupling between Materialized.name() and queryable: again I
> > think this is a valid issue. But I'm not sure if the current
> > "withQuerryingDisabled / Enabled" at Materialized is the best approach.
> > Here I think I agree with John, that generally speaking it's better to be a
> > control function on the `KTable` itself, rather than on `Materialized`, so
> > fixing it via adding functions through `Materialized` seems not a natural
> > approach either.
> >
> > Overall, I'm thinking maybe we should still use two stones rather than one
> > to kill these two birds, and probably for this KIP we just focus on 1)
> > above. And for that I'd like to not expose the Materialized either for
> > rationales that I've listed above.
> > Instead, we just restrict KIP-307 to NOT
> > use the Joined.name for state store names and always use internal names as
> > well, which admittedly indeed leaves a hole of not being able to cover all
> > internal names here, but now I feel this `hole` may better be filled by,
> > e.g. not creating changelog topics but just using the upstream to
> > re-bootstrap the materialized store, more concretely: when materializing
> > the store, try to piggy-back the changelog topic on an existing topic, e.g.
> > a) if the stream is coming directly from some source topic (including
> > repartition topic), make that the changelog topic, and if it is a
> > repartition topic, change the retention / data purging policy as necessary
> > as well; b) if the stream is coming from some stateless operators, delegate
> > that stateless operator to the parent stream similar to a); c) if the
> > stream is coming from a stream-stream join, which is the only stateful
> > operator that can result in a stream, consider merging the join into
> > multi-way joins (yes, this is a very hand-wavy thought, but the point here
> > is that we do not try to tackle it now but leave it for a better
> > solution :).
> >
> > Guozhang
> >
> > On Wed, Jun 19, 2019 at 11:41 AM John Roesler <j...@confluent.io> wrote:
> >
> > > Hi Bill,
> > >
> > > Thanks for the KIP! Awesome job catching this unexpected consequence
> > > of the prior KIPs before it was released.
> > >
> > > The proposal looks good to me. On top of just fixing the problem, it
> > > seems to address two other pain points:
> > > * that naming a state store automatically causes it to become queriable.
> > > * that there's currently no way to configure the bytes store for join
> > >   windows.
> > >
> > > It's awesome that we can fix this issue and two others with one feature.
> > >
> > > I'm wondering about a missing quadrant from the truth table involving
> > > whether a Materialized is stored or not and querying is
> > > enabled/disabled... What should be the behavior if there is no store
> > > configured (e.g., if Materialized with only serdes) and querying is
> > > enabled?
> > >
> > > It seems we have two choices:
> > > 1. we can force creation of a state store in this case, so the store
> > >    can be used to serve the queries
> > > 2. we can provide just a queriable view, basically letting IQ query
> > >    into the "KTableValueGetter", which would transparently construct the
> > >    query response by applying the operator logic to the upstream state if
> > >    the operator state isn't already stored.
> > >
> > > Offhand, it seems like the second is actually a pretty awesome
> > > capability. But it might have an awkward interaction with the current
> > > semantics. Presently, if I provide a Materialized.withName, it implies
> > > that querying should be enabled AND that the view should actually be
> > > stored in a state store. Under option 2 above, this behavior would
> > > change to NOT provision a state store and instead just consult the
> > > ValueGetter. To get back to the current behavior, users would have to
> > > add a "bytes store supplier" to the Materialized to indicate that,
> > > yes, they really want a state store there.
> > >
> > > Behavior changes are always kind of scary, but I think in this case,
> > > it might actually be preferable. In the event where only the name is
> > > provided, it means that people just wanted to make the operation
> > > result queriable. If we automatically convert this to a non-stored
> > > view, then simply upgrading results in the same observable behavior
> > > and semantics, but a linear reduction in local storage requirements
> > > and disk i/o, as well as a corresponding linear reduction in memory
> > > usage both on and off heap.
> > >
> > > What do you think?
> > > -John
> > >
> > > On Tue, Jun 18, 2019 at 9:21 PM Bill Bejeck <bbej...@gmail.com> wrote:
> > >
> > > > All,
> > > >
> > > > I'd like to start a discussion for adding a Materialized configuration
> > > > object to KStream.join for naming state stores involved in joins.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+Materialized+to+Join
> > > >
> > > > Your comments and suggestions are welcome.
> > > >
> > > > Thanks,
> > > > Bill
> >
> > --
> > -- Guozhang

--
-- Guozhang
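
To illustrate the naming concern raised in this thread, here is a minimal Java sketch of why generated join store names make upgrades fragile. It relies on the changelog naming pattern `<application.id>-<store name>-changelog`. Everything else is an assumption for illustration: the class `JoinStoreNaming`, its methods, and the sample names `my-app` and `orders-join-store` are hypothetical, and the generated-name format is only a simplified model of what Kafka Streams produces, not its actual implementation.

```java
final class JoinStoreNaming {

    // Changelog topics follow "<application.id>-<store name>-changelog".
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Simplified model (an assumption): a generated join store name embeds the
    // operator's position in the topology as a zero-padded counter.
    static String generatedStoreName(int operatorIndex) {
        return String.format("KSTREAM-JOINTHIS-%010d-store", operatorIndex);
    }

    public static void main(String[] args) {
        String appId = "my-app"; // hypothetical application.id

        // Generated name: adding one upstream operator shifts the counter,
        // so the redeployed application would look for a different topic.
        String before = changelogTopic(appId, generatedStoreName(4));
        String after = changelogTopic(appId, generatedStoreName(5));
        System.out.println(before);
        System.out.println(after);
        System.out.println("generated name stable: " + before.equals(after));

        // Explicit name (hypothetical): independent of operator position.
        System.out.println(changelogTopic(appId, "orders-join-store"));
    }
}
```

With an explicit store name, the changelog topic stays the same no matter how many operators are added upstream, which is the "upgrade proof" property Bill refers to. Whether that name is supplied through a Materialized or a plain String is exactly the open question in the thread.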