Hi Aishwarya,

On Tue, Sep 17, 2019 at 1:41 PM aishwarya kumar <ash26...@gmail.com> wrote:

> Will this be applicable to KStream-KTable joins as well? Or do users always
> materialize these joins explicitly?
>

No, this change applies to KStream-KStream joins only.  With KStream-KTable
joins, KafkaStreams has already materialized the KTable, so we don't need
to do anything with the KStream instance in this case.


> I'm not sure if it's even necessary (or makes sense), just trying to
> understand why the change is applicable to KStream joins only?
>

The full details are in the KIP, but in a nutshell, we needed to decouple the
naming of the StateStore from `Joined.withName` and give users a way to
name the StateStore explicitly.  While making the changes, we realized it
would also be beneficial to let users plug in different WindowStore
implementations.  The most likely reason to use a different WindowStore
would be to enable in-memory joins.
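
To make the in-memory idea a bit more concrete, here is a toy sketch in plain
Java.  It is not Kafka Streams code and not how the real stores are written.
It only models the two operations the join needs, as discussed further down
the thread: a timestamped put that keeps duplicates, and a time-bounded fetch
per key.  The class name `ToyJoinStore`, the `purge` method, and the retention
handling are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical toy model of an in-memory join store; NOT Kafka's WindowStore.
final class ToyJoinStore<K, V> {
    // key -> (timestamp -> values); a list per timestamp retains duplicates
    private final Map<K, NavigableMap<Long, List<V>>> data = new HashMap<>();
    private final long retentionMs;

    public ToyJoinStore(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Store a value under its key and insertion timestamp, keeping duplicates.
    public void put(K key, V value, long timestamp) {
        data.computeIfAbsent(key, k -> new TreeMap<>())
            .computeIfAbsent(timestamp, t -> new ArrayList<>())
            .add(value);
    }

    // Return all values for the key with timeFrom <= timestamp <= timeTo,
    // ordered by timestamp and then by insertion order.
    public List<V> fetch(K key, long timeFrom, long timeTo) {
        List<V> out = new ArrayList<>();
        NavigableMap<Long, List<V>> byTime = data.get(key);
        if (byTime == null || timeFrom > timeTo) {
            return out;
        }
        for (List<V> values : byTime.subMap(timeFrom, true, timeTo, true).values()) {
            out.addAll(values);
        }
        return out;
    }

    // Drop every record older than (streamTime - retentionMs).
    public void purge(long streamTime) {
        long cutoff = streamTime - retentionMs;
        for (NavigableMap<Long, List<V>> byTime : data.values()) {
            byTime.headMap(cutoff, false).clear();
        }
        data.values().removeIf(Map::isEmpty);
    }
}
```

For example, after `put("k", "a", 10)` and `put("k", "b", 10)`, a call to
`fetch("k", 0, 20)` returns both values, which is the retain-duplicates
behavior a stream-stream join depends on.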


> Best,
> Aishwarya
>

Regards,
Bill


>
> On Tue, Sep 17, 2019 at 4:05 PM Bill Bejeck <bbej...@gmail.com> wrote:
>
> > Guozhang,
> >
> > Thanks for the comments.
> >
> > Yes, you are correct in your assessment regarding names, *if* the users
> > provide their own StoreSuppliers.  When constructing a StoreSupplier, the
> > name can't be null, and additionally, there is no way to update the name.
> > Further downstream, the underlying StateStore instances use the provided
> > name for registration, so the names must be unique.  If users don't provide
> > distinct names for the StoreSuppliers, KafkaStreams will throw a
> > StreamsException when building the topology.
> >
> > Thanks,
> > Bill
> >
> >
> >
> > On Tue, Sep 17, 2019 at 9:39 AM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Hello Bill,
> > >
> > > Thanks for the updated KIP. I made a pass on the StreamJoined section.
> > Just
> > > a quick question from user's perspective: if a user wants to provide a
> > > customized store-supplier, she is forced to also provide a name via the
> > > store-supplier. So there's no way to say "I want to provide my own
> store
> > > engine but let the library decide its name", is that right?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Sep 17, 2019 at 8:53 AM Bill Bejeck <bbej...@gmail.com> wrote:
> > >
> > > > Bumping this discussion as we need to re-vote before the KIP
> deadline.
> > > >
> > > > On Fri, Sep 13, 2019 at 10:29 AM Bill Bejeck <bbej...@gmail.com>
> > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > > While working on the implementation of KIP-479, some issues came to
> > > > > > light showing that the KIP as written won't work.  I have updated the
> > > > > > KIP with a solution I believe will solve the original problem as well
> > > > > > as address the impediment to the initial approach.
> > > > >
> > > > > This update is a significant change, so please review the updated
> KIP
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join
> > > > and
> > > > > provide feedback.  After we conclude the discussion, there will be
> a
> > > > > re-vote.
> > > > >
> > > > > Thanks!
> > > > > Bill
> > > > >
> > > > > On Wed, Jul 17, 2019 at 7:01 PM Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > > >
> > > > >> Hi Bill, thanks for your explanations. I'm on board with your
> > decision
> > > > >> too.
> > > > >>
> > > > >>
> > > > >> Guozhang
> > > > >>
> > > > >> On Wed, Jul 17, 2019 at 10:20 AM Bill Bejeck <bbej...@gmail.com>
> > > wrote:
> > > > >>
> > > > >> > Thanks for the response, John.
> > > > >> >
> > > > >> > > If I can offer my thoughts, it seems better to just document
> on
> > > the
> > > > >> > > Stream join javadoc for the Materialized parameter that it
> will
> > > not
> > > > >> > > make the join result queriable. I'm not opposed to the
> queriable
> > > > flag
> > > > >> > > in general, but introducing it is a much larger consideration
> > that
> > > > has
> > > > >> > > previously derailed this KIP discussion. In the interest of
> just
> > > > >> > > closing the gap and keeping the API change small, it seems
> > better
> > > to
> > > > >> > > just go with documentation for now.
> > > > >> >
> > > > > >> > I agree with your statement here.  IMHO the most important goal of
> > > > > >> > this KIP is to not break existing users and to gain some consistency
> > > > > >> > in the API.
> > > > >> >
> > > > >> > I'll update the KIP accordingly.
> > > > >> >
> > > > >> > -Bill
> > > > >> >
> > > > >> > On Tue, Jul 16, 2019 at 11:55 AM John Roesler <
> j...@confluent.io>
> > > > >> wrote:
> > > > >> >
> > > > >> > > Hi Bill,
> > > > >> > >
> > > > >> > > Thanks for driving this KIP toward a conclusion. I'm on board
> > with
> > > > >> > > your decision.
> > > > >> > >
> > > > >> > > You didn't mention whether you're still proposing to add the
> > > > >> > > "queriable" flag to the Materialized config object, or just
> > > document
> > > > >> > > that a Stream join is never queriable. Both options have come
> up
> > > > >> > > earlier in the discussion, so it would be good to pin this
> down.
> > > > >> > >
> > > > >> > > If I can offer my thoughts, it seems better to just document
> on
> > > the
> > > > >> > > Stream join javadoc for the Materialized parameter that it
> will
> > > not
> > > > >> > > make the join result queriable. I'm not opposed to the
> queriable
> > > > flag
> > > > >> > > in general, but introducing it is a much larger consideration
> > that
> > > > has
> > > > >> > > previously derailed this KIP discussion. In the interest of
> just
> > > > >> > > closing the gap and keeping the API change small, it seems
> > better
> > > to
> > > > >> > > just go with documentation for now.
> > > > >> > >
> > > > >> > > Thanks again,
> > > > >> > > -John
> > > > >> > >
> > > > >> > > On Thu, Jul 11, 2019 at 2:45 PM Bill Bejeck <
> bbej...@gmail.com>
> > > > >> wrote:
> > > > >> > > >
> > > > >> > > > Thanks all for the great discussion so far.
> > > > >> > > >
> > > > >> > > > Everyone has made excellent points, and I appreciate the
> > detail
> > > > >> > everyone
> > > > >> > > > has put into their arguments.
> > > > >> > > >
> > > > >> > > > However, after carefully evaluating all the points made so
> > far,
> > > > >> > creating
> > > > >> > > an
> > > > >> > > > overload with Materialized is still my #1 option.
> > > > >> > > > My reasoning for saying so is two-fold:
> > > > >> > > >
> > > > >> > > >    1. It's a small change, and IMHO since it's consistent
> with
> > > our
> > > > >> > > current
> > > > >> > > >    API concerning state store usage, the cognitive load on
> > users
> > > > >> will
> > > > >> > be
> > > > >> > > >    minimal.
> > > > >> > > >    2. It achieves the most important goal of this KIP,
> namely
> > to
> > > > >> close
> > > > >> > > the
> > > > >> > > >    gap of naming state stores independently of the join
> > operator
> > > > >> name.
> > > > >> > > >
> > > > >> > > > Additionally, I agree with the points made by Matthias
> earlier
> > > (I
> > > > >> > realize
> > > > >> > > > there is some overlap here).
> > > > >> > > >
> > > > > >> > > > >  - the main purpose of this KIP is to close the naming gap,
> > > > > >> > > > >    which we achieve
> > > > >> > > > >  - we can allow people to use the new in-memory store
> > > > >> > > > >  - we allow people to enable/disable caching
> > > > >> > > > >  - we unify the API
> > > > >> > > > >  - we decouple querying from naming
> > > > >> > > > >  - it's a small API change
> > > > >> > > >
> > > > > >> > > > Although it's not a perfect solution, IMHO the positives of using
> > > > > >> > > > Materialized far outweigh the negatives, and from what we've
> > > > > >> > > > discussed so far, anything we implement seems to involve an
> > > > > >> > > > additional change down the road.
> > > > >> > > >
> > > > >> > > > If others are still strongly opposed to using Materialized,
> my
> > > > other
> > > > >> > > > preferences would be
> > > > >> > > >
> > > > >> > > >    1. Add a "withStoreName" to Joined.  Although I agree
> with
> > > > >> Guozhang
> > > > >> > > that
> > > > >> > > >    having a parameter that only applies to one use-case
> would
> > be
> > > > >> > clumsy.
> > > > >> > > >    2. Add a String overload for naming the store, but this
> > would
> > > > be
> > > > >> my
> > > > >> > > >    least favorite option as IMHO this seems to be a step
> > > backward
> > > > >> from
> > > > >> > > why we
> > > > >> > > >    introduced configuration objects in the first place.
> > > > >> > > >
> > > > >> > > > Thanks,
> > > > >> > > > Bill
> > > > >> > > >
> > > > >> > > > On Thu, Jun 27, 2019 at 4:45 PM Matthias J. Sax <
> > > > >> matth...@confluent.io
> > > > >> > >
> > > > >> > > > wrote:
> > > > >> > > >
> > > > >> > > > > Thanks for the KIP Bill!
> > > > >> > > > >
> > > > > >> > > > > Great discussion so far.
> > > > >> > > > >
> > > > > >> > > > > About John's idea of querying upstream stores instead of materializing a
> > > > > >> > > > > store: I agree with Bill that this seems to be an orthogonal question,
> > > > > >> > > > > and it might be better to treat it as an independent optimization and
> > > > > >> > > > > exclude it from this KIP.
> > > > >> > > > >
> > > > >> > > > > > What should be the behavior if there is no store
> > > > >> > > > > > configured (e.g., if Materialized with only serdes) and
> > > > >> querying is
> > > > >> > > > > > enabled?
> > > > >> > > > >
> > > > >> > > > > IMHO, this could be an error case. If one wants to query a
> > > > store,
> > > > >> > they
> > > > >> > > > > need to provide a name -- if you don't know the name, how
> > > would
> > > > >> you
> > > > >> > > > > actually query the store (even if it would be possible to
> > get
> > > > the
> > > > >> > name
> > > > >> > > > > from the `TopologyDescription`, it seems clumsy).
> > > > >> > > > >
> > > > >> > > > > If we don't want to throw an error, materializing seems to
> > be
> > > > the
> > > > >> > right
> > > > >> > > > > option, to exclude "query optimization" from this KIP. I
> > would
> > > > be
> > > > >> ok
> > > > >> > > > > with this option, even if it's clumsy to get the name from
> > > > >> > > > > `TopologyDescription`; hence, I would prefer to treat it
> as
> > an
> > > > >> error.
> > > > >> > > > >
> > > > >> > > > > > To get back to the current behavior, users would have to
> > > > >> > > > > > add a "bytes store supplier" to the Materialized to
> > indicate
> > > > >> that,
> > > > >> > > > > > yes, they really want a state store there.
> > > > >> > > > >
> > > > > >> > > > > This sounds like quite a subtle semantic difference in how to use the
> > > > > >> > > > > API. It might be hard to explain to users. I would prefer not to
> > > > > >> > > > > introduce it.
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > > About Guozhang's points:
> > > > >> > > > >
> > > > >> > > > > 1a) That is actually a good point. However, I believe we
> > > cannot
> > > > >> get
> > > > >> > > > > around this issue easily, and it seems ok to me, to expose
> > the
> > > > >> actual
> > > > >> > > > > store type we are using. (More thoughts later.)
> > > > >> > > > >
> > > > > >> > > > > 1b) I don't see an issue with allowing users to query all stores. What
> > > > > >> > > > > is the rationale behind it? What do we gain by not allowing it?
> > > > >> > > > >
> > > > >> > > > > 2) While I understand what you are saying, we also
> want/need
> > > to
> > > > >> have
> > > > >> > a
> > > > >> > > > > way in the PAPI to allow users adding "internal/private"
> > > > >> > non-queryable
> > > > >> > > > > stores to a topology. That's possible via
> > > > >> > > > > `Materialized#withQueryingDisabled()`. We could also
> update
> > > > >> > > > > `Topology#addStateStore(StoreBuilder, boolean isQueryable,
> > > > >> > String...)`
> > > > >> > > > > to address this. Again, I agree with Bill that the current
> > API
> > > > is
> > > > >> > built
> > > > >> > > > > in a certain way, and if we want to change it, it should
> be
> > a
> > > > >> > separate
> > > > >> > > > > KIP, as it seems to be an orthogonal concern.
> > > > >> > > > >
> > > > >> > > > > > Instead, we just restrict KIP-307 to NOT
> > > > >> > > > > > use the Joined.name for state store names and always use
> > > > >> internal
> > > > >> > > names
> > > > >> > > > > as
> > > > >> > > > > > well, which admittedly indeed leaves a hole of not being
> > > able
> > > > to
> > > > >> > > cover
> > > > >> > > > > all
> > > > >> > > > > > internal names here
> > > > >> > > > >
> > > > > >> > > > > I think it's important to close this gap. Naming entities seems to be a
> > > > > >> > > > > binary feature: if there is a gap, the feature is more or less useless,
> > > > > >> > > > > rendering KIP-307 void.
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > > I like John's detailed list of required features and what
> > > > >> > > > > Materialized/WindowByteStoreSuppliers offers. My take is,
> > that
> > > > >> adding
> > > > >> > > > > Materialized including the required run-time checks is the
> > > best
> > > > >> > option
> > > > >> > > > > we have, for the following reasons:
> > > > >> > > > >
> > > > > >> > > > >  - the main purpose of this KIP is to close the naming gap,
> > > > > >> > > > >    which we achieve
> > > > >> > > > >  - we can allow people to use the new in-memory store
> > > > >> > > > >  - we allow people to enable/disable caching
> > > > >> > > > >  - we unify the API
> > > > >> > > > >  - we decouple querying from naming
> > > > >> > > > >  - it's a small API change
> > > > >> > > > >
> > > > > >> > > > > Adding an overload and only passing in a name would address the main
> > > > > >> > > > > purpose of the KIP. However, it falls short on all the other "goodies".
> > > > > >> > > > > As you mentioned, passing in `Materialized` might not be perfect, and
> > > > > >> > > > > maybe we need to deprecate it at some point; but this is also true for
> > > > > >> > > > > passing in just a name.
> > > > >> > > > >
> > > > > >> > > > > I am also not convinced that a `StreamJoinStore` would resolve all the
> > > > > >> > > > > issues. In the end, as long as we are using a `WindowedStore`
> > > > > >> > > > > internally, we need to expose this "implementation detail" to users to
> > > > > >> > > > > allow them to plug in a custom store. Adding `Materialized` seems to be
> > > > > >> > > > > the best short-term fix from my point of view.
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > > -Matthias
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > > On 6/27/19 9:56 AM, Guozhang Wang wrote:
> > > > >> > > > > > Hi John,
> > > > >> > > > > >
> > > > > >> > > > > > I actually feel better about a new interface, but I'm not sure if we
> > > > > >> > > > > > would ever need the full configuration of store / log / cache, now or
> > > > > >> > > > > > in the future, for stream-stream join.
> > > > >> > > > > >
> > > > >> > > > > > Right now I feel that 1) we want to improve our
> > > implementation
> > > > >> of
> > > > >> > > > > > stream-stream join, and potentially also allow users to
> > > > >> customize
> > > > >> > > this
> > > > >> > > > > > implementation but with a more suitable interface than
> the
> > > > >> current
> > > > >> > > > > > WindowStore interface, how to do that is less clear and
> > > > >> > > execution-wise
> > > > >> > > > > it's
> > > > >> > > > > > (arguably..) not urgent; 2) we want to close the last
> gap
> > > > >> > > (Stream-stream
> > > > >> > > > > > join) of allowing users to specify all internal names to
> > > help
> > > > on
> > > > >> > > backward
> > > > >> > > > > > compatibility, which is urgent.
> > > > >> > > > > >
> > > > > >> > > > > > Therefore, if we want to unblock 2) from 1) in the near term, I feel
> > > > > >> > > > > > slightly inclined to just add overloaded functions that take in a store
> > > > > >> > > > > > name for stream-stream joins only -- and admittedly, in the future this
> > > > > >> > > > > > function may be deprecated -- i.e. if we have to do something that we
> > > > > >> > > > > > "may regret" in the future, I'd like to pick the least intrusive option.
> > > > >> > > > > >
> > > > >> > > > > > About `Joined#withStoreName`: since the Joined class
> > itself
> > > is
> > > > >> also
> > > > >> > > used
> > > > >> > > > > in
> > > > >> > > > > > other join types, I feel less comfortable to have a
> > > > >> > > > > `Joined#withStoreName`
> > > > >> > > > > > which is only going to be used by stream-stream join. Or
> > > > maybe I
> > > > >> > miss
> > > > >> > > > > > something here about the "latter" case that you are
> > > referring
> > > > >> to?
> > > > >> > > > > >
> > > > >> > > > > >
> > > > >> > > > > >
> > > > >> > > > > > Guozhang
> > > > >> > > > > >
> > > > >> > > > > > On Mon, Jun 24, 2019 at 12:16 PM John Roesler <
> > > > >> j...@confluent.io>
> > > > >> > > wrote:
> > > > >> > > > > >
> > > > >> > > > > >> Thanks Guozhang,
> > > > >> > > > > >>
> > > > >> > > > > >> Yep. Maybe we can consider just exactly what the join
> > > needs:
> > > > >> > > > > >>
> > > > >> > > > > >>> the WindowStore<Bytes, byte[]> itself is fine, if
> overly
> > > > >> broad,
> > > > >> > > > > >>> since the only two methods we need are
> `window.put(key,
> > > > value,
> > > > >> > > > > >>> context().timestamp())` and `WindowStoreIterator<V2>
> > iter
> > > =
> > > > >> > > > > >>> window.fetch(key, timeFrom, timeTo)`.
> > > > >> > > > > >>
> > > > >> > > > > >> One "middle ground" would be to extract _this_ into a
> new
> > > > store
> > > > >> > > > > >> interface, which only supports these API calls, like
> > > > >> > > > > >> StreamJoinStore<K, V>. This would give us the latitude
> we
> > > > need
> > > > >> to
> > > > >> > > > > >> efficiently support the exact operation without
> > concerning
> > > > >> > ourselves
> > > > >> > > > > >> with all the other things a WindowStore can do (which
> are
> > > > >> > > unreachable
> > > > >> > > > > >> for the join use case). It would also let us drop
> "store
> > > > >> > duplicates"
> > > > >> > > > > >> from the main WindowStore interface, since it only
> exists
> > > to
> > > > >> > support
> > > > >> > > > > >> the join use case.
> > > > >> > > > > >>
> > > > >> > > > > >> If we were to add a new StreamJoinStore interface, then
> > > it'd
> > > > be
> > > > >> > > > > >> straightforward how we could add also
> > > > >> > > > > >> `Materialized.as(StreamJoinBytesStoreSupplier)` and use
> > > > >> > > Materialized,
> > > > >> > > > > >> or alternatively add the ability to set the bytes store
> > on
> > > > >> Joined.
> > > > >> > > > > >>
> > > > >> > > > > >> Personally, I'm kind of leaning toward the latter (and
> > also
> > > > >> doing
> > > > >> > > > > >> `Joined#withStoreName`), since adding the new interface
> > to
> > > > >> > > > > >> Materialized then also pollutes the interface for its
> > > > _actual_
> > > > >> use
> > > > >> > > > > >> case of materializing a table view. Of course, to solve
> > the
> > > > >> > > immediate
> > > > >> > > > > >> problem, all we need is the store name, but we might
> feel
> > > > >> better
> > > > >> > > about
> > > > >> > > > > >> adding the store name to Joined if we _also_ feel like
> in
> > > the
> > > > >> > > future,
> > > > >> > > > > >> we would add store/log/cache configuration to Joined as
> > > well.
> > > > >> > > > > >>
> > > > >> > > > > >> -John
> > > > >> > > > > >>
> > > > >> > > > > >> On Mon, Jun 24, 2019 at 12:56 PM Guozhang Wang <
> > > > >> > wangg...@gmail.com>
> > > > >> > > > > wrote:
> > > > >> > > > > >>>
> > > > >> > > > > >>> Hello John,
> > > > >> > > > > >>>
> > > > >> > > > > >>> My main concern is exactly the first point at the
> bottom
> > > of
> > > > >> your
> > > > >> > > > > analysis
> > > > >> > > > > >>> here: "* configure the bytes store". I'm not sure if
> > > using a
> > > > >> > window
> > > > >> > > > > bytes
> > > > >> > > > > >>> store would be ideal for stream-stream windowed join;
> > e.g.
> > > > we
> > > > >> > could
> > > > >> > > > > >>> consider two dimensional list sorted by timestamps and
> > > then
> > > > by
> > > > >> > > keys to
> > > > >> > > > > do
> > > > >> > > > > >>> the join, whereas a windowed bytes store is basically
> > > sorted
> > > > >> by
> > > > >> > key
> > > > >> > > > > >> first,
> > > > >> > > > > >>> then by timestamp. If we expose the Materialized to
> let
> > > user
> > > > >> pass
> > > > >> > > in a
> > > > >> > > > > >>> windowed bytes store, then we would need to change
> that
> > if
> > > > we
> > > > >> > want
> > > > >> > > to
> > > > >> > > > > >>> replace it with a different implementation interface.
> > > > >> > > > > >>>
> > > > >> > > > > >>>
> > > > >> > > > > >>> Guozhang
> > > > >> > > > > >>>
> > > > >> > > > > >>> On Mon, Jun 24, 2019 at 8:59 AM John Roesler <
> > > > >> j...@confluent.io>
> > > > >> > > > > wrote:
> > > > >> > > > > >>>
> > > > >> > > > > >>>> Hey Guozhang and Bill,
> > > > >> > > > > >>>>
> > > > >> > > > > >>>> For what it's worth, I agree with you both!
> > > > >> > > > > >>>>
> > > > >> > > > > >>>> I think it might help the discussion to look
> concretely
> > > at
> > > > >> what
> > > > >> > > > > >>>> Materialized does:
> > > > >> > > > > >>>> * set a WindowBytesStoreSupplier
> > > > >> > > > > >>>> * set a name
> > > > >> > > > > >>>> * set the key/value serdes
> > > > >> > > > > >>>> * disable/enable/configure change-logging
> > > > >> > > > > >>>> * disable/enable caching
> > > > >> > > > > >>>> * configure retention
> > > > >> > > > > >>>>
> > > > >> > > > > >>>> Further, looking into the WindowBytesStoreSupplier,
> the
> > > > >> > interface
> > > > >> > > lets
> > > > >> > > > > >> you:
> > > > >> > > > > >>>> * get the segment interval
> > > > >> > > > > >>>> * get the window size
> > > > >> > > > > >>>> * get whether "duplicates" are enabled
> > > > >> > > > > >>>> * get the retention period
> > > > >> > > > > >>>> * (obviously) get a WindowStore<Bytes, byte[]>
> > > > >> > > > > >>>>
> > > > >> > > > > >>>> We know that Materialized isn't exactly what we need
> > for
> > > > >> stream
> > > > >> > > joins,
> > > > >> > > > > >>>> but we can see how close Materialized is to what we
> > need.
> > > > If
> > > > >> it
> > > > >> > is
> > > > >> > > > > >>>> close, maybe we can use it and document the gaps, and
> > if
> > > it
> > > > >> is
> > > > >> > not
> > > > >> > > > > >>>> close, then maybe we should just add what we need to
> > > > Joined.
> > > > >> > > > > >>>> Stream Join's requirements for its stores:
> > > > >> > > > > >>>> * a multimap store (i.e., it keeps duplicates) for
> > > storing
> > > > >> > general
> > > > >> > > > > >>>> (not windowed) keyed records associated with their
> > > > insertion
> > > > >> > > time, and
> > > > >> > > > > >>>> allows efficient time-bounded lookups and also
> > efficient
> > > > >> purges
> > > > >> > > of old
> > > > >> > > > > >>>> data.
> > > > >> > > > > >>>> ** Note, a properly configured
> WindowBytesStoreSupplier
> > > > >> > satisfies
> > > > >> > > this
> > > > >> > > > > >>>> requirement, and the interface supports the queries
> we
> > > need
> > > > >> to
> > > > >> > > verify
> > > > >> > > > > >>>> the configuration at run-time
> > > > >> > > > > >>>> * set a name for the store
> > > > >> > > > > >>>> * do _not_ set the serdes (they are already set in
> > > Joined)
> > > > >> > > > > >>>> * logging could be configurable (set to enabled now)
> > > > >> > > > > >>>> * caching could be configurable (set to enabled now)
> > > > >> > > > > >>>> * do _not_ configure retention (determined by
> > > JoinWindows)
> > > > >> > > > > >>>>
> > > > >> > > > > >>>> So, out of six capabilities for Materialized, there
> are
> > > two
> > > > >> we
> > > > >> > > don't
> > > > >> > > > > >>>> want (serdes and retention). These would become
> > run-time
> > > > >> checks
> > > > >> > > if we
> > > > >> > > > > >>>> use it.
> > > > >> > > > > >>>>
> > > > >> > > > > >>>> A third questionable capability is to provide a
> > > > >> > > > > >>>> WindowBytesStoreSupplier. Looking at whether the
> > > > >> > > > > >>>> WindowBytesStoreSupplier is the right interface for
> > > Stream
> > > > >> Join:
> > > > >> > > > > >>>> * configuring segment interval is fine
> > > > >> > > > > >>>> * should _not_ configure window size (it's determined
> > by
> > > > >> > > JoinWindows)
> > > > >> > > > > >>>> * duplicates _must_ be enabled
> > > > >> > > > > >>>> * retention should be _at least_ windowSize +
> > > gracePeriod,
> > > > >> but
> > > > >> > > note
> > > > >> > > > > >>>> that (unlike for Table window stores) there is no
> > utility
> > > > in
> > > > >> > > having a
> > > > >> > > > > >>>> longer retention time.
> > > > >> > > > > >>>> * the WindowStore<Bytes, byte[]> itself is fine, if
> > > overly
> > > > >> > broad,
> > > > >> > > > > >>>> since the only two methods we need are
> `window.put(key,
> > > > >> value,
> > > > >> > > > > >>>> context().timestamp())` and `WindowStoreIterator<V2>
> > > iter =
> > > > >> > > > > >>>> window.fetch(key, timeFrom, timeTo)`.
> > > > >> > > > > >>>>
> > > > > >> > > > > >>>> Thus, flattening out the overlap for WindowBytesStoreSupplier onto the
> > > > > >> > > > > >>>> overlap for Materialized, we have 9 capabilities total (note retention
> > > > > >> > > > > >>>> is duplicated), of which there are 4 that we don't want:
> > > > >> > > > > >>>> * do _not_ set the serdes (they are already set in
> > > Joined)
> > > > >> > > > > >>>> * do _not_ configure retention (determined by
> > > JoinWindows)
> > > > >> > > > > >>>> * should _not_ configure window size (it's determined
> > by
> > > > >> > > JoinWindows)
> > > > >> > > > > >>>> * duplicates _must_ be enabled
> > > > >> > > > > >>>>
> > > > > >> > > > > >>>> These gaps would have to be covered with run-time checks if we re-use
> > > > > >> > > > > >>>> both Materialized and WindowBytesStoreSupplier. Maybe this sounds
> > > > > >> > > > > >>>> bad, but consider the other side: we get 5 new capabilities that we
> > > > > >> > > > > >>>> don't require but that are still pretty nice:
> > > > >> > > > > >>>> * configure the bytes store
> > > > >> > > > > >>>> * set a name for the store
> > > > >> > > > > >>>> * configure caching
> > > > >> > > > > >>>> * configure logging
> > > > >> > > > > >>>> * configure segment interval
> > > > >> > > > > >>>>
> > > > >> > > > > >>>> Not sure where this nets us out, but it's food for
> > > thought.
> > > > >> > > > > >>>> -John
> > > > >> > > > > >>>>
> > > > >> > > > > >>>> On Sun, Jun 23, 2019 at 7:52 PM Guozhang Wang <
> > > > >> > wangg...@gmail.com
> > > > >> > > >
> > > > >> > > > > >> wrote:
> > > > >> > > > > >>>>>
> > > > >> > > > > >>>>> Hi Bill,
> > > > >> > > > > >>>>>
> > > > > >> > > > > >>>>> I think by passing a Materialized param into stream-stream join, it's
> > > > > >> > > > > >>>>> okay (though still not ideal) to say "we still would not expose the
> > > > > >> > > > > >>>>> store for queries", but it would sound a bit awkward to say "we would
> > > > > >> > > > > >>>>> also ignore whatever store supplier is passed in and just use our
> > > > > >> > > > > >>>>> default ones" -- again the concern is that, if in the future we want to
> > > > > >> > > > > >>>>> change the default implementation of the join algorithm so that it no
> > > > > >> > > > > >>>>> longer relies on a window store with deduping enabled, then we need to
> > > > > >> > > > > >>>>> change this API again by changing the store supplier type.
> > > > >> > > > > >>>>>
> > > > > >> > > > > >>>>> If we do want to fill this hole for stream-stream join, I feel just
> > > > > >> > > > > >>>>> adding a String-typed store name would be even less future-intrusive if
> > > > > >> > > > > >>>>> we expect this parameter to be modified later.
> > > > >> > > > > >>>>>
> > > > > >> > > > > >>>>> Does that make sense?
> > > > >> > > > > >>>>>
> > > > >> > > > > >>>>>
> > > > >> > > > > >>>>> Guozhang
> > > > >> > > > > >>>>>
> > > > >> > > > > >>>>> On Sat, Jun 22, 2019 at 12:51 PM Bill Bejeck <
> > > > >> > bbej...@gmail.com>
> > > > >> > > > > >> wrote:
> > > > >> > > > > >>>>>
> > > > >> > > > > >>>>>> Thanks for the comments John and Guozhang, I'll
> > address
> > > > >> each
> > > > >> > > one of
> > > > >> > > > > >>>> your
> > > > >> > > > > >>>>>> comments in turn.
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>> John,
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>>> I'm wondering about a missing quadrant from the
> > truth
> > > > >> table
> > > > >> > > > > >> involving
> > > > >> > > > > >>>>>>> whether a Materialized is stored or not and
> querying
> > > is
> > > > >> > > > > >>>>>>> enabled/disabled... What should be the behavior if
> > > there
> > > > >> is
> > > > >> > no
> > > > >> > > > > >> store
> > > > >> > > > > >>>>>>> configured (e.g., if Materialized with only
> serdes)
> > > and
> > > > >> > > querying
> > > > >> > > > > >> is
> > > > >> > > > > >>>>>> enabled?
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>>> It seems we have two choices:
> > > > >> > > > > >>>>>>> 1. we can force creation of a state store in this
> > > case,
> > > > so
> > > > >> > the
> > > > >> > > > > >> store
> > > > >> > > > > >>>>>>> can be used to serve the queries
> > > > >> > > > > >>>>>>> 2. we can provide just a queriable view, basically
> > > > >> letting IQ
> > > > >> > > > > >> query
> > > > >> > > > > >>>>>>> into the "KTableValueGetter", which would
> > > transparently
> > > > >> > > > > >> construct the
> > > > >> > > > > >>>>>>> query response by applying the operator logic to
> the
> > > > >> upstream
> > > > >> > > > > >> state
> > > > >> > > > > >>>> if
> > > > >> > > > > >>>>>>> the operator state isn't already stored.
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>>
> > > > > >> > > > > >>>>>> I agree with your assertion about a missing quadrant from the truth
> > > > > >> > > > > >>>>>> table. Additionally, I too like the concept of a queriable view.  But I
> > > > > >> > > > > >>>>>> think that goes a bit beyond the scope of this KIP, and I would like to
> > > > > >> > > > > >>>>>> pursue that feature as follow-on work.  Also, thinking about this KIP
> > > > > >> > > > > >>>>>> some more, I think the changes to Materialized might be a reach as
> > > > > >> > > > > >>>>>> well.  Separating the naming of a store from its queryable state seems
> > > > > >> > > > > >>>>>> like a complex issue in and of itself and should be treated accordingly.
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>> So here's what I'm thinking now.  We add
> Materialized
> > > to
> > > > >> Join,
> > > > >> > > but
> > > > >> > > > > >> for
> > > > >> > > > > >>>> now,
> > > > >> > > > > >>>>>> we internally disable querying.  I know this breaks
> > our
> > > > >> > current
> > > > >> > > > > >>>> semantic
> > > > >> > > > > >>>>>> approach, but I think it's crucial that we do two
> > > things
> > > > in
> > > > >> > this
> > > > >> > > > > >> KIP
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>>    1. Break the naming of the state stores from
> > Joined
> > > to
> > > > >> > > > > >>>> Materialized, so
> > > > >> > > > > >>>>>>    the naming of state stores follows our current
> > > pattern
> > > > >> and
> > > > >> > > > > >> enables
> > > > >> > > > > >>>>>> upgrades
> > > > >> > > > > >>>>>>    from 2.3 to 2.4
> > > > >> > > > > >>>>>>    2. Offer the ability to configure the state
> stores
> > > of
> > > > >> the
> > > > >> > > join,
> > > > >> > > > > >> even
> > > > >> > > > > >>>>>>    providing a different implementation (i.e.
> > > in-memory)
> > > > if
> > > > >> > > > > >> desired.
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>> With that in mind I'm considering changing the KIP
> to
> > > > >> remove
> > > > >> > the
> > > > >> > > > > >>>> changes to
> > > > >> > > > > >>>>>> Materialized, and we document very clearly that
> > > > >> providing a
> > > > >> > > > > >>>> Materialized
> > > > >> > > > > >>>>>> object with a name is only for naming the state
> > store,
> > > > >> hence
> > > > >> > the
> > > > >> > > > > >>>> changelog
> > > > >> > > > > >>>>>> topics and any possible configurations of the
> store,
> > > but
> > > > >> this
> > > > >> > > store
> > > > >> > > > > >>>> *will
> > > > >> > > > > >>>>>> not be available for IQ.*
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>> WDYT?
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>> Guozhang,
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>>> 1. About not breaking compatibility of
> stream-stream
> > > > join
> > > > >> > > > > >>>> materialized
> > > > >> > > > > >>>>>>> stores: I think this is a valid issue to tackle,
> but
> > > > after
> > > > >> > > > > >> thinking
> > > > >> > > > > >>>> about
> > > > >> > > > > >>>>>>> it once more I'm not sure if exposing Materialized
> > > would
> > > > >> be a
> > > > >> > > > > >> good
> > > > >> > > > > >>>>>> solution
> > > > >> > > > > >>>>>>> here. My rationales:
> > > > >> > > > > >>>>>>>
> > > > >> > > > > >>>>>>> 1.a For stream-stream join, our current usage of
> > > > >> window-store
> > > > >> > > is
> > > > >> > > > > >> not
> > > > >> > > > > >>>>>> ideal,
> > > > >> > > > > >>>>>>> and we want to modify it in the near future to be
> > more
> > > > >> > > > > >> efficient. Not
> > > > >> > > > > >>>>>>> allowing users to override such state store
> backend
> > > > gives
> > > > >> us
> > > > >> > > such
> > > > >> > > > > >>>> freedom
> > > > >> > > > > >>>>>>> (which was also considered in the original DSL
> > > design),
> > > > >> > whereas
> > > > >> > > > > >>>> getting a
> > > > >> > > > > >>>>>>> Materialized<WindowStore> basically kicks out that
> > > > freedom
> > > > >> > out
> > > > >> > > > > >> of the
> > > > >> > > > > >>>>>>> window.
> > > > >> > > > > >>>>>>> 1.b For stream-stream join, in our original design
> > we
> > > > >> intend
> > > > >> > to
> > > > >> > > > > >>>> "never"
> > > > >> > > > > >>>>>>> want users to query the state, since it is just
> for
> > > > >> buffering
> > > > >> > > the
> > > > >> > > > > >>>>>> upcoming
> > > > >> > > > > >>>>>>> records from the stream. Now I know that some
> users
> > > may
> > > > >> > indeed
> > > > >> > > > > >> want
> > > > >> > > > > >>>> to
> > > > >> > > > > >>>>>>> query it from the debugging perspective, but
> still I am
> > > > >> > concerned
> > > > >> > > > > >> about
> > > > >> > > > > >>>>>>> whether leveraging IQ for debugging purposes would
> > be
> > > > the
> > > > >> > right
> > > > >> > > > > >>>> solution
> > > > >> > > > > >>>>>>> here. And adding Materialized object opens the
> door
> > to
> > > > let
> > > > >> > > users
> > > > >> > > > > >>>> query
> > > > >> > > > > >>>>>>> about it (unless we did something intentionally to
> > > still
> > > > >> > > forbid
> > > > >> > > > > >> it),
> > > > >> > > > > >>>>>> which
> > > > >> > > > > >>>>>>> also restricts us in the future.
> > > > >> > > > > >>>>>>>
> > > > >> > > > > >>>>>>> 2. About the coupling between Materialized.name()
> > and
> > > > >> > > queryable:
> > > > >> > > > > >>>> again I
> > > > >> > > > > >>>>>>> think this is a valid issue. But I'm not sure if
> the
> > > > >> current
> > > > >> > > > > >>>>>>> "withQueryingDisabled / Enabled" at Materialized
> is
> > > the
> > > > >> best
> > > > >> > > > > >>>> approach.
> > > > >> > > > > >>>>>>> Here I think I agree with John, that generally
> > > speaking
> > > > >> it's
> > > > >> > > > > >> better
> > > > >> > > > > >>>> be a
> > > > >> > > > > >>>>>>> control function on the `KTable` itself, rather
> than
> > > on
> > > > >> > > > > >>>> `Materialized`,
> > > > >> > > > > >>>>>> so
> > > > >> > > > > >>>>>>> fixing it via adding functions through
> > `Materialized`
> > > > >> seems
> > > > >> > > not a
> > > > >> > > > > >>>> natural
> > > > >> > > > > >>>>>> approach either.
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>> I understand your thoughts here, and up to a
> point, I
> > > > agree
> > > > >> > with
> > > > >> > > > > >> you.
> > > > >> > > > > >>>>>> But concerning not providing Materialized as it may
> > > > >> restrict
> > > > >> > us
> > > > >> > > in
> > > > >> > > > > >> the
> > > > >> > > > > >>>>>> future for delivering different implementations,
> I'm
> > > > >> wondering
> > > > >> > > if
> > > > >> > > > > >> we
> > > > >> > > > > >>>> are
> > > > >> > > > > >>>>>> doing some premature optimization here.
> > > > >> > > > > >>>>>> My rationale for saying so:
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>>    1. I think the cost of not allowing the naming
> of
> > > > state
> > > > >> > > stores
> > > > >> > > > > >> for
> > > > >> > > > > >>>> joins
> > > > >> > > > > >>>>>>    is too big of a gap to leave.   IMHO for joins
> to
> > > > follow
> > > > >> > the
> > > > >> > > > > >> current
> > > > >> > > > > >>>>>>    pattern of using Materialized for naming state
> > > stores
> > > > >> would
> > > > >> > > be
> > > > >> > > > > >> what
> > > > >> > > > > >>>> most
> > > > >> > > > > >>>>>>    users would expect to use.  As I said in my
> > comments
> > > > >> > above, I
> > > > >> > > > > >> think
> > > > >> > > > > >>>> we
> > > > >> > > > > >>>>>>    should *not include* the changes to Materialized
> > and
> > > > >> > enforce
> > > > >> > > > > >> named
> > > > >> > > > > >>>>>>    stores for joins as unavailable for IQ.
> > > > >> > > > > >>>>>>    2. We'll still have the join methods available
> > > > without a
> > > > >> > > > > >>>> Materialized
> > > > >> > > > > >>>>>>    allowing us to do something different internally
> > if
> > > a
> > > > >> > > > > >> Materialized
> > > > >> > > > > >>>> is
> > > > >> > > > > >>>>>> not
> > > > >> > > > > >>>>>>    provided.
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>>> Overall, I'm thinking maybe we should still use
> two
> > > > stones
> > > > >> > > rather
> > > > >> > > > > >>>> than
> > > > >> > > > > >>>>>> one
> > > > >> > > > > >>>>>>> to kill these two birds, and probably for this KIP
> > we
> > > > just
> > > > >> > > focus
> > > > >> > > > > >> on
> > > > >> > > > > >>>> 1)
> > > > >> > > > > >>>>>>> above. And for that I'd like to not expose the
> > > > >> Materialized
> > > > >> > > > > >> either
> > > > >> > > > > >>>> for
> > > > >> > > > > >>>>>>> rationales that I've listed above. Instead, we
> just
> > > > >> restrict
> > > > >> > > > > >> KIP-307
> > > > >> > > > > >>>> to
> > > > >> > > > > >>>>>> NOT
> > > > >> > > > > >>>>>>> use the Joined.name for state store names and
> always
> > > use
> > > > >> > > internal
> > > > >> > > > > >>>> names
> > > > >> > > > > >>>>>> as
> > > > >> > > > > >>>>>>> well, which admittedly indeed leaves a hole of not
> > > being
> > > > >> able
> > > > >> > > to
> > > > >> > > > > >>>> cover
> > > > >> > > > > >>>>>> all
> > > > >> > > > > >>>>>>> internal names here, but now I feel this `hole`
> may
> > > > >> better be
> > > > >> > > > > >> filled
> > > > >> > > > > >>>> by,
> > > > >> > > > > >>>>>>> e.g. not creating changelog topics but just use
> the
> > > > >> upstream
> > > > >> > to
> > > > >> > > > > >>>>>>> re-bootstrap the materialized store, more
> > concretely:
> > > > when
> > > > >> > > > > >>>> materializing
> > > > >> > > > > >>>>>>> the store, try to piggy-back the changelog topic
> on
> > an
> > > > >> > existing
> > > > >> > > > > >>>> topic,
> > > > >> > > > > >>>>>> e.g.
> > > > >> > > > > >>>>>>> a) if the stream is coming directly from some
> source
> > > > topic
> > > > >> > > > > >> (including
> > > > >> > > > > >>>>>>> repartition topic), make that as changelog topic
> and
> > > if
> > > > >> it is
> > > > >> > > > > >>>> repartition
> > > > >> > > > > >>>>>>> topic change the retention / data purging policy
> > > > >> necessarily
> > > > >> > as
> > > > >> > > > > >>>> well; b)
> > > > >> > > > > >>>>>> if
> > > > >> > > > > >>>>>>> the stream is coming from some stateless
> operators,
> > > > >> delegate
> > > > >> > > that
> > > > >> > > > > >>>>>> stateless
> > > > >> > > > > >>>>>>> operator to the parent stream similar as a); if
> the
> > > > >> stream is
> > > > >> > > > > >> coming
> > > > >> > > > > >>>> from
> > > > >> > > > > >>>>>> a
> > > > >> > > > > >>>>>>> stream-stream join which is the only stateful
> > operator
> > > > >> that
> > > > >> > can
> > > > >> > > > > >>>> result in
> > > > >> > > > > >>>>>> a
> > > > >> > > > > >>>>>>> stream, consider merging the join into multi-way
> > joins
> > > > >> (yes,
> > > > >> > > > > >> this is
> > > > >> > > > > >>>> a
> > > > >> > > > > >>>>>> very
> > > > >> > > > > >>>>>>> hand-wavy thought, but the point here is that we
> do
> > > not
> > > > >> try
> > > > >> > to
> > > > >> > > > > >>>> tackle it
> > > > >> > > > > >>>>>>> now but leave it for a better solution :).
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>> I really like this idea!  I agree with you in that
> > this
> > > > >> > approach
> > > > >> > > > > >> is too
> > > > >> > > > > >>>>>> much for adding in this KIP, but we could pick it
> up
> > > > later
> > > > >> and
> > > > >> > > > > >>>> leverage the
> > > > >> > > > > >>>>>> Optimization framework to accomplish this re-use.
> > > > >> > > > > >>>>>> Again, while I agree we should break the naming of
> > join
> > > > >> state
> > > > >> > > > > >> stores
> > > > >> > > > > >>>> from
> > > > >> > > > > >>>>>> KIP-307, IMHO it's something we should fix now as
> it
> > > will
> > > > >> be
> > > > >> > the
> > > > >> > > > > >> last
> > > > >> > > > > >>>> piece
> > > > >> > > > > >>>>>> we can provide to give users the ability to
> > completely
> > > > make
> > > > >> > > their
> > > > >> > > > > >>>>>> topologies "upgrade proof" when adding additional
> > > > >> operations.
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>> Thanks again to both of you for comments and I look
> > > > >> forward to
> > > > >> > > > > >> hearing
> > > > >> > > > > >>>> back
> > > > >> > > > > >>>>>> from you.
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>> Regards,
> > > > >> > > > > >>>>>> Bill
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>> On Thu, Jun 20, 2019 at 2:33 PM Guozhang Wang <
> > > > >> > > wangg...@gmail.com>
> > > > >> > > > > >>>> wrote:
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>>> Hello Bill,
> > > > >> > > > > >>>>>>>
> > > > >> > > > > >>>>>>> Thanks for the KIP. Glad to see that we can likely
> > > > >> shoot
> > > > >> > two
> > > > >> > > > > >> birds
> > > > >> > > > > >>>>>> with
> > > > >> > > > > >>>>>>> one stone. I have some concerns though about those
> > > "two
> > > > >> > birds"
> > > > >> > > > > >>>>>> themselves:
> > > > >> > > > > >>>>>>>
> > > > >> > > > > >>>>>>> 1. About not breaking compatibility of
> stream-stream
> > > > join
> > > > >> > > > > >>>> materialized
> > > > >> > > > > >>>>>>> stores: I think this is a valid issue to tackle,
> but
> > > > after
> > > > >> > > > > >> thinking
> > > > >> > > > > >>>> about
> > > > >> > > > > >>>>>>> it once more I'm not sure if exposing Materialized
> > > would
> > > > >> be a
> > > > >> > > > > >> good
> > > > >> > > > > >>>>>> solution
> > > > >> > > > > >>>>>>> here. My rationales:
> > > > >> > > > > >>>>>>>
> > > > >> > > > > >>>>>>> 1.a For stream-stream join, our current usage of
> > > > >> window-store
> > > > >> > > is
> > > > >> > > > > >> not
> > > > >> > > > > >>>>>> ideal,
> > > > >> > > > > >>>>>>> and we want to modify it in the near future to be
> > more
> > > > >> > > > > >> efficient. Not
> > > > >> > > > > >>>>>>> allowing users to override such state store
> backend
> > > > gives
> > > > >> us
> > > > >> > > such
> > > > >> > > > > >>>> freedom
> > > > >> > > > > >>>>>>> (which was also considered in the original DSL
> > > design),
> > > > >> > whereas
> > > > >> > > > > >>>> getting a
> > > > >> > > > > >>>>>>> Materialized<WindowStore> basically kicks out that
> > > > freedom
> > > > >> > out
> > > > >> > > > > >> of the
> > > > >> > > > > >>>>>>> window.
> > > > >> > > > > >>>>>>> 1.b For stream-stream join, in our original design
> > we
> > > > >> intend
> > > > >> > to
> > > > >> > > > > >>>> "never"
> > > > >> > > > > >>>>>>> want users to query the state, since it is just
> for
> > > > >> buffering
> > > > >> > > the
> > > > >> > > > > >>>>>> upcoming
> > > > >> > > > > >>>>>>> records from the stream. Now I know that some
> users
> > > may
> > > > >> > indeed
> > > > >> > > > > >> want
> > > > >> > > > > >>>> to
> > > > >> > > > > >>>>>>> query it from the debugging perspective, but
> still I am
> > > > >> > concerned
> > > > >> > > > > >> about
> > > > >> > > > > >>>>>>> whether leveraging IQ for debugging purposes would
> > be
> > > > the
> > > > >> > right
> > > > >> > > > > >>>> solution
> > > > >> > > > > >>>>>>> here. And adding Materialized object opens the
> door
> > to
> > > > let
> > > > >> > > users
> > > > >> > > > > >>>> query
> > > > >> > > > > >>>>>>> about it (unless we did something intentionally to
> > > still
> > > > >> > > forbid
> > > > >> > > > > >> it),
> > > > >> > > > > >>>>>> which
> > > > >> > > > > >>>>>>> also restricts us in the future.
> > > > >> > > > > >>>>>>>
> > > > >> > > > > >>>>>>> 2. About the coupling between Materialized.name()
> > and
> > > > >> > > queryable:
> > > > >> > > > > >>>> again I
> > > > >> > > > > >>>>>>> think this is a valid issue. But I'm not sure if
> the
> > > > >> current
> > > > >> > > > > >>>>>>> "withQueryingDisabled / Enabled" at Materialized
> is
> > > the
> > > > >> best
> > > > >> > > > > >>>> approach.
> > > > >> > > > > >>>>>>> Here I think I agree with John, that generally
> > > speaking
> > > > >> it's
> > > > >> > > > > >> better
> > > > >> > > > > >>>> be a
> > > > >> > > > > >>>>>>> control function on the `KTable` itself, rather
> than
> > > on
> > > > >> > > > > >>>> `Materialized`,
> > > > >> > > > > >>>>>> so
> > > > >> > > > > >>>>>>> fixing it via adding functions through
> > `Materialized`
> > > > >> seems
> > > > >> > > not a
> > > > >> > > > > >>>> natural
> > > > >> > > > > >>>>>>> approach either.
> > > > >> > > > > >>>>>>>
> > > > >> > > > > >>>>>>>
> > > > >> > > > > >>>>>>> Overall, I'm thinking maybe we should still use
> two
> > > > stones
> > > > >> > > rather
> > > > >> > > > > >>>> than
> > > > >> > > > > >>>>>> one
> > > > >> > > > > >>>>>>> to kill these two birds, and probably for this KIP
> > we
> > > > just
> > > > >> > > focus
> > > > >> > > > > >> on
> > > > >> > > > > >>>> 1)
> > > > >> > > > > >>>>>>> above. And for that I'd like to not expose the
> > > > >> Materialized
> > > > >> > > > > >> either
> > > > >> > > > > >>>> for
> > > > >> > > > > >>>>>>> rationales that I've listed above. Instead, we
> just
> > > > >> restrict
> > > > >> > > > > >> KIP-307
> > > > >> > > > > >>>> to
> > > > >> > > > > >>>>>> NOT
> > > > >> > > > > >>>>>>> use the Joined.name for state store names and
> always
> > > use
> > > > >> > > internal
> > > > >> > > > > >>>> names
> > > > >> > > > > >>>>>> as
> > > > >> > > > > >>>>>>> well, which admittedly indeed leaves a hole of not
> > > being
> > > > >> able
> > > > >> > > to
> > > > >> > > > > >>>> cover
> > > > >> > > > > >>>>>> all
> > > > >> > > > > >>>>>>> internal names here, but now I feel this `hole`
> may
> > > > >> better be
> > > > >> > > > > >> filled
> > > > >> > > > > >>>> by,
> > > > >> > > > > >>>>>>> e.g. not creating changelog topics but just use
> the
> > > > >> upstream
> > > > >> > to
> > > > >> > > > > >>>>>>> re-bootstrap the materialized store, more
> > concretely:
> > > > when
> > > > >> > > > > >>>> materializing
> > > > >> > > > > >>>>>>> the store, try to piggy-back the changelog topic
> on
> > an
> > > > >> > existing
> > > > >> > > > > >>>> topic,
> > > > >> > > > > >>>>>> e.g.
> > > > >> > > > > >>>>>>> a) if the stream is coming directly from some
> source
> > > > topic
> > > > >> > > > > >> (including
> > > > >> > > > > >>>>>>> repartition topic), make that as changelog topic
> and
> > > if
> > > > >> it is
> > > > >> > > > > >>>> repartition
> > > > >> > > > > >>>>>>> topic change the retention / data purging policy
> > > > >> necessarily
> > > > >> > as
> > > > >> > > > > >>>> well; b)
> > > > >> > > > > >>>>>> if
> > > > >> > > > > >>>>>>> the stream is coming from some stateless
> operators,
> > > > >> delegate
> > > > >> > > that
> > > > >> > > > > >>>>>> stateless
> > > > >> > > > > >>>>>>> operator to the parent stream similar as a); if
> the
> > > > >> stream is
> > > > >> > > > > >> coming
> > > > >> > > > > >>>>>> from a
> > > > >> > > > > >>>>>>> stream-stream join which is the only stateful
> > operator
> > > > >> that
> > > > >> > can
> > > > >> > > > > >>>> result
> > > > >> > > > > >>>>>> in a
> > > > >> > > > > >>>>>>> stream, consider merging the join into multi-way
> > joins
> > > > >> (yes,
> > > > >> > > > > >> this is
> > > > >> > > > > >>>> a
> > > > >> > > > > >>>>>> very
> > > > >> > > > > >>>>>>> hand-wavy thought, but the point here is that we
> do
> > > not
> > > > >> try
> > > > >> > to
> > > > >> > > > > >>>> tackle it
> > > > >> > > > > >>>>>>> now but leave it for a better solution :).
> > > > >> > > > > >>>>>>>
> > > > >> > > > > >>>>>>>
> > > > >> > > > > >>>>>>> Guozhang
> > > > >> > > > > >>>>>>>
> > > > >> > > > > >>>>>>>
> > > > >> > > > > >>>>>>>
> > > > >> > > > > >>>>>>> On Wed, Jun 19, 2019 at 11:41 AM John Roesler <
> > > > >> > > j...@confluent.io
> > > > >> > > > > >>>
> > > > >> > > > > >>>> wrote:
> > > > >> > > > > >>>>>>>
> > > > >> > > > > >>>>>>>> Hi Bill,
> > > > >> > > > > >>>>>>>>
> > > > >> > > > > >>>>>>>> Thanks for the KIP! Awesome job catching this
> > > > unexpected
> > > > >> > > > > >>>> consequence
> > > > >> > > > > >>>>>>>> of the prior KIPs before it was released.
> > > > >> > > > > >>>>>>>>
> > > > >> > > > > >>>>>>>> The proposal looks good to me. On top of just
> > fixing
> > > > the
> > > > >> > > > > >> problem,
> > > > >> > > > > >>>> it
> > > > >> > > > > >>>>>>>> seems to address two other pain points:
> > > > >> > > > > >>>>>>>> * that naming a state store automatically causes
> it
> > > to
> > > > >> > become
> > > > >> > > > > >>>>>> queriable.
> > > > >> > > > > >>>>>>>> * that there's currently no way to configure the
> > > bytes
> > > > >> store
> > > > >> > > > > >> for
> > > > >> > > > > >>>> join
> > > > >> > > > > >>>>>>>> windows.
> > > > >> > > > > >>>>>>>>
> > > > >> > > > > >>>>>>>> It's awesome that we can fix this issue and two
> > > others
> > > > >> with
> > > > >> > > one
> > > > >> > > > > >>>>>> feature.
> > > > >> > > > > >>>>>>>>
> > > > >> > > > > >>>>>>>> I'm wondering about a missing quadrant from the
> > truth
> > > > >> table
> > > > >> > > > > >>>> involving
> > > > >> > > > > >>>>>>>> whether a Materialized is stored or not and
> > querying
> > > is
> > > > >> > > > > >>>>>>>> enabled/disabled... What should be the behavior
> if
> > > > there
> > > > >> is
> > > > >> > no
> > > > >> > > > > >>>> store
> > > > >> > > > > >>>>>>>> configured (e.g., if Materialized with only
> serdes)
> > > and
> > > > >> > > > > >> querying is
> > > > >> > > > > >>>>>>>> enabled?
> > > > >> > > > > >>>>>>>>
> > > > >> > > > > >>>>>>>> It seems we have two choices:
> > > > >> > > > > >>>>>>>> 1. we can force creation of a state store in this
> > > case,
> > > > >> so
> > > > >> > the
> > > > >> > > > > >>>> store
> > > > >> > > > > >>>>>>>> can be used to serve the queries
> > > > >> > > > > >>>>>>>> 2. we can provide just a queriable view,
> basically
> > > > >> letting
> > > > >> > IQ
> > > > >> > > > > >> query
> > > > >> > > > > >>>>>>>> into the "KTableValueGetter", which would
> > > transparently
> > > > >> > > > > >> construct
> > > > >> > > > > >>>> the
> > > > >> > > > > >>>>>>>> query response by applying the operator logic to
> > the
> > > > >> > upstream
> > > > >> > > > > >>>> state if
> > > > >> > > > > >>>>>>>> the operator state isn't already stored.
> > > > >> > > > > >>>>>>>>
> > > > >> > > > > >>>>>>>> Offhand, it seems like the second is actually a
> > > pretty
> > > > >> > awesome
> > > > >> > > > > >>>>>>>> capability. But it might have an awkward
> > interaction
> > > > with
> > > > >> > the
> > > > >> > > > > >>>> current
> > > > >> > > > > >>>>>>>> semantics. Presently, if I provide a
> > > > >> Materialized.withName,
> > > > >> > it
> > > > >> > > > > >>>> implies
> > > > >> > > > > >>>>>>>> that querying should be enabled AND that the view
> > > > should
> > > > >> > > > > >> actually
> > > > >> > > > > >>>> be
> > > > >> > > > > >>>>>>>> stored in a state store. Under option 2 above,
> this
> > > > >> behavior
> > > > >> > > > > >> would
> > > > >> > > > > >>>>>>>> change to NOT provision a state store and instead
> > > just
> > > > >> > consult
> > > > >> > > > > >> the
> > > > >> > > > > >>>>>>>> ValueGetter. To get back to the current behavior,
> > > users
> > > > >> > would
> > > > >> > > > > >> have
> > > > >> > > > > >>>> to
> > > > >> > > > > >>>>>>>> add a "bytes store supplier" to the Materialized
> to
> > > > >> indicate
> > > > >> > > > > >> that,
> > > > >> > > > > >>>>>>>> yes, they really want a state store there.
> > > > >> > > > > >>>>>>>>
> > > > >> > > > > >>>>>>>> Behavior changes are always kind of scary, but I
> > > think
> > > > in
> > > > >> > this
> > > > >> > > > > >>>> case,
> > > > >> > > > > >>>>>>>> it might actually be preferable. In the event
> where
> > > > only
> > > > >> the
> > > > >> > > > > >> name
> > > > >> > > > > >>>> is
> > > > >> > > > > >>>>>>>> provided, it means that people just wanted to
> make
> > > the
> > > > >> > > > > >> operation
> > > > >> > > > > >>>>>>>> result queriable. If we automatically convert
> this
> > > to a
> > > > >> > > > > >> non-stored
> > > > >> > > > > >>>>>>>> view, then simply upgrading results in the same
> > > > >> observable
> > > > >> > > > > >> behavior
> > > > >> > > > > >>>>>>>> and semantics, but a linear reduction in local
> > > storage
> > > > >> > > > > >> requirements
> > > > >> > > > > >>>>>>>> and disk i/o, as well as a corresponding linear
> > > > >> reduction in
> > > > >> > > > > >> memory
> > > > >> > > > > >>>>>>>> usage both on and off heap.
> > > > >> > > > > >>>>>>>>
> > > > >> > > > > >>>>>>>> What do you think?
> > > > >> > > > > >>>>>>>> -John
> > > > >> > > > > >>>>>>>>
> > > > >> > > > > >>>>>>>> On Tue, Jun 18, 2019 at 9:21 PM Bill Bejeck <
> > > > >> > > bbej...@gmail.com
> > > > >> > > > > >>>
> > > > >> > > > > >>>> wrote:
> > > > >> > > > > >>>>>>>>>
> > > > >> > > > > >>>>>>>>> All,
> > > > >> > > > > >>>>>>>>>
> > > > >> > > > > >>>>>>>>> I'd like to start a discussion for adding a
> > > > Materialized
> > > > >> > > > > >>>>>> configuration
> > > > >> > > > > >>>>>>>>> object to KStream.join for naming state stores
> > > > involved
> > > > >> in
> > > > >> > > > > >> joins.
> > > > >> > > > > >>>>>>>>>
> > > > >> > > > > >>>>>>>>>
> > > > >> > > > > >>>>>>>>
> > > > >> > > > > >>>>>>>
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>
> > > > >> > > > > >>
> > > > >> > > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+Materialized+to+Join
> > > > >> > > > > >>>>>>>>>
> > > > >> > > > > >>>>>>>>> Your comments and suggestions are welcome.
> > > > >> > > > > >>>>>>>>>
> > > > >> > > > > >>>>>>>>> Thanks,
> > > > >> > > > > >>>>>>>>> Bill
> > > > >> > > > > >>>>>>>>
> > > > >> > > > > >>>>>>>
> > > > >> > > > > >>>>>>>
> > > > >> > > > > >>>>>>> --
> > > > >> > > > > >>>>>>> -- Guozhang
> > > > >> > > > > >>>>>>>
> > > > >> > > > > >>>>>>
> > > > >> > > > > >>>>>
> > > > >> > > > > >>>>>
> > > > >> > > > > >>>>> --
> > > > >> > > > > >>>>> -- Guozhang
> > > > >> > > > > >>>>
> > > > >> > > > > >>>
> > > > >> > > > > >>>
> > > > >> > > > > >>> --
> > > > >> > > > > >>> -- Guozhang
> > > > >> > > > > >>
> > > > >> > > > > >
> > > > >> > > > > >
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >>
> > > > >> --
> > > > >> -- Guozhang
> > > > >>
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
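
For anyone skimming the quoted thread, the two choices John lists for the "no store configured, querying enabled" quadrant can be sketched in plain Java. This is only an illustration under stated assumptions: `ValueGetter`, `StoredGetter`, and `ViewGetter` are hypothetical names invented here, loosely inspired by the "KTableValueGetter" mentioned above, and none of this is Kafka Streams code or the KIP's design.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch only: every type below is invented for illustration
// and is not part of the Kafka Streams API.
public class QueryableViewSketch {

    /** Read-only lookup that a query layer could call. */
    interface ValueGetter<K, V> {
        V get(K key);
    }

    /** Choice 1: the operator result is kept in its own store. */
    static final class StoredGetter<K, V> implements ValueGetter<K, V> {
        private final Map<K, V> store = new HashMap<>();

        void put(K key, V value) {
            store.put(key, value);
        }

        @Override
        public V get(K key) {
            return store.get(key);
        }
    }

    /** Choice 2: no store; apply the operator logic to upstream state per query. */
    static final class ViewGetter<K, V, R> implements ValueGetter<K, R> {
        private final ValueGetter<K, V> upstream;
        private final Function<V, R> operator;

        ViewGetter(ValueGetter<K, V> upstream, Function<V, R> operator) {
            this.upstream = upstream;
            this.operator = operator;
        }

        @Override
        public R get(K key) {
            V value = upstream.get(key);
            // Nothing upstream means nothing to derive.
            return value == null ? null : operator.apply(value);
        }
    }

    public static void main(String[] args) {
        StoredGetter<String, String> upstream = new StoredGetter<>();
        upstream.put("user-1", "hello");

        // A mapValues-like operator answered as a view, without a second store.
        ValueGetter<String, Integer> lengths =
            new ViewGetter<String, String, Integer>(upstream, String::length);

        System.out.println(lengths.get("user-1"));
        System.out.println(lengths.get("missing"));
    }
}
```

The view trades extra computation on each query for not keeping a second copy of the data, which is the storage saving John describes.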
