Hi,

@Paul: Thank you for the KIP!

I hope you do not mind that I jump in.

I have the following comments:

1) `null` vs empty list in the default implementation
IIUC, returning `null` in the default implementation should basically
signal that the method `stateStores` was not overridden. Why then provide a
default implementation in the first place? Without default implementation
you would discover the missing implementation already at compile-time and
not only at runtime. If you decide not to provide a default implementation,
`XSupplier extends StateStoreConnector` would break existing code as
Matthias has already pointed out.

2) `process` method adding the StoreBuilders to the topology
If the default implementation returned `null` and `XSupplier extends
StateStoreConnector`, then existing code would break, because
`StreamsBuilder#addStateStore()` would throw a NPE.

+1 for opening a WIP PR

Best,
Bruno


On Sun, Apr 28, 2019 at 10:57 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> Thank Paul!
>
> I agree with all of that. If we think that the general design is good,
> refactoring a PR if we want to pick a different name should not be too
> much additional work (hopefully). Thus, if you want to open a WIP PR and
> we use it to nail the open details, it might help to find a good
> conclusion.
>
> >> 2) Default method vs new interface:
>
> This seems to be the hardest tradeoff. I see the point about
> discoveability... Might be good to get input from others, which version
> they would prefer.
>
> Just to make clear, my suggestion from the last email was, that
> `Transformer` etc does not extend the new interface. Instead, a user
> that want to use this feature would need to implement both interfaces.
>
> If `Transformer extends StoreProvider` (just picking a name here)
> without default implementation existing code would break and thus it not
> a an option because of breaking backward compatibility.
>
>
> -Matthias
>
> On 4/28/19 8:37 PM, Paul Whalen wrote:
> > Great thoughts Matthias, thanks! I think we're all agreed that naming and
> > documentation/education are the biggest hurdles for this KIP, and in
> light
> > of that, I think it makes sense for me to just take a stab at a full
> > fledged PR with documentation to convince us that it's possible to do it
> > with enough clarity.
> >
> > In response to your specific thoughts:
> >
> > 1) StateStoreConnector as a name: Really good point about defining the
> > difference between "adding" and "connecting."  Guozhang suggested
> > StateStoreConnector which was definitely an improvement over my
> > StateStoresSupplier, but I think you're right that we need to be careful
> to
> > make it clear that it's really accomplishing both.  Thinking about it
> now,
> > one problem with Connector is that the implementer of the interface is
> not
> > really doing any connecting, it's providing/supplying the store that will
> > be both added and connected.  StoreProvider seems reasonable to me and
> > probably the best candidate at the moment, but it would be nice if the
> name
> > could convey that it's providing the store specifically so the caller can
> > add it to the topology and connect it to the associated transformer.
> >
> > In general I think that really calling out what "adding" versus
> > "connecting" is in the documentation will help make the entire purpose of
> > this feature more clear to the user.
> >
> > 2) Default method vs new interface: The choice of a default method was
> > influenced by Guozhang's fear about API bloat/discoverability.  I can
> > definitely see it both ways   Would the separate interface be a
> > sub-interface of Processor/TransformerSupplier or standalone?  It seems
> > like you're suggesting standalone and I think that's what I favor.  My
> only
> > concern there is that the interface wouldn't actually be a type to any
> > public API which sort of hurts discoverability.  You would have to read
> the
> > javadocs for stream.process/transform() to discover that implementing the
> > interface in addition to Processor/TransformerSupplier would add and
> > connect the store for you.  But that added burden actually probably helps
> > us in terms of making sure people don't mix and match, like you said.
> >
> > 3) Returning null instead of empty: Seems fair to me.  I always worry
> about
> > returning null when an empty collection can be used instead, but given
> that
> > the library is the caller rather than the client I think your point makes
> > sense here.
> >
> > 4) Returning Set instead of Collection: Agreed, don't see why not to make
> > it more specific.
> >
> > Paul
> >
> > On Fri, Apr 26, 2019 at 2:30 AM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Hi, sorry for the long pause. Just trying to catch up here.
> >>
> >> I think it save to allow `addStateStore()` to be idempotent for the same
> >> `StoreBuilder` object. In fact, the `name` is "hard coded" and thus it's
> >> not really possible to use the same `StoreBuilder` object to create
> >> different stores.
> >>
> >> I also agree with the concern, that only allowing a single store (as
> >> proposed by Ivan) might be too restrictive.
> >>
> >> Overall, the current KIP version LGTM. I don't have mayor concerns about
> >> user education for this case, but I agree that we need to document this
> >> clearly.
> >>
> >> Some further comments:
> >>
> >> (1) I am not sure if `StateStoreConnector` is the best name for the new
> >> interface. Note, that there are two concepts about stores:
> >>
> >>  - adding a store: this makes the store available in the topology in
> >> general (however, the store is still "dangling", and not used)
> >>  - connecting a store: this allows a processor etc to use a store
> >>
> >> The new interface does both, but its name only indicates that second
> >> part what might be confusing. It might be especially confusing because
> >> we want to disallow to mix the exiting "manually add and connect"
> >> pattern, with a new pattern to "auto add+connect". If the new interface
> >> name indicates the connect part only, user might think they need to add
> >> stores manually and can connect automatically.
> >>
> >> Unfortunately, I don't have a much better suggestion for a name either.
> >> The only idea that came to my mind was `StoreProvider`: to me, a
> >> provider is a "service" interface that does work for us, ie, it adds and
> >> connects a store. Not sure if this is too subtle, if we consider that
> >> there is already the `StoreSupplier` interface?
> >>
> >> But maybe somebody else might still have a good idea on how the improve
> >> the name.
> >>
> >> In any case, I would suggest to shorten the name to `StoreConnector`
> >> instead of `StateStoreConnector`, because we also have `StoreSupplier`
> >> and `StoreBuilder`.
> >>
> >>
> >>
> >> (2) The KIP proposes to add the new interface to `ProcessorSupplier` etc
> >> and to add a default implementation for the new method. Hence, user
> >> would need to overwrite this default implementation to op-in to the
> >> feature. I am wonder if it might be better to not add the new interface
> >> to `ProcessorSupplier` etc and to just provide a new interface with no
> >> default implementation. Users would opt-in by adding the interface
> >> explicitly to their existing `ProcessorSupplier` implementation.
> >> Overwriting a default method and getting different behavior seems to be
> >> a little subtle to me, especially, because we don't want to allow to
> >> mix-and-match the old and new approaches. Think: I only overwrite a
> >> default method and my code breaks.
> >>
> >> Thoughts?
> >>
> >>
> >>
> >> (3) If we keep the current default implementation for the new method, I
> >> am wondering if it should return `null` instead of an empty collection?
> >> This might be saver to detect bugs in user code for which, per accident,
> >> an empty collection could be returned.
> >>
> >>
> >>
> >> (4) Should the new method return a `Set` instead of a `Collection` to
> >> indicate the semantics clearly (ie, returning the same `StoreBuilder`
> >> multiple times is idempotent and one cannot add+connect to it twice).
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >>
> >> On 4/6/19 12:27 PM, Paul Whalen wrote:
> >>> Ivan and Guozhang,
> >>>
> >>> Thanks for the thoughts!  Ivan's use case is definitely interesting.
> The
> >>> way I see it, if we can achieve the main goal of the KIP (allowing
> >>> Processor/TransformerSuppliers to encapsulate their usage of state
> >> stores),
> >>> we will enable this kind of thing in "user space" very easily.
> >>>
> >>> I will say that I'm not totally sure that most use cases of transform()
> >> use
> >>> just one state store.  It's hard to know since I haven't seen many
> >> examples
> >>> in public, but my team's usages almost exclusively require multiple
> state
> >>> stores.  We only reach for the low level processor API when we need
> that
> >>> complexity, and it's somewhat hard to imagine many use cases that only
> >> need
> >>> one state store, since the high level DSL can usually accomplish those
> >>> tasks.  The example Ivan presented for instance looks like a
> >>> stream.groupByKey().reduce(...) to me.  Ivan, I'd be curious what sort
> of
> >>> other usages you're imagining.
> >>>
> >>> That being said, perhaps the Processor API should really just be
> >> considered
> >>> a separate paradigm in Streams, not just a lower level that we reach to
> >>> when necessary.  In which case it would be beneficial to make the
> simple
> >>> use cases easier.  I've definitely talked about this with my own team -
> >> if
> >>> you're less familiar with the kind of functional style that the high
> >> level
> >>> DSL offers, it might be easier to "see" your state and interact with it
> >>> directly.
> >>>
> >>> Anyway, I've updated the KIP to reflect my current PR with Guozhang's
> >>> suggestions.  It seems like there is at least some interest in that on
> >> its
> >>> own and not a ton of pushback, so I think I will try to start a vote.
> >>>
> >>> Paul
> >>>
> >>> On Sat, Mar 30, 2019 at 10:03 AM Ivan Ponomarev <iponoma...@mail.ru>
> >> wrote:
> >>>
> >>>> Hi all!
> >>>>
> >>>> I was about to write another KIP, but found out that KIP-401 addresses
> >>>> exactly the problem I faced. So let me jump into your discussion and
> ask
> >>>> you to assess another idea.
> >>>>
> >>>> I fully agree with the KIP-401's motivation part. E. g in my project I
> >> had
> >>>> to invent a wrapper class that hides the details of KeyValueStore
> >>>> management from business logic. Of course this should be done better
> in
> >>>> KStreams API.
> >>>>
> >>>> But I was about to look at this problem from another side and propose
> a
> >>>> simple alternative in high-level DSL, that will not fit all the cases,
> >> but
> >>>> most of them. Hence my idea does not exclude the Paul's proposal.
> >>>>
> >>>> What if we restrict ourselves to *only one* KeyValueStore and propose
> a
> >>>> method that resembles  `aggregate` and `reduce` methods, like this:
> >>>>
> >>>> stream
> >>>>    .map(...)
> >>>>    .filter(...)
> >>>>    .transform ((k, v, s)->{....}, Transformed.with(....))
> >>>>
> >>>> where
> >>>> * k, v -- input key & value
> >>>> * s -- a KeyValueStore provided as an argument
> >>>> * return value of the lambda should be KeyValue.pair(...)
> >>>> * Transformed.with... is a builder, used in order to define the
> >>>> Transformer and KeyValueStore building parameters. Some of these
> >> parameters
> >>>> should be:
> >>>> ** store's KeySerde,
> >>>> ** store's ValueSerde,
> >>>> ** whether the store is persistent or in-memory,
> >>>> ** store's name -- optional parameter, the system should be able to
> >> devise
> >>>> the name of the store transparently for the user, if we don't want to
> >>>> devise it ourselves/share the store between processors.
> >>>> ** scheduled punctuation.
> >>>>
> >>>> Imagine we have a KStream<String, Integer>, and we need to calculate a
> >>>> `derivative` stream, that is, a stream of 'deltas' of the provided
> >> integer
> >>>> values.
> >>>>
> >>>> This could be achieved as simple as
> >>>>
> >>>> stream.transform((key, value, stateStore) -> {
> >>>>         int previousValue =
> >>>> Optional.ofNullable(stateStore.get(key)).orElse(0);
> >>>>         stateStore.put(key, value);
> >>>>         return KeyValue.pair(key, value - previousValue);
> >>>>         }
> >>>>         //we do not need to bother with store name, punctuation etc.
> >>>>         //may be even Serde part can be omitted, since we can inherit
> >> the
> >>>> serdes from stream by default
> >>>>         , Transformed.with(Serdes.String(), Serdes.Integer())
> >>>> }
> >>>>
> >>>> The hard part of it is that new `transform` method definition should
> be
> >>>> parameterized by six type parameters:
> >>>>
> >>>> * input/output/KeyValueStore key type,
> >>>> * input/output/KeyValueStore value type.
> >>>>
> >>>> However, it seems that all these types can be inferred from the
> provided
> >>>> lambda and Transformed.with instances.
> >>>>
> >>>> What do you think about this?
> >>>>
> >>>> Regards,
> >>>>
> >>>> Ivan
> >>>>
> >>>>
> >>>> 27.03.2019 20:45, Guozhang Wang пишет:
> >>>>
> >>>> Hello Paul,
> >>>>
> >>>> Thanks for the uploaded PR and the detailed description! I've made a
> >> pass
> >>>> on it and left some comments.
> >>>>
> >>>> Overall I think I agree with you that passing in the storebuilder
> >> directly
> >>>> that store name is more convienent as it does not require another
> >>>> `addStore` call, but we just need to spend some more documentation
> >> effort
> >>>> on educating users about the two ways of connecting their stores. I'm
> >>>> slightly concerned about this education curve but I can be convinced
> if
> >>>> most people felt it is worthy.
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>> On Sat, Mar 23, 2019 at 5:15 PM Paul Whalen <pgwha...@gmail.com> <
> >> pgwha...@gmail.com> wrote:
> >>>>
> >>>>
> >>>> I'd like to resurrect this discussion with a cursory, proof-of-concept
> >>>> implementation of the KIP which combines many of our ideas:
> >> https://github.com/apache/kafka/pull/6496.  I tried to keep the diff as
> >>>> small as possible for now, just using it to convey the main ideas.
> But
> >>>> I'll separately address some of our earlier discussion:
> >>>>
> >>>>    - Will there be a new, separate interface for users to implement
> for
> >> the
> >>>>    new functionality? No, to hopefully keep things simple, all of the
> >>>>    Processor/TransformerSupplier interfaces will just extend
> >>>>    StateStoresSupplier, allowing users to opt in to this functionality
> >> by
> >>>>    overriding the default implementation that gives an empty list.
> >>>>    - Will the interface allow users to specify the store name, or the
> >>>>    entire StoreBuilder? The entire StoreBuilder, so the
> >>>>    Processor/TransformerSupplier can completely encapsulate name and
> >>>>    implementation of a state store if desired.
> >>>>    - Will the old way of specifying store names alongside the supplier
> >> when
> >>>>    calling stream.process/transform() be deprecated? No, this is
> still a
> >>>>    legitimate way to wire up Processors/Transformers and their stores.
> >> But
> >>>> I
> >>>>    would recommend not allowing stream.process/transform() calls that
> >> use
> >>>> both
> >>>>    store declaration mechanisms (this restriction is not in the proof
> of
> >>>>    concept)
> >>>>    - How will we handle adding the same state store to the topology
> >>>>    multiple times because different Processor/TransformerSuppliers
> >> declare
> >>>> it?
> >>>>    topology.addStateStore() will be slightly relaxed for convenience,
> >> and
> >>>> will
> >>>>    allow adding the same StoreBuilder multiple times as long as the
> >> exact
> >>>> same
> >>>>    StoreBuilder instance is being added for the same store name.  This
> >>>> seems
> >>>>    to prevent in practice the issue of accidentally making two state
> >> stores
> >>>>    one by adding with the same name.  For additional safety, if we
> >> wanted
> >>>> to
> >>>>    (not in the proof of concept), we could allow for this relaxation
> >> only
> >>>> for
> >>>>    internal callers of topology.addStateStore().
> >>>>
> >>>> So, in summary, the use cases look like:
> >>>>
> >>>>    - 1 transformer/processor that owns its store: Using the new
> >>>>    StateStoresSupplier interface method to supply its StoreBuilders
> that
> >>>> will
> >>>>    be added to the topology automatically.
> >>>>    - Multiple transformer/processors that share the same store: Either
> >>>>
> >>>>
> >>>>    1. The old way: the StoreBuilder is defined "far away" from the
> >>>>    Transformer/Processor implementations, and is added to the topology
> >>>>    manually by the user
> >>>>    2. The new way: the StoreBuilder is defined closer to the
> >>>>    Transformer/Processor implementations, and the same instance is
> >>>> returned by
> >>>>    all Transformer/ProcessorSuppliers that need it
> >>>>
> >>>>
> >>>> This makes the KIP wiki a bit stale; I'll update if we want to bring
> >> this
> >>>> design to a vote.
> >>>>
> >>>> Thanks!
> >>>> Paul
> >>>>
> >>>> On Sun, Dec 16, 2018 at 6:04 PM Guozhang Wang <wangg...@gmail.com> <
> >> wangg...@gmail.com> wrote:
> >>>>
> >>>>
> >>>> Matthias / Paul,
> >>>>
> >>>> The concern I had about introducing `StoreBuilderSupplier` is simply
> >>>> because it is another XXSupplier to the public API, so I'd like to ask
> >> if
> >>>> we really have to add it :)
> >>>>
> >>>> The difference between encapsulating the store name and encapsulating
> >> the
> >>>> full state store builder is that, in the former:
> >>>>
> >>>> -----------
> >>>>
> >>>> String storeName = "store1";
> >>>> builder.addStore(new MyStoreBuilder(storeName));
> >>>> stream1.transform(new MyTransformerSupplier(storeName));   //
> following
> >>>>
> >>>> my
> >>>>
> >>>> proposal, that the store name can be passed in and used for both
> >>>> `listStores` and in the `Transformer#init`; so the Transformer
> function
> >>>> does not need to get the constant string name again.
> >>>>
> >>>>                          // one caveat to admit, is that
> >>>> MyTransofmerSupplier logic may be just unique to `store1` so it cannot
> >> be
> >>>> reused with a different store name anyways.
> >>>> -----------
> >>>>
> >>>> While in the latter:
> >>>>
> >>>> -----------
> >>>>
> >>>> stream1.transform(new MyTransformerSupplierForStore1);   // the name
> is
> >>>> just indicating that we may have one such supplier for each store.
> >>>>
> >>>> -----------
> >>>>
> >>>> I understand the latter introduce more convenience from the API, but
> the
> >>>> cost is that since we still cannot completely `builder.addStore`, but
> >>>>
> >>>> only
> >>>>
> >>>> reduce its semantic scope to shared state stores only,; hence users
> need
> >>>>
> >>>> to
> >>>>
> >>>> learn two ways of creating state stores for those two patterns.
> >>>>
> >>>> My argument is that more public APIs requires longer learning curve
> for
> >>>> users, and introduces more usage patterns that may confuse users (the
> >>>> proposal I had tries to replace one with another completely).
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>> On Sun, Dec 16, 2018 at 2:58 PM Paul Whalen <pgwha...@gmail.com> <
> >> pgwha...@gmail.com> wrote:
> >>>>
> >>>>
> >>>> Thanks for the great thoughts Matthias and Guozhang!
> >>>>
> >>>> If I'm not mistaken, Guozhang's suggestion is what my second
> >>>>
> >>>> alternative
> >>>>
> >>>> on
> >>>>
> >>>> the KIP is ("Have the added method on the Supplier interfaces only
> >>>>
> >>>> return
> >>>>
> >>>> store names, not builders").  I do think it would be a worthwhile
> >>>>
> >>>> usability
> >>>>
> >>>> improvement on its own, but to Matthias's point, it doesn't achieve
> the
> >>>> full goal of completing encapsulating a state store and it's processor
> >>>>
> >>>> -
> >>>>
> >>>> it
> >>>>
> >>>> encapsulates the name, but not the StateStoreBuilder.
> >>>>
> >>>> I'm really intrigued by Matthias's idea that forgoes the default
> >>>>
> >>>> interface
> >>>>
> >>>> method I proposed.  Having smaller, separate interfaces is a powerful
> >>>>
> >>>> idea
> >>>>
> >>>> and I think a cleaner API than what I proposed.  The non-shared store
> >>>>
> >>>> use
> >>>>
> >>>> case is handled well here, and the shared store use case is possible,
> >>>> though maybe still not as graceful as we would like (having to add the
> >>>> StoreBuilderSupplier before the StoreNameSupplier seems maybe too
> >>>>
> >>>> subtle
> >>>>
> >>>> to
> >>>>
> >>>> me).
> >>>>
> >>>> We're all agreed that one of the big problems with the shared store
> use
> >>>> case is how to deal with adding the same store to the topology
> multiple
> >>>> times.  Catching the "store already added" exception is risky.  Here's
> >>>>
> >>>> a
> >>>>
> >>>> maybe radical idea: change `topology.addStateStore()` to be idempotent
> >>>>
> >>>> for
> >>>>
> >>>> adding a given state store name and `StoreBuilder`.  In other words,
> >>>> `addStateStore` would not throw the "store already added" exception if
> >>>>
> >>>> the
> >>>>
> >>>> `StoreBuilder` being added for a given name has the same identity as
> >>>>
> >>>> the
> >>>>
> >>>> one that has already been added.  Does this eliminate all the bugs
> >>>>
> >>>> we're
> >>>>
> >>>> worried about?  Thinking about it for a few minutes, it seems to
> >>>>
> >>>> eliminate
> >>>>
> >>>> most at least (would a user really use the exact same StoreBuilder
> when
> >>>> they intend there to be two stores?).  It might make the API slightly
> >>>> harder to use if a user isn't immediately aware of that subtlety, but
> a
> >>>> good error message should ease the pain, and it would happen
> >>>>
> >>>> immediately
> >>>>
> >>>> during development.
> >>>>
> >>>> And with regards to Matthias's comment about whether we need to
> >>>>
> >>>> deprecate
> >>>>
> >>>> existing varargs transform methods - I don't think we need to, but it
> >>>>
> >>>> might
> >>>>
> >>>> be nice for there only to be one way to do things, assuming whatever
> we
> >>>> come up with supports all existing use cases.  I don't feel strongly
> >>>>
> >>>> about
> >>>>
> >>>> this, but if we don't deprecate, I do think it's important to add
> >>>>
> >>>> checks
> >>>>
> >>>> that prevent users from trying to do the same thing in two different
> >>>>
> >>>> ways,
> >>>>
> >>>> as we've discussed.
> >>>>
> >>>> Paul
> >>>>
> >>>> On Sun, Dec 16, 2018 at 5:36 AM Matthias J. Sax <
> matth...@confluent.io
> >>>>
> >>>> wrote:
> >>>>
> >>>>
> >>>> Guozhang,
> >>>>
> >>>>
> >>>> Regarding the last option to catch "store exist already" exception
> >>>>
> >>>> and
> >>>>
> >>>> fallback to connect stores, I'm a bit concerned it may be hiding
> >>>>
> >>>> actual
> >>>>
> >>>> user bugs.
> >>>>
> >>>> I agree with this concern. From my original email:
> >>>>
> >>>>
> >>>> The only disadvantage I see, might be
> >>>> potential bugs about sharing state if two different stores are
> >>>>
> >>>> named
> >>>>
> >>>> the
> >>>>
> >>>> same by mistake (this would not be detected).
> >>>>
> >>>> For your new proposal: I am not sure if it addresses Paul's original
> >>>> idea -- I hope Paul can clarify. From my understanding, the idea was
> >>>>
> >>>> to
> >>>>
> >>>> encapsulate a store and its processor. As many stores are not shared,
> >>>> this seems to be quite useful. Your proposal falls a little short to
> >>>> support encapsulation for none-shared stores.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>>
> >>>> On 12/15/18 1:40 AM, Guozhang Wang wrote:
> >>>>
> >>>> Matthias,
> >>>>
> >>>> Thanks for your feedbacks.
> >>>>
> >>>> Regarding the last option to catch "store exist already" exception
> >>>>
> >>>> and
> >>>>
> >>>> fallback to connect stores, I'm a bit concerned it may be hiding
> >>>>
> >>>> actual
> >>>>
> >>>> user bugs.
> >>>>
> >>>> Thinking about Paul's proposal and your suggestion again, I'd like
> >>>>
> >>>> to
> >>>>
> >>>> propose another alternative somewhere in the middle of your
> >>>>
> >>>> approaches,
> >>>>
> >>>> i.e. we still let users to create sharable state stores via
> >>>> `addStateStore`, and we allow the TransformerSupplier to return a
> >>>>
> >>>> list
> >>>>
> >>>> of
> >>>>
> >>>> state stores that it needs, i.e.:
> >>>>
> >>>> public interface TransformerSupplier<K, V, R> {
> >>>>     Transformer<K, V, R> get();
> >>>>     default List<String> stateStoreNames() {
> >>>>         return Collections.emptyList();
> >>>> <https://cwiki.apache.org/confluence/pages/Collections.emptyList()
> >>>>
> >>>> ;>
> >>>>
> >>>>     }
> >>>> }
> >>>>
> >>>> by doing this users can still "consolidate" the references of store
> >>>>
> >>>> names
> >>>>
> >>>> in a single place in the transform call, e.g.:
> >>>>
> >>>> public class MyTransformerSupplier<K, V, R> {
> >>>>     private String storeName;
> >>>>
> >>>>     public class MyTransformer<K, V, R> {
> >>>>
> >>>>        ....
> >>>>
> >>>>        init() {
> >>>>           store = context.getStateStore(storeName);
> >>>>        }
> >>>>     }
> >>>>
> >>>>     default List<String> stateStoreNames() {
> >>>>         return Collections.singletonList(storeName);
> >>>> <https://cwiki.apache.org/confluence/pages/Collections.emptyList()
> >>>>
> >>>> ;>
> >>>>
> >>>>     }
> >>>> }
> >>>>
> >>>> Basically, we move the parameters from the caller of `transform` to
> >>>>
> >>>> inside
> >>>>
> >>>> the TransformSuppliers. DSL implementations would not change much,
> >>>>
> >>>> simply
> >>>>
> >>>> calling `connectStateStore` by getting the list of names from the
> >>>>
> >>>> provided
> >>>>
> >>>> function.
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>> On Thu, Dec 13, 2018 at 7:27 AM Matthias J. Sax <
> >>>>
> >>>> matth...@confluent.io
> >>>>
> >>>> wrote:
> >>>>
> >>>>
> >>>> Just a meta comment: do we really need to deprecate existing
> >>>> `transform()` etc methods?
> >>>>
> >>>> The last argument is a vararg, and thus, just keeping the existing
> >>>>
> >>>> API
> >>>>
> >>>> for this part seems to work too, allowing to implement both
> >>>>
> >>>> patterns?
> >>>>
> >>>> Also, instead of adding a default method, we could also add a new
> >>>> interface `StoreBuilderSupplier` with method `List<StoreBuilder>
> >>>> stateStores()` -- users could implement `TransformerSupplier` and
> >>>> `StoreBuilderSupplier` at once; and for this case, we require that
> >>>>
> >>>> users
> >>>>
> >>>> don't provide store name in `transform()`.
> >>>>
> >>>> Similar, we could add an interface `StoreNameSupplier` with method
> >>>> `List<String> stateStores()`. This allows to "auto-wire" a
> >>>>
> >>>> transformer
> >>>>
> >>>> to existing stores (to avoid the issue to add the same store
> >>>>
> >>>> multiple
> >>>>
> >>>> times).
> >>>>
> >>>> Hence, for shared stores, there would be one "main" transformer
> >>>>
> >>>> that
> >>>>
> >>>> implements `StoreBuilderSupplier` and that must be added first to
> >>>>
> >>>> the
> >>>>
> >>>> topology. The other transformers would implement
> >>>>
> >>>> `StoreNameSupplier`
> >>>>
> >>>> and
> >>>>
> >>>> just connect to those stores.
> >>>>
> >>>> Another possibility to avoid the issue of adding the same stores
> >>>> multiple times would be, that the DSL always calls
> >>>>
> >>>> `addStateStore()`
> >>>>
> >>>> but
> >>>>
> >>>> catches a potential "store exists already" exception and falls
> >>>>
> >>>> back
> >>>>
> >>>> to
> >>>>
> >>>> `connectProcessorAndStateStore()` for this case. Thus, we would
> >>>>
> >>>> not
> >>>>
> >>>> need
> >>>>
> >>>> the `StoreNameSupplier` interface and the order in which
> >>>>
> >>>> transformers
> >>>>
> >>>> are added would not matter either. The only disadvantage I see,
> >>>>
> >>>> might
> >>>>
> >>>> be
> >>>>
> >>>> potential bugs about sharing state if two different stores are
> >>>>
> >>>> named
> >>>>
> >>>> the
> >>>>
> >>>> same by mistake (this would not be detected).
> >>>>
> >>>>
> >>>>
> >>>> Just some ideas I wanted to share. What do you think?
> >>>>
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 12/11/18 3:46 AM, Paul Whalen wrote:
> >>>>
> >>>> Ah yes of course, this was an oversight, I completely ignored the
> >>>>
> >>>> multiple
> >>>>
> >>>> processors sharing the same state store when writing up the KIP.
> >>>>
> >>>> Which
> >>>>
> >>>> is
> >>>>
> >>>> funny, because I've actually done this (different processors
> >>>>
> >>>> sharing
> >>>>
> >>>> state
> >>>>
> >>>> stores) a fair amount myself, and I've settled on a pattern
> >>>>
> >>>> where I
> >>>>
> >>>> group
> >>>>
> >>>> the Processors in an enclosing class, and that enclosing class
> >>>>
> >>>> handles
> >>>>
> >>>> as
> >>>>
> >>>> much as possible.  Here's a gist showing the rough structure,
> >>>>
> >>>> just
> >>>>
> >>>> for
> >>>>
> >>>> context:
> >>>>
> >>>> https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65
> >>>>
> >>>> . Note how it adds the stores to the topology, as well as
> >>>>
> >>>> providing a
> >>>>
> >>>> public method with the store names.
> >>>>
> >>>> I don't think my proposal completely conflicts with the multiple
> >>>>
> >>>> processors
> >>>>
> >>>> sharing state stores use case, since you can create a supplier
> >>>>
> >>>> that
> >>>>
> >>>> provides the store name you want, somewhat independently of your
> >>>>
> >>>> actual
> >>>>
> >>>> Processor logic.  The issue I do see though, is that
> >>>> topology.addStateStore() can only be called once for a given
> >>>>
> >>>> store.
> >>>>
> >>>> So
> >>>>
> >>>> for
> >>>>
> >>>> your example, if the there was a single TransformerSupplier that
> >>>>
> >>>> was
> >>>>
> >>>> passed
> >>>>
> >>>> into both transform() calls, "store1" would be added (under the
> >>>>
> >>>> hood)
> >>>>
> >>>> to
> >>>>
> >>>> the topology twice, which is no good.
> >>>>
> >>>> Perhaps this suggests that one of my alternatives on the KIP
> >>>>
> >>>> might
> >>>>
> >>>> be
> >>>>
> >>>> desirable: either not having the suppliers return StoreBuilders
> >>>>
> >>>> (just
> >>>>
> >>>> store
> >>>>
> >>>> names), or not deprecating the old methods that take "String...
> >>>> stateStoreNames". I'll have to think about it a bit.
> >>>>
> >>>> Paul
> >>>>
> >>>> On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang <
> >>>>
> >>>> wangg...@gmail.com>
> >>>>
> >>>> wrote:
> >>>>
> >>>> Hello Paul,
> >>>>
> >>>> Thanks for the great writeup (very detailed and crystal
> >>>>
> >>>> motivation
> >>>>
> >>>> sections!).
> >>>>
> >>>> This is quite an interesting idea and I do like the API
> >>>>
> >>>> cleanness
> >>>>
> >>>> you
> >>>>
> >>>> proposed. The original motivation of letting StreamsTopology to
> >>>>
> >>>> add
> >>>>
> >>>> state
> >>>>
> >>>> stores though, is to allow different processors to share the
> >>>>
> >>>> state
> >>>>
> >>>> store.
> >>>>
> >>>> For example:
> >>>>
> >>>> builder.addStore("store1");
> >>>>
> >>>> // a path of stream transformations that leads to KStream
> >>>>
> >>>> stream1.
> >>>>
> >>>> stream1.transform(..., "store1");
> >>>>
> >>>> // another path that generates a KStream stream2.
> >>>> stream2.transform(..., "store1");
> >>>>
> >>>> Behind the scene, Streams will make sure stream1 / stream2
> >>>>
> >>>> transformations
> >>>>
> >>>> will always be grouped together as a single group of tasks, each
> >>>>
> >>>> of
> >>>>
> >>>> which
> >>>>
> >>>> will be executed by a single thread and hence there's no
> >>>>
> >>>> concurrency
> >>>>
> >>>> issues
> >>>>
> >>>> on accessing the store from different operators within the same
> >>>>
> >>>> task.
> >>>>
> >>>> I'm
> >>>>
> >>>> not sure how common this use case is, but I'd like to hear if
> >>>>
> >>>> you
> >>>>
> >>>> have
> >>>>
> >>>> any
> >>>>
> >>>> thoughts maintaining this since the current proposal seems
> >>>>
> >>>> exclude
> >>>>
> >>>> this
> >>>>
> >>>> possibility.
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen <pgwha...@gmail.com> <
> >> pgwha...@gmail.com>
> >>>>
> >>>> wrote:
> >>>>
> >>>> Here's KIP-401 for discussion, a minor Kafka Streams API change
> >>>>
> >>>> that
> >>>>
> >>>> I
> >>>>
> >>>> think could greatly increase the usability of the low-level
> >>>>
> >>>> processor
> >>>>
> >>>> API.
> >>>>
> >>>> I have some code written but will wait to see if there is buy
> >>>>
> >>>> in
> >>>>
> >>>> before
> >>>>
> >>>> going all out and creating a pull request.  It seems like most
> >>>>
> >>>> of
> >>>>
> >>>> the
> >>>>
> >>>> work
> >>>>
> >>>> would be in updating documentation and tests.
> >>>>
> >>>>
> >>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
> >>>>
> >>>> Thanks!
> >>>> Paul
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

Reply via email to