Thanks for the response, John.

> If I can offer my thoughts, it seems better to just document on the
> Stream join javadoc for the Materialized parameter that it will not
> make the join result queriable. I'm not opposed to the queriable flag
> in general, but introducing it is a much larger consideration that has
> previously derailed this KIP discussion. In the interest of just
> closing the gap and keeping the API change small, it seems better to
> just go with documentation for now.

I agree with your statement here.  IMHO the most important goal of this KIP
is to not break existing users and to gain some consistency in the API.
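
To make the "not breaking existing users" point concrete, here is a rough,
self-contained sketch (plain Java, no Kafka dependency) of why an explicit
store name matters for upgrades. It relies on the Kafka Streams convention
that a changelog topic is named <application.id>-<store name>-changelog. The
class StoreNamingSketch, the helper generatedStoreName, and the
"KSTREAM-JOINTHIS" prefix and index values are illustrative assumptions, not
actual Streams internals.

```java
/** Hypothetical model, not Kafka Streams code. */
final class StoreNamingSketch {

    /** Changelog topics follow the pattern <application.id>-<store name>-changelog. */
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    /** Simplified stand-in for a generated store name: prefix plus a zero-padded operator index. */
    static String generatedStoreName(String prefix, int operatorIndex) {
        return String.format("%s-%010d-store", prefix, operatorIndex);
    }

    public static void main(String[] args) {
        String appId = "orders-app";

        // Generated name: the index depends on the operator's position in the topology.
        String before = changelogTopic(appId, generatedStoreName("KSTREAM-JOINTHIS", 4));
        // Adding one operator upstream shifts the index, so the topic name changes.
        String after = changelogTopic(appId, generatedStoreName("KSTREAM-JOINTHIS", 5));
        System.out.println(before.equals(after)); // false: the old changelog is no longer found

        // Explicit name: unaffected by topology changes.
        String named = changelogTopic(appId, "order-payment-join-this");
        System.out.println(named);
    }
}
```

With an explicit name the changelog topic stays the same no matter how many
operators are added upstream, which is the property the naming gap currently
denies to stream-stream joins.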

I'll update the KIP accordingly.
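
As a rough illustration of what the documentation and run-time checks would
need to cover, here is a minimal sketch of the constraints John listed
further down the thread (window size fixed by JoinWindows, duplicates
enabled, retention of at least window size plus grace period). It is plain
Java with hypothetical names (JoinStoreChecks, violations) and is not the
actual Streams implementation.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Kafka Streams. */
final class JoinStoreChecks {

    /**
     * Returns a description of every constraint the user-supplied store
     * settings violate; an empty list means the store is usable for the join.
     */
    static List<String> violations(Duration joinWindowSize,
                                   Duration gracePeriod,
                                   Duration storeWindowSize,
                                   Duration storeRetention,
                                   boolean retainDuplicates) {
        List<String> problems = new ArrayList<>();

        // The window size is determined by JoinWindows, so the store must agree with it.
        if (!storeWindowSize.equals(joinWindowSize)) {
            problems.add("store window size must equal the join window size");
        }
        // The join buffers several records per key, so duplicates must be kept.
        if (!retainDuplicates) {
            problems.add("store must retain duplicates");
        }
        // Records must stay available for the whole window plus the grace period.
        Duration minRetention = joinWindowSize.plus(gracePeriod);
        if (storeRetention.compareTo(minRetention) < 0) {
            problems.add("store retention must be at least window size + grace period");
        }
        return problems;
    }

    public static void main(String[] args) {
        Duration window = Duration.ofMinutes(10);
        Duration grace = Duration.ofMinutes(1);

        // A store that satisfies all three constraints.
        System.out.println(violations(window, grace, window, Duration.ofMinutes(11), true));
        // A store that violates all three constraints.
        System.out.println(violations(window, grace, Duration.ofMinutes(5), Duration.ofMinutes(10), false));
    }
}
```

Collecting every violation rather than failing on the first one is only a
design choice for the sketch; it would let a user fix a misconfigured store
supplier in one pass.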

-Bill

On Tue, Jul 16, 2019 at 11:55 AM John Roesler <j...@confluent.io> wrote:

> Hi Bill,
>
> Thanks for driving this KIP toward a conclusion. I'm on board with
> your decision.
>
> You didn't mention whether you're still proposing to add the
> "queriable" flag to the Materialized config object, or just document
> that a Stream join is never queriable. Both options have come up
> earlier in the discussion, so it would be good to pin this down.
>
> If I can offer my thoughts, it seems better to just document on the
> Stream join javadoc for the Materialized parameter that it will not
> make the join result queriable. I'm not opposed to the queriable flag
> in general, but introducing it is a much larger consideration that has
> previously derailed this KIP discussion. In the interest of just
> closing the gap and keeping the API change small, it seems better to
> just go with documentation for now.
>
> Thanks again,
> -John
>
> On Thu, Jul 11, 2019 at 2:45 PM Bill Bejeck <bbej...@gmail.com> wrote:
> >
> > Thanks all for the great discussion so far.
> >
> > Everyone has made excellent points, and I appreciate the detail everyone
> > has put into their arguments.
> >
> > However, after carefully evaluating all the points made so far,
> > creating an overload with Materialized is still my #1 option.
> > My reasoning for saying so is two-fold:
> >
> >    1. It's a small change, and IMHO since it's consistent with our
> >    current API concerning state store usage, the cognitive load on
> >    users will be minimal.
> >    2. It achieves the most important goal of this KIP, namely to
> >    close the gap of naming state stores independently of the join
> >    operator name.
> >
> > Additionally, I agree with the points made by Matthias earlier (I realize
> > there is some overlap here).
> >
> > >  - the main purpose of this KIP is to close the naming gap, which we
> > >    achieve
> > >  - we can allow people to use the new in-memory store
> > >  - we allow people to enable/disable caching
> > >  - we unify the API
> > >  - we decouple querying from naming
> > >  - it's a small API change
> >
> > Although it's not a perfect solution, IMHO the positives of using
> > Materialized far outweigh the negatives, and from what we've discussed so
> > far, anything we implement seems to involve an additional change down the
> > road.
> >
> > If others are still strongly opposed to using Materialized, my other
> > preferences would be
> >
> >    1. Add a "withStoreName" to Joined.  Although I agree with Guozhang
> >    that having a parameter that only applies to one use-case would be
> >    clumsy.
> >    2. Add a String overload for naming the store, but this would be my
> >    least favorite option as IMHO this seems to be a step backward from
> >    why we introduced configuration objects in the first place.
> >
> > Thanks,
> > Bill
> >
> > On Thu, Jun 27, 2019 at 4:45 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Thanks for the KIP Bill!
> > >
> > > Great discussion so far.
> > >
> > > About John's idea of querying upstream stores without materializing a
> > > store: I agree with Bill that this seems to be an orthogonal question,
> > > and it might be better to treat it as an independent optimization and
> > > exclude it from this KIP.
> > >
> > > > What should be the behavior if there is no store
> > > > configured (e.g., if Materialized with only serdes) and querying is
> > > > enabled?
> > >
> > > IMHO, this could be an error case. If one wants to query a store, they
> > > need to provide a name -- if you don't know the name, how would you
> > > actually query the store (even if it would be possible to get the name
> > > from the `TopologyDescription`, it seems clumsy).
> > >
> > > If we don't want to throw an error, materializing seems to be the right
> > > option, to exclude "query optimization" from this KIP. I would be ok
> > > with this option, even if it's clumsy to get the name from
> > > `TopologyDescription`; hence, I would prefer to treat it as an error.
> > >
> > > > To get back to the current behavior, users would have to
> > > > add a "bytes store supplier" to the Materialized to indicate that,
> > > > yes, they really want a state store there.
> > >
> > > This sounds like quite a subtle semantic difference in how to use the
> > > API. It might be hard to explain to users. I would prefer not to
> > > introduce it.
> > >
> > >
> > >
> > > About Guozhang's points:
> > >
> > > 1a) That is actually a good point. However, I believe we cannot get
> > > around this issue easily, and it seems ok to me, to expose the actual
> > > store type we are using. (More thoughts later.)
> > >
> > > 1b) I don't see an issue with allowing users to query all stores? What
> > > is the rationale behind it? What do we gain by not allowing it?
> > >
> > > 2) While I understand what you are saying, we also want/need to have a
> > > way in the PAPI to allow users adding "internal/private" non-queryable
> > > stores to a topology. That's possible via
> > > `Materialized#withQueryingDisabled()`. We could also update
> > > `Topology#addStateStore(StoreBuilder, boolean isQueryable, String...)`
> > > to address this. Again, I agree with Bill that the current API is built
> > > in a certain way, and if we want to change it, it should be a separate
> > > KIP, as it seems to be an orthogonal concern.
> > >
> > > > Instead, we just restrict KIP-307 to NOT
> > > > use the Joined.name for state store names and always use internal
> > > > names as well, which admittedly indeed leaves a hole of not being
> > > > able to cover all internal names here
> > >
> > > I think it's important to close this gap. Naming entities seems to be a
> > > binary feature: if there is a gap, the feature is more or less useless,
> > > rendering KIP-307 void.
> > >
> > >
> > >
> > > I like John's detailed list of required features and what
> > > Materialized/WindowBytesStoreSupplier offer. My take is that adding
> > > Materialized, including the required run-time checks, is the best option
> > > we have, for the following reasons:
> > >
> > >  - the main purpose of this KIP is to close the naming gap, which we
> > >    achieve
> > >  - we can allow people to use the new in-memory store
> > >  - we allow people to enable/disable caching
> > >  - we unify the API
> > >  - we decouple querying from naming
> > >  - it's a small API change
> > >
> > > Adding an overload and only passing in a name, would address the main
> > > purpose of the KIP. However, it falls short on all the other "goodies".
> > > As you mentioned, passing in `Materialized` might not be perfect and
> > > maybe we need to deprecate it at some point; but this is also true for
> > > passing in just a name.
> > >
> > > I am also not convinced that a `StreamJoinStore` would resolve all the
> > > issues. In the end, as long as we are using a `WindowStore`
> > > internally, we need to expose this "implementation detail" to users to
> > > allow them to plug in a custom store. Adding `Materialized` seems to be
> > > the best short-term fix from my point of view.
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 6/27/19 9:56 AM, Guozhang Wang wrote:
> > > > Hi John,
> > > >
> > > > I actually feel better about a new interface, but I'm not sure if we
> > > > would ever need the full configuration of store / log / cache, now
> > > > or in the future, for stream-stream join.
> > > >
> > > > Right now I feel that 1) we want to improve our implementation of
> > > > stream-stream join, and potentially also allow users to customize
> > > > this implementation but with a more suitable interface than the
> > > > current WindowStore interface; how to do that is less clear and
> > > > execution-wise it's (arguably..) not urgent; 2) we want to close the
> > > > last gap (stream-stream join) of allowing users to specify all
> > > > internal names to help with backward compatibility, which is urgent.
> > > >
> > > > Therefore if we want to unblock 2) from 1) in the near term, I feel
> > > > slightly inclined to just add overloaded functions that take in a
> > > > store name for stream-stream joins only -- and admittedly, in the
> > > > future this function may be deprecated -- i.e. if we have to do
> > > > something that we "may regret" in the future, I'd like to pick the
> > > > least intrusive option.
> > > >
> > > > About `Joined#withStoreName`: since the Joined class itself is also
> > > > used in other join types, I feel less comfortable having a
> > > > `Joined#withStoreName` which is only going to be used by
> > > > stream-stream join. Or maybe I missed something here about the
> > > > "latter" case that you are referring to?
> > > >
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Jun 24, 2019 at 12:16 PM John Roesler <j...@confluent.io>
> > > > wrote:
> > > >
> > > >> Thanks Guozhang,
> > > >>
> > > >> Yep. Maybe we can consider just exactly what the join needs:
> > > >>
> > > >>> the WindowStore<Bytes, byte[]> itself is fine, if overly broad,
> > > >>> since the only two methods we need are `window.put(key, value,
> > > >>> context().timestamp())` and `WindowStoreIterator<V2> iter =
> > > >>> window.fetch(key, timeFrom, timeTo)`.
> > > >>
> > > >> One "middle ground" would be to extract _this_ into a new store
> > > >> interface, which only supports these API calls, like
> > > >> StreamJoinStore<K, V>. This would give us the latitude we need to
> > > >> efficiently support the exact operation without concerning ourselves
> > > >> with all the other things a WindowStore can do (which are
> > > >> unreachable for the join use case). It would also let us drop
> > > >> "store duplicates" from the main WindowStore interface, since it
> > > >> only exists to support the join use case.
> > > >>
> > > >> If we were to add a new StreamJoinStore interface, then it'd be
> > > >> straightforward to also add
> > > >> `Materialized.as(StreamJoinBytesStoreSupplier)` and use Materialized,
> > > >> or alternatively add the ability to set the bytes store on Joined.
> > > >>
> > > >> Personally, I'm kind of leaning toward the latter (and also doing
> > > >> `Joined#withStoreName`), since adding the new interface to
> > > >> Materialized then also pollutes the interface for its _actual_ use
> > > >> case of materializing a table view. Of course, to solve the
> > > >> immediate problem, all we need is the store name, but we might feel
> > > >> better about adding the store name to Joined if we _also_ feel like
> > > >> in the future, we would add store/log/cache configuration to Joined
> > > >> as well.
> > > >>
> > > >> -John
> > > >>
> > > >> On Mon, Jun 24, 2019 at 12:56 PM Guozhang Wang <wangg...@gmail.com>
> > > >> wrote:
> > > >>>
> > > >>> Hello John,
> > > >>>
> > > >>> My main concern is exactly the first point at the bottom of your
> > > >>> analysis here: "* configure the bytes store". I'm not sure if using
> > > >>> a window bytes store would be ideal for stream-stream windowed
> > > >>> join; e.g. we could consider a two-dimensional list sorted by
> > > >>> timestamps and then by keys to do the join, whereas a windowed
> > > >>> bytes store is basically sorted by key first, then by timestamp. If
> > > >>> we expose the Materialized to let users pass in a windowed bytes
> > > >>> store, then we would need to change that if we want to replace it
> > > >>> with a different implementation interface.
> > > >>>
> > > >>>
> > > >>> Guozhang
> > > >>>
> > > >>> On Mon, Jun 24, 2019 at 8:59 AM John Roesler <j...@confluent.io>
> > > >>> wrote:
> > > >>>
> > > >>>> Hey Guozhang and Bill,
> > > >>>>
> > > >>>> For what it's worth, I agree with you both!
> > > >>>>
> > > >>>> I think it might help the discussion to look concretely at what
> > > >>>> Materialized does:
> > > >>>> * set a WindowBytesStoreSupplier
> > > >>>> * set a name
> > > >>>> * set the key/value serdes
> > > >>>> * disable/enable/configure change-logging
> > > >>>> * disable/enable caching
> > > >>>> * configure retention
> > > >>>>
> > > >>>> Further, looking into the WindowBytesStoreSupplier, the interface
> > > >>>> lets you:
> > > >>>> * get the segment interval
> > > >>>> * get the window size
> > > >>>> * get whether "duplicates" are enabled
> > > >>>> * get the retention period
> > > >>>> * (obviously) get a WindowStore<Bytes, byte[]>
> > > >>>>
> > > >>>> We know that Materialized isn't exactly what we need for stream
> > > >>>> joins, but we can see how close Materialized is to what we need.
> > > >>>> If it is close, maybe we can use it and document the gaps, and if
> > > >>>> it is not close, then maybe we should just add what we need to
> > > >>>> Joined.
> > > >>>>
> > > >>>> Stream Join's requirements for its stores:
> > > >>>> * a multimap store (i.e., it keeps duplicates) for storing general
> > > >>>> (not windowed) keyed records associated with their insertion time,
> > > >>>> and allows efficient time-bounded lookups and also efficient purges
> > > >>>> of old data.
> > > >>>> ** Note, a properly configured WindowBytesStoreSupplier satisfies
> > > >>>> this requirement, and the interface supports the queries we need to
> > > >>>> verify the configuration at run-time
> > > >>>> * set a name for the store
> > > >>>> * do _not_ set the serdes (they are already set in Joined)
> > > >>>> * logging could be configurable (set to enabled now)
> > > >>>> * caching could be configurable (set to enabled now)
> > > >>>> * do _not_ configure retention (determined by JoinWindows)
> > > >>>>
> > > >>>> So, out of six capabilities for Materialized, there are two we
> > > >>>> don't want (serdes and retention). These would become run-time
> > > >>>> checks if we use it.
> > > >>>>
> > > >>>> A third questionable capability is to provide a
> > > >>>> WindowBytesStoreSupplier. Looking at whether the
> > > >>>> WindowBytesStoreSupplier is the right interface for Stream Join:
> > > >>>> * configuring segment interval is fine
> > > >>>> * should _not_ configure window size (it's determined by
> > > >>>> JoinWindows)
> > > >>>> * duplicates _must_ be enabled
> > > >>>> * retention should be _at least_ windowSize + gracePeriod, but note
> > > >>>> that (unlike for Table window stores) there is no utility in having
> > > >>>> a longer retention time.
> > > >>>> * the WindowStore<Bytes, byte[]> itself is fine, if overly broad,
> > > >>>> since the only two methods we need are `window.put(key, value,
> > > >>>> context().timestamp())` and `WindowStoreIterator<V2> iter =
> > > >>>> window.fetch(key, timeFrom, timeTo)`.
> > > >>>>
> > > >>>> Thus, flattening out the overlap for WindowBytesStoreSupplier onto
> > > >>>> the overlap for Materialized, we have 9 capabilities total (note
> > > >>>> retention is duplicated), of which there are 4 that we don't want:
> > > >>>> * do _not_ set the serdes (they are already set in Joined)
> > > >>>> * do _not_ configure retention (determined by JoinWindows)
> > > >>>> * should _not_ configure window size (it's determined by
> > > >>>> JoinWindows)
> > > >>>> * duplicates _must_ be enabled
> > > >>>>
> > > >>>> These gaps would have to be covered with run-time checks if we
> > > >>>> re-use both Materialized and WindowBytesStoreSupplier. Maybe this
> > > >>>> sounds bad, but consider the other side, that we get 5 new
> > > >>>> capabilities we don't require, but are still pretty nice:
> > > >>>> * configure the bytes store
> > > >>>> * set a name for the store
> > > >>>> * configure caching
> > > >>>> * configure logging
> > > >>>> * configure segment interval
> > > >>>>
> > > >>>> Not sure where this nets us out, but it's food for thought.
> > > >>>> -John
> > > >>>>
> > > >>>> On Sun, Jun 23, 2019 at 7:52 PM Guozhang Wang <wangg...@gmail.com>
> > > >>>> wrote:
> > > >>>>>
> > > >>>>> Hi Bill,
> > > >>>>>
> > > >>>>> I think by giving a Materialized param into stream-stream join,
> > > >>>>> it's okay (though still not ideal) to say "we still would not
> > > >>>>> expose the store for queries", but it would sound a bit awkward
> > > >>>>> to say "we would also ignore whatever store supplier is passed in
> > > >>>>> and just use our default ones" -- again the concern is that, if
> > > >>>>> in the future we'd want to change the default implementation of
> > > >>>>> the join algorithm so that it no longer relies on a window store
> > > >>>>> with deduping enabled, then we need to change this API again by
> > > >>>>> changing the store supplier type.
> > > >>>>>
> > > >>>>> If we do want to fill this hole for stream-stream join, I feel
> > > >>>>> just adding a String typed store-name would even be less
> > > >>>>> future-intrusive if we expect this parameter to be modified
> > > >>>>> later.
> > > >>>>>
> > > >>>>> Does that make sense?
> > > >>>>>
> > > >>>>>
> > > >>>>> Guozhang
> > > >>>>>
> > > >>>>> On Sat, Jun 22, 2019 at 12:51 PM Bill Bejeck <bbej...@gmail.com>
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>>> Thanks for the comments John and Guozhang, I'll address each one
> > > >>>>>> of your comments in turn.
> > > >>>>>>
> > > >>>>>> John,
> > > >>>>>>
> > > >>>>>>> I'm wondering about a missing quadrant from the truth table
> > > >>>>>>> involving whether a Materialized is stored or not and querying
> > > >>>>>>> is enabled/disabled... What should be the behavior if there is
> > > >>>>>>> no store configured (e.g., if Materialized with only serdes)
> > > >>>>>>> and querying is enabled?
> > > >>>>>>
> > > >>>>>>> It seems we have two choices:
> > > >>>>>>> 1. we can force creation of a state store in this case, so the
> > > >>>>>>> store can be used to serve the queries
> > > >>>>>>> 2. we can provide just a queriable view, basically letting IQ
> > > >>>>>>> query into the "KTableValueGetter", which would transparently
> > > >>>>>>> construct the query response by applying the operator logic to
> > > >>>>>>> the upstream state if the operator state isn't already stored.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> I agree with your assertion about a missing quadrant from the
> > > >>>>>> truth table. Additionally, I too like the concept of a queriable
> > > >>>>>> view.  But I think that goes a bit beyond the scope of this KIP
> > > >>>>>> and would like to pursue that feature as follow-on work.  Also,
> > > >>>>>> thinking about this KIP some more, I'm thinking the changes to
> > > >>>>>> Materialized might be a reach as well. Separating the naming
> > > >>>>>> from a store and its queryable state seems like a complex issue
> > > >>>>>> in and of itself and should be treated accordingly.
> > > >>>>>>
> > > >>>>>> So here's what I'm thinking now.  We add Materialized to Join,
> > > >>>>>> but for now, we internally disable querying.  I know this breaks
> > > >>>>>> our current semantic approach, but I think it's crucial that we
> > > >>>>>> do two things in this KIP:
> > > >>>>>>
> > > >>>>>>    1. Break the naming of the state stores from Joined to
> > > >>>>>>    Materialized, so the naming of state stores follows our
> > > >>>>>>    current pattern and enables upgrades from 2.3 to 2.4
> > > >>>>>>    2. Offer the ability to configure the state stores of the
> > > >>>>>>    join, even providing a different implementation (i.e.
> > > >>>>>>    in-memory) if desired.
> > > >>>>>>
> > > >>>>>> With that in mind I'm considering changing the KIP to remove the
> > > >>>>>> changes to Materialized, and we document very clearly that
> > > >>>>>> providing a Materialized object with a name is only for naming
> > > >>>>>> the state store, hence the changelog topics and any possible
> > > >>>>>> configurations of the store, but this store *will not be
> > > >>>>>> available for IQ.*
> > > >>>>>>
> > > >>>>>> WDYT?
> > > >>>>>>
> > > >>>>>> Guozhang,
> > > >>>>>>
> > > >>>>>>> 1. About not breaking compatibility of stream-stream join
> > > >>>>>>> materialized stores: I think this is a valid issue to tackle,
> > > >>>>>>> but after thinking about it once more I'm not sure if exposing
> > > >>>>>>> Materialized would be a good solution here. My rationales:
> > > >>>>>>>
> > > >>>>>>> 1.a For stream-stream join, our current usage of window-store
> > > >>>>>>> is not ideal, and we want to modify it in the near future to be
> > > >>>>>>> more efficient. Not allowing users to override such state store
> > > >>>>>>> backend gives us such freedom (which was also considered in the
> > > >>>>>>> original DSL design), whereas getting a
> > > >>>>>>> Materialized<WindowStore> basically kicks that freedom out of
> > > >>>>>>> the window.
> > > >>>>>>> 1.b For stream-stream join, in our original design we intended
> > > >>>>>>> to "never" want users to query the state, since it is just for
> > > >>>>>>> buffering the upcoming records from the stream. Now I know that
> > > >>>>>>> some users may indeed want to query it from the debugging
> > > >>>>>>> perspective, but still I am concerned about whether leveraging
> > > >>>>>>> IQ for debugging purposes would be the right solution here. And
> > > >>>>>>> adding a Materialized object opens the door to letting users
> > > >>>>>>> query it (unless we did something intentionally to still forbid
> > > >>>>>>> it), which also restricts us in the future.
> > > >>>>>>>
> > > >>>>>>> 2. About the coupling between Materialized.name() and
> > > >>>>>>> queryable: again I think this is a valid issue. But I'm not
> > > >>>>>>> sure if the current "withQueryingDisabled / Enabled" at
> > > >>>>>>> Materialized is the best approach. Here I think I agree with
> > > >>>>>>> John, that generally speaking it's better to be a control
> > > >>>>>>> function on the `KTable` itself, rather than on `Materialized`,
> > > >>>>>>> so fixing it via adding functions through `Materialized` seems
> > > >>>>>>> not a natural approach either.
> > > >>>>>>
> > > >>>>>> I understand your thoughts here, and up to a point, I agree with
> > > >>>>>> you. But concerning not providing Materialized as it may
> > > >>>>>> restrict us in the future for delivering different
> > > >>>>>> implementations, I'm wondering if we are doing some premature
> > > >>>>>> optimization here.
> > > >>>>>> My rationale for saying so:
> > > >>>>>>
> > > >>>>>>    1. I think the cost of not allowing the naming of state
> > > >>>>>>    stores for joins is too big of a gap to leave.   IMHO for
> > > >>>>>>    joins to follow the current pattern of using Materialized for
> > > >>>>>>    naming state stores would be what most users would expect to
> > > >>>>>>    use.  As I said in my comments above, I think we should *not
> > > >>>>>>    include* the changes to Materialized and enforce named stores
> > > >>>>>>    for joins as unavailable for IQ.
> > > >>>>>>    2. We'll still have the join methods available without a
> > > >>>>>>    Materialized allowing us to do something different internally
> > > >>>>>>    if a Materialized is not provided.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>> Overall, I'm thinking maybe we should still use two stones
> > > >>>>>>> rather than one to kill these two birds, and probably for this
> > > >>>>>>> KIP we just focus on 1) above. And for that I'd like to not
> > > >>>>>>> expose the Materialized either for rationales that I've listed
> > > >>>>>>> above. Instead, we just restrict KIP-307 to NOT use the
> > > >>>>>>> Joined.name for state store names and always use internal names
> > > >>>>>>> as well, which admittedly indeed leaves a hole of not being
> > > >>>>>>> able to cover all internal names here, but now I feel this
> > > >>>>>>> `hole` may better be filled by, e.g. not creating changelog
> > > >>>>>>> topics but just using the upstream to re-bootstrap the
> > > >>>>>>> materialized store, more concretely: when materializing the
> > > >>>>>>> store, try to piggy-back the changelog topic on an existing
> > > >>>>>>> topic, e.g. a) if the stream is coming directly from some
> > > >>>>>>> source topic (including repartition topic), make that the
> > > >>>>>>> changelog topic and if it is a repartition topic change the
> > > >>>>>>> retention / data purging policy as necessary as well; b) if the
> > > >>>>>>> stream is coming from some stateless operators, delegate that
> > > >>>>>>> stateless operator to the parent stream similar to a); if the
> > > >>>>>>> stream is coming from a stream-stream join which is the only
> > > >>>>>>> stateful operator that can result in a stream, consider merging
> > > >>>>>>> the join into multi-way joins (yes, this is a very hand-wavy
> > > >>>>>>> thought, but the point here is that we do not try to tackle it
> > > >>>>>>> now but leave it for a better solution :).
> > > >>>>>>
> > > >>>>>> I really like this idea!  I agree with you that this approach is
> > > >>>>>> too much to add in this KIP, but we could pick it up later and
> > > >>>>>> leverage the Optimization framework to accomplish this re-use.
> > > >>>>>> Again, while I agree we should break the naming of join state
> > > >>>>>> stores from KIP-307, IMHO it's something we should fix now as it
> > > >>>>>> will be the last piece we can provide to give users the ability
> > > >>>>>> to completely make their topologies "upgrade proof" when adding
> > > >>>>>> additional operations.
> > > >>>>>>
> > > >>>>>> Thanks again to both of you for comments and I look forward to
> > > >>>>>> hearing back from you.
> > > >>>>>>
> > > >>>>>> Regards,
> > > >>>>>> Bill
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Thu, Jun 20, 2019 at 2:33 PM Guozhang Wang <wangg...@gmail.com>
> > > >>>>>> wrote:
> > > >>>>>>
> > > >>>>>>> Hello Bill,
> > > >>>>>>>
> > > >>>>>>> Thanks for the KIP. Glad to see that we can likely shoot two
> > > >>>>>>> birds with one stone. I have some concerns though about those
> > > >>>>>>> "two birds" themselves:
> > > >>>>>>>
> > > >>>>>>> 1. About not breaking compatibility of stream-stream join
> > > >>>>>>> materialized stores: I think this is a valid issue to tackle,
> > > >>>>>>> but after thinking about it once more I'm not sure if exposing
> > > >>>>>>> Materialized would be a good solution here. My rationales:
> > > >>>>>>>
> > > >>>>>>> 1.a For stream-stream join, our current usage of window-store
> > > >>>>>>> is not ideal, and we want to modify it in the near future to be
> > > >>>>>>> more efficient. Not allowing users to override such state store
> > > >>>>>>> backend gives us such freedom (which was also considered in the
> > > >>>>>>> original DSL design), whereas getting a
> > > >>>>>>> Materialized<WindowStore> basically kicks that freedom out of
> > > >>>>>>> the window.
> > > >>>>>>> 1.b For stream-stream join, in our original design we intended
> > > >>>>>>> to "never" want users to query the state, since it is just for
> > > >>>>>>> buffering the upcoming records from the stream. Now I know that
> > > >>>>>>> some users may indeed want to query it from the debugging
> > > >>>>>>> perspective, but still I am concerned about whether leveraging
> > > >>>>>>> IQ for debugging purposes would be the right solution here. And
> > > >>>>>>> adding a Materialized object opens the door to letting users
> > > >>>>>>> query it (unless we did something intentionally to still forbid
> > > >>>>>>> it), which also restricts us in the future.
> > > >>>>>>>
> > > >>>>>>> 2. About the coupling between Materialized.name() and
> > > >>>>>>> queryable: again I think this is a valid issue. But I'm not
> > > >>>>>>> sure if the current "withQueryingDisabled / Enabled" at
> > > >>>>>>> Materialized is the best approach. Here I think I agree with
> > > >>>>>>> John, that generally speaking it's better to be a control
> > > >>>>>>> function on the `KTable` itself, rather than on `Materialized`,
> > > >>>>>>> so fixing it via adding functions through `Materialized` seems
> > > >>>>>>> not a natural approach either.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> Overall, I'm thinking maybe we should still use two stones
> > > >>>>>>> rather than one to kill these two birds, and probably for this
> > > >>>>>>> KIP we just focus on 1) above. And for that I'd like to not
> > > >>>>>>> expose the Materialized either for rationales that I've listed
> > > >>>>>>> above. Instead, we just restrict KIP-307 to NOT use the
> > > >>>>>>> Joined.name for state store names and always use internal names
> > > >>>>>>> as well, which admittedly indeed leaves a hole of not being
> > > >>>>>>> able to cover all internal names here, but now I feel this
> > > >>>>>>> `hole` may better be filled by, e.g. not creating changelog
> > > >>>>>>> topics but just using the upstream to re-bootstrap the
> > > >>>>>>> materialized store, more concretely: when materializing the
> > > >>>>>>> store, try to piggy-back the changelog topic on an existing
> > > >>>>>>> topic, e.g. a) if the stream is coming directly from some
> > > >>>>>>> source topic (including repartition topic), make that the
> > > >>>>>>> changelog topic and if it is a repartition topic change the
> > > >>>>>>> retention / data purging policy as necessary as well; b) if the
> > > >>>>>>> stream is coming from some stateless operators, delegate that
> > > >>>>>>> stateless operator to the parent stream similar to a); if the
> > > >>>>>>> stream is coming from a stream-stream join which is the only
> > > >>>>>>> stateful operator that can result in a stream, consider merging
> > > >>>>>>> the join into multi-way joins (yes, this is a very hand-wavy
> > > >>>>>>> thought, but the point here is that we do not try to tackle it
> > > >>>>>>> now but leave it for a better solution :).
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> Guozhang
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Wed, Jun 19, 2019 at 11:41 AM John Roesler <j...@confluent.io>
> > > >>>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi Bill,
> > > >>>>>>>>
> > > > >>>>>>>> Thanks for the KIP! Awesome job catching this unexpected
> > > > >>>>>>>> consequence of the prior KIPs before it was released.
> > > >>>>>>>>
> > > > >>>>>>>> The proposal looks good to me. On top of just fixing the
> > > > >>>>>>>> problem, it seems to address two other pain points:
> > > > >>>>>>>> * that naming a state store automatically causes it to become
> > > > >>>>>>>>   queriable.
> > > > >>>>>>>> * that there's currently no way to configure the bytes store
> > > > >>>>>>>>   for join windows.
> > > >>>>>>>>
> > > > >>>>>>>> It's awesome that we can fix this issue and two others with
> > > > >>>>>>>> one feature.
> > > >>>>>>>>
> > > > >>>>>>>> I'm wondering about a missing quadrant from the truth table
> > > > >>>>>>>> involving whether a Materialized is stored or not and whether
> > > > >>>>>>>> querying is enabled or disabled. What should be the behavior if
> > > > >>>>>>>> there is no store configured (e.g., a Materialized with only
> > > > >>>>>>>> serdes) and querying is enabled?
> > > >>>>>>>>
> > > > >>>>>>>> It seems we have two choices:
> > > > >>>>>>>> 1. we can force creation of a state store in this case, so the
> > > > >>>>>>>>    store can be used to serve the queries
> > > > >>>>>>>> 2. we can provide just a queriable view, basically letting IQ
> > > > >>>>>>>>    query into the "KTableValueGetter", which would transparently
> > > > >>>>>>>>    construct the query response by applying the operator logic to
> > > > >>>>>>>>    the upstream state if the operator state isn't already stored.
> > > >>>>>>>>
> > > > >>>>>>>> Offhand, it seems like the second is actually a pretty awesome
> > > > >>>>>>>> capability. But it might have an awkward interaction with the
> > > > >>>>>>>> current semantics. Presently, if I provide a
> > > > >>>>>>>> Materialized.withName, it implies that querying should be enabled
> > > > >>>>>>>> AND that the view should actually be stored in a state store.
> > > > >>>>>>>> Under option 2 above, this behavior would change to NOT provision
> > > > >>>>>>>> a state store and instead just consult the ValueGetter. To get
> > > > >>>>>>>> back to the current behavior, users would have to add a "bytes
> > > > >>>>>>>> store supplier" to the Materialized to indicate that, yes, they
> > > > >>>>>>>> really want a state store there.
> > > >>>>>>>>
> > > > >>>>>>>> Behavior changes are always kind of scary, but I think in this
> > > > >>>>>>>> case, it might actually be preferable. In the event where only
> > > > >>>>>>>> the name is provided, it means that people just wanted to make
> > > > >>>>>>>> the operation result queriable. If we automatically convert this
> > > > >>>>>>>> to a non-stored view, then simply upgrading results in the same
> > > > >>>>>>>> observable behavior and semantics, but a linear reduction in
> > > > >>>>>>>> local storage requirements and disk i/o, as well as a
> > > > >>>>>>>> corresponding linear reduction in memory usage both on and off
> > > > >>>>>>>> heap.
> > > >>>>>>>>
> > > >>>>>>>> What do you think?
> > > >>>>>>>> -John
> > > >>>>>>>>
> > > > >>>>>>>> On Tue, Jun 18, 2019 at 9:21 PM Bill Bejeck <bbej...@gmail.com> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>> All,
> > > >>>>>>>>>
> > > > >>>>>>>>> I'd like to start a discussion for adding a Materialized
> > > > >>>>>>>>> configuration object to KStream.join for naming state stores
> > > > >>>>>>>>> involved in joins.
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>
> > > >>
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+Materialized+to+Join
> > > >>>>>>>>>
> > > >>>>>>>>> Your comments and suggestions are welcome.
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks,
> > > >>>>>>>>> Bill
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> --
> > > >>>>>>> -- Guozhang
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> --
> > > >>>>> -- Guozhang
> > > >>>>
> > > >>>
> > > >>>
> > > >>> --
> > > >>> -- Guozhang
> > > >>
> > > >
> > > >
> > >
> > >
>