Thanks Paul!

I agree with all of that. If we think that the general design is good,
refactoring a PR to pick a different name later should not be too much
additional work (hopefully). Thus, if you want to open a WIP PR that we
can use to nail down the open details, it might help us reach a good
conclusion.

>> 2) Default method vs new interface: 

This seems to be the hardest tradeoff. I see the point about
discoverability... It might be good to get input from others on which
version they would prefer.

Just to be clear, my suggestion from the last email was that
`Transformer` etc. does not extend the new interface. Instead, a user
who wants to use this feature would need to implement both interfaces.

If `Transformer extends StoreProvider` (just picking a name here)
without a default implementation, existing code would break, and thus it
is not an option because it breaks backward compatibility.
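
To make the standalone-interface variant concrete, here is a minimal
sketch. It is only an illustration under assumptions: all types are
simplified stand-ins so that it compiles without Kafka on the classpath,
and `StoreProvider`, `stores()`, and `storesToAddAndConnect()` are
placeholder names, not a proposal for the final API.

```java
import java.util.Collections;
import java.util.Set;

// Simplified stand-ins for the Kafka Streams types (assumption: the real
// interfaces are generic and have more methods).
interface StoreBuilder { String name(); }
interface Transformer { }
interface TransformerSupplier { Transformer get(); }

// Hypothetical standalone opt-in interface. TransformerSupplier does NOT
// extend it, so existing implementations keep compiling unchanged.
interface StoreProvider { Set<StoreBuilder> stores(); }

// Existing user code: implements only TransformerSupplier.
class LegacySupplier implements TransformerSupplier {
    @Override public Transformer get() { return new Transformer() { }; }
}

// New user code: opts in by implementing both interfaces.
class EncapsulatingSupplier implements TransformerSupplier, StoreProvider {
    private final StoreBuilder builder = () -> "counts";
    @Override public Transformer get() { return new Transformer() { }; }
    @Override public Set<StoreBuilder> stores() {
        return Collections.singleton(builder);
    }
}

class StoreProviderSketch {
    // What the library could do inside transform(): add and connect stores
    // only if the supplier opted in via the extra interface.
    static Set<StoreBuilder> storesToAddAndConnect(TransformerSupplier supplier) {
        if (supplier instanceof StoreProvider) {
            return ((StoreProvider) supplier).stores();
        }
        return Collections.emptySet();
    }

    public static void main(String[] args) {
        System.out.println(storesToAddAndConnect(new LegacySupplier()).size());
        System.out.println(storesToAddAndConnect(new EncapsulatingSupplier()).size());
    }
}
```

With this shape, the discoverability cost is that `StoreProvider` never
appears in a method signature; users only learn about it from the
javadocs of `transform()`/`process()`.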


-Matthias

On 4/28/19 8:37 PM, Paul Whalen wrote:
> Great thoughts Matthias, thanks! I think we're all agreed that naming and
> documentation/education are the biggest hurdles for this KIP, and in light
> of that, I think it makes sense for me to just take a stab at a full
> fledged PR with documentation to convince us that it's possible to do it
> with enough clarity.
> 
> In response to your specific thoughts:
> 
> 1) StateStoreConnector as a name: Really good point about defining the
> difference between "adding" and "connecting."  Guozhang suggested
> StateStoreConnector which was definitely an improvement over my
> StateStoresSupplier, but I think you're right that we need to be careful to
> make it clear that it's really accomplishing both.  Thinking about it now,
> one problem with Connector is that the implementer of the interface is not
> really doing any connecting, it's providing/supplying the store that will
> be both added and connected.  StoreProvider seems reasonable to me and
> probably the best candidate at the moment, but it would be nice if the name
> could convey that it's providing the store specifically so the caller can
> add it to the topology and connect it to the associated transformer.
> 
> In general I think that really calling out what "adding" versus
> "connecting" is in the documentation will help make the entire purpose of
> this feature more clear to the user.
> 
> 2) Default method vs new interface: The choice of a default method was
> influenced by Guozhang's fear about API bloat/discoverability.  I can
> definitely see it both ways.  Would the separate interface be a
> sub-interface of Processor/TransformerSupplier or standalone?  It seems
> like you're suggesting standalone and I think that's what I favor.  My only
> concern there is that the interface wouldn't actually be a type to any
> public API which sort of hurts discoverability.  You would have to read the
> javadocs for stream.process/transform() to discover that implementing the
> interface in addition to Processor/TransformerSupplier would add and
> connect the store for you.  But that added burden actually probably helps
> us in terms of making sure people don't mix and match, like you said.
> 
> 3) Returning null instead of empty: Seems fair to me.  I always worry about
> returning null when an empty collection can be used instead, but given that
> the library is the caller rather than the client I think your point makes
> sense here.
> 
> 4) Returning Set instead of Collection: Agreed, don't see why not to make
> it more specific.
> 
> Paul
> 
> On Fri, Apr 26, 2019 at 2:30 AM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Hi, sorry for the long pause. Just trying to catch up here.
>>
>> I think it is safe to allow `addStateStore()` to be idempotent for the same
>> `StoreBuilder` object. In fact, the `name` is "hard coded" and thus it's
>> not really possible to use the same `StoreBuilder` object to create
>> different stores.
>>
>> I also agree with the concern, that only allowing a single store (as
>> proposed by Ivan) might be too restrictive.
>>
>> Overall, the current KIP version LGTM. I don't have major concerns about
>> user education for this case, but I agree that we need to document this
>> clearly.
>>
>> Some further comments:
>>
>> (1) I am not sure if `StateStoreConnector` is the best name for the new
>> interface. Note, that there are two concepts about stores:
>>
>>  - adding a store: this makes the store available in the topology in
>> general (however, the store is still "dangling", and not used)
>>  - connecting a store: this allows a processor etc to use a store
>>
>> The new interface does both, but its name only indicates the second
>> part, which might be confusing. It might be especially confusing because
>> we want to disallow mixing the existing "manually add and connect"
>> pattern with the new "auto add+connect" pattern. If the new interface
>> name indicates the connect part only, users might think they need to add
>> stores manually and can connect automatically.
>>
>> Unfortunately, I don't have a much better suggestion for a name either.
>> The only idea that came to my mind was `StoreProvider`: to me, a
>> provider is a "service" interface that does work for us, ie, it adds and
>> connects a store. Not sure if this is too subtle, if we consider that
>> there is already the `StoreSupplier` interface?
>>
>> But maybe somebody else might still have a good idea on how to improve
>> the name.
>>
>> In any case, I would suggest to shorten the name to `StoreConnector`
>> instead of `StateStoreConnector`, because we also have `StoreSupplier`
>> and `StoreBuilder`.
>>
>>
>>
>> (2) The KIP proposes to add the new interface to `ProcessorSupplier` etc
>> and to add a default implementation for the new method. Hence, users
>> would need to override this default implementation to opt in to the
>> feature. I am wondering if it might be better to not add the new interface
>> to `ProcessorSupplier` etc and to just provide a new interface with no
>> default implementation. Users would opt in by adding the interface
>> explicitly to their existing `ProcessorSupplier` implementation.
>> Overriding a default method and getting different behavior seems to be
>> a little subtle to me, especially because we don't want to allow users to
>> mix and match the old and new approaches. Think: I only override a
>> default method and my code breaks.
>>
>> Thoughts?
>>
>>
>>
>> (3) If we keep the current default implementation for the new method, I
>> am wondering if it should return `null` instead of an empty collection?
>> This might make it safer to detect bugs in user code in which, by
>> accident, an empty collection is returned.
>>
>>
>>
>> (4) Should the new method return a `Set` instead of a `Collection` to
>> indicate the semantics clearly (ie, returning the same `StoreBuilder`
>> multiple times is idempotent and one cannot add+connect to it twice).
>>
>>
>>
>> -Matthias
>>
>>
>>
>>
>> On 4/6/19 12:27 PM, Paul Whalen wrote:
>>> Ivan and Guozhang,
>>>
>>> Thanks for the thoughts!  Ivan's use case is definitely interesting.  The
>>> way I see it, if we can achieve the main goal of the KIP (allowing
>>> Processor/TransformerSuppliers to encapsulate their usage of state
>>> stores), we will enable this kind of thing in "user space" very easily.
>>>
>>> I will say that I'm not totally sure that most use cases of transform()
>>> use just one state store.  It's hard to know since I haven't seen many
>>> examples in public, but my team's usages almost exclusively require
>>> multiple state stores.  We only reach for the low level processor API
>>> when we need that complexity, and it's somewhat hard to imagine many use
>>> cases that only need one state store, since the high level DSL can
>>> usually accomplish those tasks.  The example Ivan presented for instance
>>> looks like a stream.groupByKey().reduce(...) to me.  Ivan, I'd be curious
>>> what sort of other usages you're imagining.
>>>
>>> That being said, perhaps the Processor API should really just be
>>> considered a separate paradigm in Streams, not just a lower level that
>>> we reach to when necessary.  In which case it would be beneficial to make
>>> the simple use cases easier.  I've definitely talked about this with my
>>> own team - if you're less familiar with the kind of functional style that
>>> the high level DSL offers, it might be easier to "see" your state and
>>> interact with it directly.
>>>
>>> Anyway, I've updated the KIP to reflect my current PR with Guozhang's
>>> suggestions.  It seems like there is at least some interest in that on
>>> its own and not a ton of pushback, so I think I will try to start a vote.
>>>
>>> Paul
>>>
>>> On Sat, Mar 30, 2019 at 10:03 AM Ivan Ponomarev <iponoma...@mail.ru>
>>> wrote:
>>>
>>>> Hi all!
>>>>
>>>> I was about to write another KIP, but found out that KIP-401 addresses
>>>> exactly the problem I faced. So let me jump into your discussion and ask
>>>> you to assess another idea.
>>>>
>>>> I fully agree with KIP-401's motivation part. E.g., in my project I had
>>>> to invent a wrapper class that hides the details of KeyValueStore
>>>> management from business logic. Of course this should be done better in
>>>> the KStreams API.
>>>>
>>>> But I was about to look at this problem from another side and propose a
>>>> simple alternative in the high-level DSL that will not fit all the
>>>> cases, but most of them. Hence my idea does not exclude Paul's proposal.
>>>>
>>>> What if we restrict ourselves to *only one* KeyValueStore and propose a
>>>> method that resembles  `aggregate` and `reduce` methods, like this:
>>>>
>>>> stream
>>>>    .map(...)
>>>>    .filter(...)
>>>>    .transform ((k, v, s)->{....}, Transformed.with(....))
>>>>
>>>> where
>>>> * k, v -- input key & value
>>>> * s -- a KeyValueStore provided as an argument
>>>> * return value of the lambda should be KeyValue.pair(...)
>>>> * Transformed.with... is a builder, used in order to define the
>>>> Transformer and KeyValueStore building parameters. Some of these
>>>> parameters should be:
>>>> ** store's KeySerde,
>>>> ** store's ValueSerde,
>>>> ** whether the store is persistent or in-memory,
>>>> ** store's name -- optional parameter, the system should be able to
>>>> devise the name of the store transparently for the user, if we don't
>>>> want to devise it ourselves/share the store between processors.
>>>> ** scheduled punctuation.
>>>>
>>>> Imagine we have a KStream<String, Integer>, and we need to calculate a
>>>> `derivative` stream, that is, a stream of 'deltas' of the provided
>>>> integer values.
>>>>
>>>> This could be achieved as simply as
>>>>
>>>> stream.transform((key, value, stateStore) -> {
>>>>         int previousValue =
>>>>             Optional.ofNullable(stateStore.get(key)).orElse(0);
>>>>         stateStore.put(key, value);
>>>>         return KeyValue.pair(key, value - previousValue);
>>>>     },
>>>>     // we do not need to bother with store name, punctuation etc.
>>>>     // maybe even the Serde part can be omitted, since we can inherit
>>>>     // the serdes from the stream by default
>>>>     Transformed.with(Serdes.String(), Serdes.Integer()));
>>>>
>>>> The hard part of it is that new `transform` method definition should be
>>>> parameterized by six type parameters:
>>>>
>>>> * input/output/KeyValueStore key type,
>>>> * input/output/KeyValueStore value type.
>>>>
>>>> However, it seems that all these types can be inferred from the provided
>>>> lambda and Transformed.with instances.
>>>>
>>>> What do you think about this?
>>>>
>>>> Regards,
>>>>
>>>> Ivan
>>>>
>>>>
>>>> On 27.03.2019 20:45, Guozhang Wang wrote:
>>>>
>>>> Hello Paul,
>>>>
>>>> Thanks for the uploaded PR and the detailed description! I've made a
>>>> pass on it and left some comments.
>>>>
>>>> Overall I think I agree with you that passing in the StoreBuilder
>>>> directly rather than the store name is more convenient, as it does not
>>>> require another `addStore` call, but we just need to spend some more
>>>> documentation effort on educating users about the two ways of connecting
>>>> their stores. I'm slightly concerned about this education curve, but I
>>>> can be convinced if most people feel it is worth it.
>>>>
>>>>
>>>> Guozhang
>>>>
>>>> On Sat, Mar 23, 2019 at 5:15 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>
>>>>
>>>> I'd like to resurrect this discussion with a cursory, proof-of-concept
>>>> implementation of the KIP which combines many of our ideas:
>>>> https://github.com/apache/kafka/pull/6496.  I tried to keep the diff as
>>>> small as possible for now, just using it to convey the main ideas.  But
>>>> I'll separately address some of our earlier discussion:
>>>>
>>>>    - Will there be a new, separate interface for users to implement for
>>>>    the new functionality? No, to hopefully keep things simple, all of the
>>>>    Processor/TransformerSupplier interfaces will just extend
>>>>    StateStoresSupplier, allowing users to opt in to this functionality
>>>>    by overriding the default implementation that gives an empty list.
>>>>    - Will the interface allow users to specify the store name, or the
>>>>    entire StoreBuilder? The entire StoreBuilder, so the
>>>>    Processor/TransformerSupplier can completely encapsulate name and
>>>>    implementation of a state store if desired.
>>>>    - Will the old way of specifying store names alongside the supplier
>>>>    when calling stream.process/transform() be deprecated? No, this is
>>>>    still a legitimate way to wire up Processors/Transformers and their
>>>>    stores.  But I would recommend not allowing
>>>>    stream.process/transform() calls that use both store declaration
>>>>    mechanisms (this restriction is not in the proof of concept).
>>>>    - How will we handle adding the same state store to the topology
>>>>    multiple times because different Processor/TransformerSuppliers
>>>>    declare it? topology.addStateStore() will be slightly relaxed for
>>>>    convenience, and will allow adding the same StoreBuilder multiple
>>>>    times as long as the exact same StoreBuilder instance is being added
>>>>    for the same store name.  This seems to prevent in practice the issue
>>>>    of accidentally making two state stores one by adding with the same
>>>>    name.  For additional safety, if we wanted to (not in the proof of
>>>>    concept), we could allow for this relaxation only for internal
>>>>    callers of topology.addStateStore().
>>>>
>>>> So, in summary, the use cases look like:
>>>>
>>>>    - 1 transformer/processor that owns its store: Using the new
>>>>    StateStoresSupplier interface method to supply its StoreBuilders that
>>>>    will be added to the topology automatically.
>>>>    - Multiple transformer/processors that share the same store: Either
>>>>
>>>>    1. The old way: the StoreBuilder is defined "far away" from the
>>>>    Transformer/Processor implementations, and is added to the topology
>>>>    manually by the user
>>>>    2. The new way: the StoreBuilder is defined closer to the
>>>>    Transformer/Processor implementations, and the same instance is
>>>>    returned by all Transformer/ProcessorSuppliers that need it
>>>>
>>>>
>>>> This makes the KIP wiki a bit stale; I'll update if we want to bring
>>>> this design to a vote.
>>>>
>>>> Thanks!
>>>> Paul
>>>>
>>>> On Sun, Dec 16, 2018 at 6:04 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>
>>>>
>>>> Matthias / Paul,
>>>>
>>>> The concern I had about introducing `StoreBuilderSupplier` is simply
>>>> because it is another XXSupplier to the public API, so I'd like to ask
>>>> if we really have to add it :)
>>>>
>>>> The difference between encapsulating the store name and encapsulating
>>>> the full state store builder is that, in the former:
>>>>
>>>> -----------
>>>>
>>>> String storeName = "store1";
>>>> builder.addStore(new MyStoreBuilder(storeName));
>>>> // Following my proposal, the store name can be passed in and used for
>>>> // both `listStores` and `Transformer#init`, so the Transformer function
>>>> // does not need to get the constant string name again.
>>>> stream1.transform(new MyTransformerSupplier(storeName));
>>>>
>>>> // One caveat to admit is that the MyTransformerSupplier logic may be
>>>> // unique to `store1`, so it cannot be reused with a different store
>>>> // name anyway.
>>>> -----------
>>>>
>>>> While in the latter:
>>>>
>>>> -----------
>>>>
>>>> // The name just indicates that we may have one such supplier for each
>>>> // store.
>>>> stream1.transform(new MyTransformerSupplierForStore1());
>>>>
>>>> -----------
>>>>
>>>> I understand the latter introduces more convenience in the API, but the
>>>> cost is that we still cannot completely remove `builder.addStore`, but
>>>> only reduce its semantic scope to shared state stores; hence users need
>>>> to learn two ways of creating state stores for those two patterns.
>>>>
>>>> My argument is that more public APIs require a longer learning curve for
>>>> users, and introduce more usage patterns that may confuse users (the
>>>> proposal I had tries to replace one with another completely).
>>>>
>>>>
>>>> Guozhang
>>>>
>>>> On Sun, Dec 16, 2018 at 2:58 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>
>>>>
>>>> Thanks for the great thoughts Matthias and Guozhang!
>>>>
>>>> If I'm not mistaken, Guozhang's suggestion is what my second
>>>> alternative on the KIP is ("Have the added method on the Supplier
>>>> interfaces only return store names, not builders").  I do think it would
>>>> be a worthwhile usability improvement on its own, but to Matthias's
>>>> point, it doesn't achieve the full goal of completely encapsulating a
>>>> state store and its processor - it encapsulates the name, but not the
>>>> StateStoreBuilder.
>>>>
>>>> I'm really intrigued by Matthias's idea that forgoes the default
>>>> interface method I proposed.  Having smaller, separate interfaces is a
>>>> powerful idea and I think a cleaner API than what I proposed.  The
>>>> non-shared store use case is handled well here, and the shared store use
>>>> case is possible, though maybe still not as graceful as we would like
>>>> (having to add the StoreBuilderSupplier before the StoreNameSupplier
>>>> seems maybe too subtle to me).
>>>>
>>>> We're all agreed that one of the big problems with the shared store use
>>>> case is how to deal with adding the same store to the topology multiple
>>>> times.  Catching the "store already added" exception is risky.  Here's
>>>> a maybe radical idea: change `topology.addStateStore()` to be idempotent
>>>> for adding a given state store name and `StoreBuilder`.  In other words,
>>>> `addStateStore` would not throw the "store already added" exception if
>>>> the `StoreBuilder` being added for a given name has the same identity as
>>>> the one that has already been added.  Does this eliminate all the bugs
>>>> we're worried about?  Thinking about it for a few minutes, it seems to
>>>> eliminate most at least (would a user really use the exact same
>>>> StoreBuilder when they intend there to be two stores?).  It might make
>>>> the API slightly harder to use if a user isn't immediately aware of that
>>>> subtlety, but a good error message should ease the pain, and it would
>>>> happen immediately during development.
>>>>
>>>> And with regards to Matthias's comment about whether we need to
>>>> deprecate existing varargs transform methods - I don't think we need to,
>>>> but it might be nice for there only to be one way to do things, assuming
>>>> whatever we come up with supports all existing use cases.  I don't feel
>>>> strongly about this, but if we don't deprecate, I do think it's
>>>> important to add checks that prevent users from trying to do the same
>>>> thing in two different ways, as we've discussed.
>>>>
>>>> Paul
>>>>
>>>> On Sun, Dec 16, 2018 at 5:36 AM Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>
>>>> Guozhang,
>>>>
>>>>
>>>> Regarding the last option to catch "store exist already" exception and
>>>> fallback to connect stores, I'm a bit concerned it may be hiding actual
>>>> user bugs.
>>>>
>>>> I agree with this concern. From my original email:
>>>>
>>>>
>>>> The only disadvantage I see, might be potential bugs about sharing
>>>> state if two different stores are named the same by mistake (this would
>>>> not be detected).
>>>>
>>>> For your new proposal: I am not sure if it addresses Paul's original
>>>> idea -- I hope Paul can clarify. From my understanding, the idea was to
>>>> encapsulate a store and its processor. As many stores are not shared,
>>>> this seems to be quite useful. Your proposal falls a little short of
>>>> supporting encapsulation for non-shared stores.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>>
>>>> On 12/15/18 1:40 AM, Guozhang Wang wrote:
>>>>
>>>> Matthias,
>>>>
>>>> Thanks for your feedback.
>>>>
>>>> Regarding the last option to catch "store exist already" exception and
>>>> fallback to connect stores, I'm a bit concerned it may be hiding actual
>>>> user bugs.
>>>>
>>>> Thinking about Paul's proposal and your suggestion again, I'd like to
>>>> propose another alternative somewhere in the middle of your approaches,
>>>> i.e. we still let users create sharable state stores via
>>>> `addStateStore`, and we allow the TransformerSupplier to return a list
>>>> of state stores that it needs, i.e.:
>>>>
>>>> public interface TransformerSupplier<K, V, R> {
>>>>     Transformer<K, V, R> get();
>>>>     default List<String> stateStoreNames() {
>>>>         return Collections.emptyList();
>>>>     }
>>>> }
>>>>
>>>> By doing this users can still "consolidate" the references of store
>>>> names in a single place in the transform call, e.g.:
>>>>
>>>> public class MyTransformerSupplier<K, V, R>
>>>>         implements TransformerSupplier<K, V, R> {
>>>>     private String storeName;
>>>>
>>>>     public class MyTransformer implements Transformer<K, V, R> {
>>>>
>>>>        ....
>>>>
>>>>        public void init(ProcessorContext context) {
>>>>           store = context.getStateStore(storeName);
>>>>        }
>>>>     }
>>>>
>>>>     // overrides the default method of the proposed interface above
>>>>     @Override
>>>>     public List<String> stateStoreNames() {
>>>>         return Collections.singletonList(storeName);
>>>>     }
>>>> }
>>>>
>>>> Basically, we move the parameters from the caller of `transform` to
>>>> inside the TransformerSuppliers. DSL implementations would not change
>>>> much, simply calling `connectStateStore` by getting the list of names
>>>> from the provided function.
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Thu, Dec 13, 2018 at 7:27 AM Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>
>>>> Just a meta comment: do we really need to deprecate existing
>>>> `transform()` etc methods?
>>>>
>>>> The last argument is a vararg, and thus, just keeping the existing API
>>>> for this part seems to work too, allowing to implement both patterns?
>>>>
>>>> Also, instead of adding a default method, we could also add a new
>>>> interface `StoreBuilderSupplier` with method `List<StoreBuilder>
>>>> stateStores()` -- users could implement `TransformerSupplier` and
>>>> `StoreBuilderSupplier` at once; and for this case, we require that users
>>>> don't provide store name in `transform()`.
>>>>
>>>> Similarly, we could add an interface `StoreNameSupplier` with method
>>>> `List<String> stateStores()`. This allows us to "auto-wire" a
>>>> transformer to existing stores (to avoid the issue of adding the same
>>>> store multiple times).
>>>>
>>>> Hence, for shared stores, there would be one "main" transformer that
>>>> implements `StoreBuilderSupplier` and that must be added first to the
>>>> topology. The other transformers would implement `StoreNameSupplier`
>>>> and just connect to those stores.
>>>>
>>>> Another possibility to avoid the issue of adding the same stores
>>>> multiple times would be, that the DSL always calls `addStateStore()`
>>>> but catches a potential "store exists already" exception and falls back
>>>> to `connectProcessorAndStateStore()` for this case. Thus, we would not
>>>> need the `StoreNameSupplier` interface and the order in which
>>>> transformers are added would not matter either. The only disadvantage I
>>>> see, might be potential bugs about sharing state if two different stores
>>>> are named the same by mistake (this would not be detected).
>>>>
>>>>
>>>>
>>>> Just some ideas I wanted to share. What do you think?
>>>>
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 12/11/18 3:46 AM, Paul Whalen wrote:
>>>>
>>>> Ah yes of course, this was an oversight, I completely ignored the
>>>> multiple processors sharing the same state store when writing up the
>>>> KIP.  Which is funny, because I've actually done this (different
>>>> processors sharing state stores) a fair amount myself, and I've settled
>>>> on a pattern where I group the Processors in an enclosing class, and
>>>> that enclosing class handles as much as possible.  Here's a gist showing
>>>> the rough structure, just for context:
>>>> https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65
>>>> Note how it adds the stores to the topology, as well as providing a
>>>> public method with the store names.
>>>>
>>>> I don't think my proposal completely conflicts with the multiple
>>>> processors sharing state stores use case, since you can create a
>>>> supplier that provides the store name you want, somewhat independently
>>>> of your actual Processor logic.  The issue I do see though, is that
>>>> topology.addStateStore() can only be called once for a given store.  So
>>>> for your example, if there was a single TransformerSupplier that was
>>>> passed into both transform() calls, "store1" would be added (under the
>>>> hood) to the topology twice, which is no good.
>>>>
>>>> Perhaps this suggests that one of my alternatives on the KIP might be
>>>> desirable: either not having the suppliers return StoreBuilders (just
>>>> store names), or not deprecating the old methods that take "String...
>>>> stateStoreNames". I'll have to think about it a bit.
>>>>
>>>> Paul
>>>>
>>>> On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang <wangg...@gmail.com>
>>>> wrote:
>>>>
>>>> Hello Paul,
>>>>
>>>> Thanks for the great writeup (very detailed and crystal clear
>>>> motivation sections!).
>>>>
>>>> This is quite an interesting idea and I do like the API cleanness you
>>>> proposed. The original motivation of letting StreamsTopology add state
>>>> stores though, is to allow different processors to share the state
>>>> store.
>>>>
>>>> For example:
>>>>
>>>> builder.addStore("store1");
>>>>
>>>> // a path of stream transformations that leads to KStream stream1.
>>>>
>>>> stream1.transform(..., "store1");
>>>>
>>>> // another path that generates a KStream stream2.
>>>> stream2.transform(..., "store1");
>>>>
>>>> Behind the scenes, Streams will make sure stream1 / stream2
>>>> transformations will always be grouped together as a single group of
>>>> tasks, each of which will be executed by a single thread and hence
>>>> there are no concurrency issues on accessing the store from different
>>>> operators within the same task.  I'm not sure how common this use case
>>>> is, but I'd like to hear if you have any thoughts on maintaining this,
>>>> since the current proposal seems to exclude this possibility.
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen <pgwha...@gmail.com>
>>>> wrote:
>>>>
>>>> Here's KIP-401 for discussion, a minor Kafka Streams API change that I
>>>> think could greatly increase the usability of the low-level processor
>>>> API.  I have some code written but will wait to see if there is buy in
>>>> before going all out and creating a pull request.  It seems like most of
>>>> the work would be in updating documentation and tests.
>>>>
>>>>
>>>>
>>>>
>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
>>>>
>>>> Thanks!
>>>> Paul
>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>>
>>>>
>>>>
>>>>
>>>
>>
>>
> 
