I am not sure I fully understand, so let me try to rephrase:

> I can't think of an example that would require both ways, or would
> even be more readable using both ways.

Example:

There are two processors, A and B, one store S that both need to
access, and one store S_b that only B needs to access.

If we don't allow mixing both approaches, one would have to write
the following code:

  Topology t = new Topology();
  t.addProcessor("A", ...); // does not add any store
  t.addProcessor("B", ...); // does not add any store
  t.addStateStore(..., "A", "B"); // adds S and connects it to A and B
  t.addStateStore(..., "B"); // adds S_b and connects it to B

// DSL example:

  StreamsBuilder b = new StreamsBuilder();
  b.addStateStore() // adds S
  b.addStateStore() // adds S_b
  stream1.process(..., "S") // adds A and connects S
  stream2.process(..., "S", "S_b") // adds B and connects S and S_b


If we allow mixing both approaches, the code could be simplified to:

  Topology t = new Topology();
  t.addProcessor("A", ...); // does not add any store
  t.addProcessor("B", ...); // adds/connects S_b implicitly
  t.addStateStore(..., "A", "B"); // adds S and connects it to A and B

// DSL example

  StreamsBuilder b = new StreamsBuilder();
  b.addStateStore() // adds S
  stream1.process(..., "S") // adds A and connects S
  stream2.process(..., "S") // adds B and connects S; adds/connects S_b implicitly

The fact that B has a "private store" could be encapsulated and I don't
see why this would be bad?
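
To make the mixed case concrete, here is a minimal sketch. All types in it
(`MixedWiringSketch`, `SupplierSketch`, `ownedStores()`) are hypothetical
stand-ins that only track store names; they are not the Kafka Streams classes
and not the API proposed in the KIP.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a topology; it only tracks store names per processor.
final class MixedWiringSketch {

    /** Hypothetical processor definition that may own ("private") stores. */
    interface SupplierSketch {
        // Default: the processor owns no stores.
        default Set<String> ownedStores() {
            return Collections.emptySet();
        }
    }

    // Processor name -> names of the stores connected to it.
    private final Map<String, Set<String>> connections = new LinkedHashMap<>();

    /** Adds a processor and implicitly adds/connects the stores it owns. */
    void addProcessor(final String name, final SupplierSketch supplier) {
        connections
            .computeIfAbsent(name, n -> new LinkedHashSet<>())
            .addAll(supplier.ownedStores());
    }

    /** Adds a shared store and explicitly connects it to the named processors. */
    void addStateStore(final String storeName, final String... processorNames) {
        for (final String processor : processorNames) {
            final Set<String> connected = connections.get(processor);
            if (connected == null) {
                throw new IllegalArgumentException("Unknown processor: " + processor);
            }
            connected.add(storeName);
        }
    }

    /** Read-only view of the stores connected to a processor. */
    Set<String> storesOf(final String processor) {
        return Collections.unmodifiableSet(
            connections.getOrDefault(processor, Collections.emptySet()));
    }
}
```

In this sketch, a supplier for B that overrides `ownedStores()` to return
S_b, followed by `addStateStore("S", "A", "B")`, leaves A connected to S and
B connected to both S_b and S.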

> If you can
> do both ways, the actual full set of state stores being connected could be
> in wildly different places in the code, which could create confusion.

That is, I don't see why the second version would be confusing, or why
the first version would be more readable. (I am not arguing that it's less
readable either; I think both are equally readable.)



Or do you argue that we should allow the following:

> Shared stores can be passed from
> the outside in an anonymous ProcessorSupplier if desired, making it
> effectively the same as passing the stateStoreNames var args

  Topology t = new Topology();
  t.addProcessor("A", ...); // adds/connects S implicitly
  t.addProcessor("B", ...); // adds/connects S and S_b implicitly

// DSL example

  StreamsBuilder b = new StreamsBuilder();
  stream1.process(...) // adds A and adds/connects S implicitly
  stream2.process(...) // adds B and adds/connects S and S_b implicitly

For this case, the second implicit addition of S would have to return
the same `StoreBuilder` instance to make it idempotent, which seems hard
to achieve because both `ProcessorSupplier`s now have a cross
dependency on the same object.

Hence, I don't think this would be a good approach.


Also, because we require that the same `StoreBuilder` instance is always
passed for a given store name, we actually have good protection against
user bugs that add two stores with the same name but different builders.
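
A minimal sketch of such a check follows. `StoreRegistrySketch` and its
`addStateStore` method are hypothetical stand-ins, not the actual Kafka Streams
implementation; the builder is modeled as a plain `Object` because only its
identity matters here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a topology's store bookkeeping; not Kafka's code.
final class StoreRegistrySketch {

    // Store name -> the builder object registered under that name.
    private final Map<String, Object> buildersByName = new HashMap<>();

    /**
     * Registers a builder under a store name.
     *
     * @return true if the store was newly added, false if this exact builder
     *         instance was already registered under the name (idempotent no-op)
     * @throws IllegalStateException if a different builder instance is already
     *         registered under the same name
     */
    boolean addStateStore(final String name, final Object builder) {
        Objects.requireNonNull(name, "name");
        Objects.requireNonNull(builder, "builder");
        final Object existing = buildersByName.get(name);
        if (existing == null) {
            buildersByName.put(name, builder);
            return true;
        }
        if (existing == builder) { // reference equality on purpose
            return false;
        }
        throw new IllegalStateException(
            "Store '" + name + "' was already added with a different builder instance");
    }

    /** Number of distinct stores registered so far. */
    int size() {
        return buildersByName.size();
    }
}
```

Adding the same instance twice is a no-op in this sketch, while a second
builder under the same name fails fast instead of silently replacing the first.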


I also do not feel super strongly about it, but I see some advantages in
allowing the mixed approach and no disadvantages. It would be good to
get input from others, too.



-Matthias


On 8/7/19 7:29 PM, Paul Whalen wrote:
> My thinking on restricting the API to enforce only one way of connecting
> stores would make it more simple to use and end up with more readable
> code.  I can't think of an example that would require both ways, or would
> even be more readable using both ways.  Shared stores can be passed from
> the outside in an anonymous ProcessorSupplier if desired, making it
> effectively the same as passing the stateStoreNames var args.  If you can
> do both ways, the actual full set of state stores being connected could be
> in wildly different places in the code, which could create confusion.  I
> personally can't imagine a case in which that would be useful.
> 
> All that being said, I don't feel terribly strongly about it.  I'm just
> trying to make the API as straightforward as possible.  Admittedly a
> runtime check doesn't make for a great API, but I see it more as an
> opportunity to educate the user to make it clear that "connecting" a state
> store is a thing that can be done in two different ways, but there is no
> reason to mix both.  If it seems like there's a compelling reason to mix
> them then I would abandon the idea in a heartbeat.
> 
> Paul
> 
> On Wed, Aug 7, 2019 at 5:48 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Sorry for the long silence on this KIP Paul! I guess the 2.3 release
>> distracted us somewhat.
>>
>> Overall, I am +1.
>>
>> With regard to John's point about owned vs shared state stores, I think
>> it describes a valid use case, and throwing an exception if people want
>> to mix both features might be too restrictive?
>>
>> We could of course later relax the restriction, but atm I am not sure
>> what the main argument for adding the restriction is?
>>
>> (a) In the current API, one could connect the same store multiple times
>> to the same processor without getting an exception, because the
>> operation is idempotent.
>>
>> (b) The KIP also suggests relaxing the current restriction on adding the
>> same store twice, as long as store name and `StoreBuilder` instance are
>> the same, because it's an idempotent (hence, safe) operation too.
>>
>> Because we have already (a) and (b) and consider both as safe, it seems
>> we could also treat the case of mixing both patterns as idempotent and
>> hence safe. And if we do this, we implicitly allow mixing both patterns
>> for different stores.
>>
>>
>> Thoughts?
>>
>>
>> -Matthias
>>
>>
>>
>> On 6/17/19 2:31 PM, John Roesler wrote:
>>> Hey, all,
>>>
>>> Sorry I'm late to the party. I meant to read into this KIP before, but
>>> didn't get around to it. I was just reminded when Paul mentioned it in
>>> a different thread. Please feel free to bump a discussion any time it
>>> stalls!
>>>
>>> I've just read through the whole discussion so far, and, to echo the
>>> earlier sentiments, the motivation seems very clear. I remember how
>>> hard it was to figure out how to actually wire up a stateful processor
>>> properly the first couple of times. Not a very good user experience.
>>>
>>> I looked over the whole conversation to date, as well as the KIP and
>>> the latest PR (https://github.com/apache/kafka/pull/6824). FWIW, The
>>> current approach looks good to me. I was concerned about the "cheat
>>> codes"-style mixin interface. Discoverability would have been a
>>> problem, and it's also not a very normal pattern for Java APIs. It
>>> actually looks a little more like something you'd do with an
>>> annotation.
>>>
>>> So the current approach seems good:
>>> * The new interface with a default to return `null` is effectively
>>> shipping the feature flagged "off" (which is nice and safe)
>>> * Shared stores are "supported" the same way they always have been, by
>>> connecting them externally. This makes sense, since those stores
>>> aren't "owned" by any of the connected processors.
>>> * Processors that do own their stores can configure them in the same
>>> file they use them, which decreases the probability of cast exceptions
>>> when they get the stores from the context.
>>> * Stateful processors that own their stores are available for one-shot
>>> definition of the stores and the processor all in the same file (this
>>> is the main point of the KIP)
>>>
>>> The runtime check that stores can't be both defined in the processor
>>> and referenced by name might be a little restrictive (since we already
>>> have the restriction that same-name stores can't be registered), but
>>> it would also be easy to remove it later. I'm just thinking that if I
>>> have a processor that owns one store and shares another, it would be
>>> pretty obvious how to hook it up in the proposed API, except for that
>>> check.
>>>
>>> One last thought, regarding the all-important interface name: If you
>>> wanted to indicate more that the stores are available for Streams to
>>> connect, rather than that they are already connected, you could call
>>> it ConnectableStoreProvider (similar to AutoCloseable).
>>>
>>> I just thought I'd summarize the current state, since it's been a
>>> while and no one has voted yet. I'll go ahead and vote now on the
>>> voting thread, since I'm +1 on the current proposal.
>>>
>>> Thanks,
>>> -John
>>>
>>> On Mon, May 27, 2019 at 1:59 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>
>>>> It wasn't much of a lift changing option B to work for option C, so I
>>>> closed that PR and made a new one, which should be identical to the KIP
>>>> right now: https://github.com/apache/kafka/pull/6824.  There are a few
>>>> todos still which I will hold off until the KIP is accepted.
>>>>
>>>> I created a voting thread about a month ago, so I'll bump that now that
>>>> we're nearly there.
>>>>
>>>> Paul
>>>>
>>>> On Sun, May 26, 2019 at 2:21 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>
>>>>> Per Matthias's suggestion from a while ago, I actually implemented a
>> good
>>>>> amount of option B to get a sense of the user experience and
>> documentation
>>>>> requirements.  For a few reasons mentioned below, I think it's not my
>>>>> favorite option, and I prefer option C.  But since I did the work and
>> it
>>>>> can help discussion, I may as well share:
>>>>> https://github.com/apache/kafka/pull/6821.
>>>>>
>>>>> Things I learned along the way implementing Option B:
>>>>>  - For the name of the interface, I like ConnectedStoreProvider.  It
>> isn't
>>>>> perfect but it seems to capture the general gist without being overly
>>>>> verbose.  I get that from a strict standpoint it's not "providing
>> connected
>>>>> stores" but is instead "providing stores to be connected," but I think
>> that
>>>>> in context and with documentation, the risk of someone being confused
>> by
>>>>> that is low.
>>>>>  - I definitely felt the discoverability issue while trying to write
>> clear
>>>>> documentation; you really have to make sure to connect the dots for the
>>>>> user when the interface isn't connected to anything.
>>>>>  - Another problem with a separate interface found while writing
>>>>> tests/examples: defining a ProcessorSupplier that also implements
>>>>> ConnectedStoreProvider cannot be done anonymously, since you can't
>> define
>>>>> an anonymous class in Java that implements multiple interfaces.  I
>> actually
>>>>> consider this a fairly major usability issue - it means a user always
>> has
>>>>> to have a custom class rather than doing it inline.  We could provide
>> an
>>>>> abstract class that implements the two, but at that point, we're not
>> that
>>>>> far from option A or C anyway.
>>>>>
>>>>> I updated the KIP with my current thinking, which as mentioned is
>>>>> Matthias's option C.  Once again for clarity, that *is not* what is in
>> the
>>>>> linked pull request.  The current KIP is my proposal.
>>>>>
>>>>> Thanks everyone for the input!
>>>>>
>>>>> P.S.  What do folks use to edit the HTML documentation, e.g.
>>>>> processor-api.html?  I looked at doing it by hand but it kind of looked
>>>>> like agony with all the small tags required for formatting code, so I'm
>>>>> sort of assuming there's tooling for it.
>>>>>
>>>>> On Fri, May 24, 2019 at 12:49 AM Matthias J. Sax <
>> matth...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> I think the discussion mixed approaches a little bit, hence, let me
>>>>>> rephrase my understanding:
>>>>>>
>>>>>>
>>>>>> A) add new method with default implementation to `ProcessorSupplier`:
>>>>>>
>>>>>> For this case, we don't add a new interface, but only add a new method
>>>>>> to `ProcessorSupplier` -- to keep backward compatibility, we need to
>> add
>>>>>> a default implementation. Users opt into the new feature by
>> overwriting
>>>>>> the default implementation.
>>>>>>
>>>>>>
>>>>>> B) We add a new interface with new method:
>>>>>>
>>>>>> For this case, `ProcessorSupplier` interface is not changed and it
>> does
>>>>>> also _not_ extend the new interface. Because `ProcessorSupplier` is
>> not
>>>>>> changed, it's naturally backward compatible. Users opt into the new
>>>>>> feature, by adding the new interface to their ProcessorSupplier
>>>>>> implementation and they need to implement the new method because there
>>>>>> is no default implementation. Kafka Streams can use `instanceof` to
>>>>>> detect if the new interface is used or not and thus do the right thing.
>>>>>>
>>>>>>
>>>>>> What was also discussed is a mix of both:
>>>>>>
>>>>>> C) We add a new interface with new method and let `ProcessorSupplier`
>>>>>> extend the new interface:
>>>>>>
>>>>>> Here, we need to add a default implementation to preserve backward
>>>>>> compatibility. Similar to (A), users opt into the feature by
>> overwriting
>>>>>> the default implementation.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Option (C) is the same as (A) from a user point of view because a user
>>>>>> won't care about the new interface. It only makes a difference for our
>>>>>> code base, as we can share the default implementation of the new method.
>>>>>> This is only a small gain, as the implementation is trivial but also a
>>>>>> small drawback as we add new public interface that is useless to the
>>>>>> user because the user would never implement the interface directly.
>>>>>>
>>>>>>
>>>>>>
>>>>>> For (A/C), it might be simpler for users to detect the feature. For
>> (B),
>>>>>> we have the advantage that users must implement the method if they use
>>>>>> the new interface.
>>>>>>
>>>>>> Overall, it seems that (A) might be the best choice because it makes
>> the
>>>>>> feature easier discoverable and does not add a "useless" interface. If
>>>>>> you want to go with (C) to share the default implementation code,
>> that's
>>>>>> also fine with me. I am convinced now (even if I brought it up), that
>>>>>> (B) might be not optimal because feature discoverability seems to be
>>>>>> important.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> About `null` vs `emptyList`: I still tend to like `null` better but
>> it's
>>>>>> really a detail and not too important. Note, that the question only
>>>>>> arises for (A/C), but not for (B) because for (B) we don't need a
>>>>>> default implementation.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> @Paul: It's unclear to me atm what your final proposal is because you
>>>>>> mentioned that you might want to rename `StateStoreConnector`? It's
>> also
>>>>>> unclear to me atm, if you prefer (A), (B), or (C).
>>>>>>
>>>>>> Maybe you can update the KIP if necessary and clearly state what your
>>>>>> final proposal is. Besides this, it seems we can move to a VOTE?
>>>>>>
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 5/2/19 3:01 PM, Bruno Cadonna wrote:
>>>>>>> Hi Paul,
>>>>>>>
>>>>>>> I will try to express myself a bit clearer.
>>>>>>>
>>>>>>> Ad 1)
>>>>>>> My assumption is that if `StateStoreConnector#stateStores()` returns
>>>>>> `null`
>>>>>>> Kafka Streams will throw an NPE because on purpose no null check is
>>>>>>> performed before the loop that calls
>> `StreamsBuilder#addStateStore()`.
>>>>>> When
>>>>>>> the user finally understands the cause of the NPE, she knows that she
>>>>>> has
>>>>>>> to override `StateStoreConnector#stateStores()` in her
>> implementation.
>>>>>> My
>>>>>>> question was, why let the user discover that she has to overwrite the
>>>>>>> method at runtime if you could not provide a default implementation
>> for
>>>>>>> `StateStoreConnector#stateStores()` and let the compiler tell the
>> user
>>>>>> the
>>>>>>> need to overwrite the method. Not providing a default implementation
>>>>>>> without separating the interfaces implies not being
>> backward-compatible.
>>>>>>> That means, if we choose to not provide a default implementation and
>> let
>>>>>>> the compiler signal the necessity to override the method, we have to
>>>>>>> separate the interfaces in any case.
>>>>>>>
>>>>>>> Ad 2)
>>>>>>> If you check for `null` or empty list in `process` and do not call
>>>>>>> `addStateStores` in those cases, the advantage of returning `null`, namely
>>>>>>> that it is safer for detecting bugs as mentioned by Matthias, would be
>>>>>>> lost. But maybe I am missing something here.
>>>>>>>
>>>>>>> Best,
>>>>>>> Bruno
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, May 1, 2019 at 6:27 AM Paul Whalen <pgwha...@gmail.com>
>> wrote:
>>>>>>>
>>>>>>>> I definitely don't mind anyone jumping in, Bruno, thanks for the comments!
>>>>>>>>
>>>>>>>> 1) I'm not totally sure I'm clear on your point, but I think we're
>> on
>>>>>> the
>>>>>>>> same page - if we're adding a method to the XSupplier interfaces (by
>>>>>> making
>>>>>>>> them inherit from a super interface StateStoreConnector) then we
>>>>>> definitely
>>>>>>>> need a default implementation to maintain compatibility.  Whether
>> the
>>>>>>>> default implementation returns null or an empty list is somewhat of
>> a
>>>>>>>> detail.
>>>>>>>>
>>>>>>>> 2) If stream.process() sees that StateStoreConnector#stateStores()
>>>>>> returns
>>>>>>>> either null or an empty list, it would handle that case specifically
>>>>>> and
>>>>>>>> not try to call addStateStore at all.  Or is this not what you're
>>>>>> asking?
>>>>>>>>
>>>>>>>> Separately, I'm still hacking away at the details of the PR and will
>>>>>>>> continue to get something into a discussable state, but I'll share
>> some
>>>>>>>> thoughts I've run into.
>>>>>>>>
>>>>>>>> A) I'm tentatively going the separate interface route (Matthias's
>>>>>>>> suggestion) and naming it ConnectedStoreProvider.  Still don't love
>> the
>>>>>>>> name, but there's something nice about the name indicating *why*
>> this
>>>>>> thing
>>>>>>>> is providing the store, not just that it is providing it.
>>>>>>>>
>>>>>>>> B) It has occurred to me that topology.addProcessor() could also
>>>>>> recognize
>>>>>>>> if ProcessorSupplier implements ConnectedStoreProvider and add and
>>>>>> connect
>>>>>>>> stores appropriately.  This isn't in the KIP and I think the
>> value-add
>>>>>> is
>>>>>>>> lower (if you're reaching that low level, surely the "auto
>> add/connect
>>>>>>>> store" isn't too important to you), but I think it would be confusing if
>>>>>>>> it didn't, and I don't see any real downside.
>>>>>>>>
>>>>>>>> Paul
>>>>>>>>
>>>>>>>> On Tue, Apr 30, 2019 at 4:18 AM Bruno Cadonna <br...@confluent.io>
>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> @Paul: Thank you for the KIP!
>>>>>>>>>
>>>>>>>>> I hope you do not mind that I jump in.
>>>>>>>>>
>>>>>>>>> I have the following comments:
>>>>>>>>>
>>>>>>>>> 1) `null` vs empty list in the default implementation
>>>>>>>>> IIUC, returning `null` in the default implementation should
>> basically
>>>>>>>>> signal that the method `stateStores` was not overridden. Why then
>>>>>>>> provide a
>>>>>>>>> default implementation in the first place? Without default
>>>>>> implementation
>>>>>>>>> you would discover the missing implementation already at
>> compile-time
>>>>>> and
>>>>>>>>> not only at runtime. If you decide not to provide a default
>>>>>>>> implementation,
>>>>>>>>> `XSupplier extends StateStoreConnector` would break existing code
>> as
>>>>>>>>> Matthias has already pointed out.
>>>>>>>>>
>>>>>>>>> 2) `process` method adding the StoreBuilders to the topology
>>>>>>>>> If the default implementation returned `null` and `XSupplier
>> extends
>>>>>>>>> StateStoreConnector`, then existing code would break, because
>>>>>>>>> `StreamsBuilder#addStateStore()` would throw a NPE.
>>>>>>>>>
>>>>>>>>> +1 for opening a WIP PR
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Bruno
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sun, Apr 28, 2019 at 10:57 PM Matthias J. Sax <
>>>>>> matth...@confluent.io>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Paul!
>>>>>>>>>>
>>>>>>>>>> I agree with all of that. If we think that the general design is
>>>>>> good,
>>>>>>>>>> refactoring a PR if we want to pick a different name should not be
>>>>>> too
>>>>>>>>>> much additional work (hopefully). Thus, if you want to open a WIP
>> PR
>>>>>>>> and
>>>>>>>>>> we use it to nail the open details, it might help to find a good
>>>>>>>>>> conclusion.
>>>>>>>>>>
>>>>>>>>>>>> 2) Default method vs new interface:
>>>>>>>>>>
>>>>>>>>>> This seems to be the hardest tradeoff. I see the point about
>>>>>>>>>> discoverability... Might be good to get input from others, which version
>>>>>>>>>> they would prefer.
>>>>>>>>>>
>>>>>>>>>> Just to make clear, my suggestion from the last email was, that
>>>>>>>>>> `Transformer` etc does not extend the new interface. Instead, a user
>>>>>>>>>> that wants to use this feature would need to implement both interfaces.
>>>>>>>>>>
>>>>>>>>>> If `Transformer extends StoreProvider` (just picking a name here)
>>>>>>>>>> without default implementation, existing code would break, and thus it is
>>>>>>>>>> not an option because of breaking backward compatibility.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 4/28/19 8:37 PM, Paul Whalen wrote:
>>>>>>>>>>> Great thoughts Matthias, thanks! I think we're all agreed that
>>>>>> naming
>>>>>>>>> and
>>>>>>>>>>> documentation/education are the biggest hurdles for this KIP,
>> and in
>>>>>>>>>> light
>>>>>>>>>>> of that, I think it makes sense for me to just take a stab at a
>> full
>>>>>>>>>>> fledged PR with documentation to convince us that it's possible
>> to
>>>>>> do
>>>>>>>>> it
>>>>>>>>>>> with enough clarity.
>>>>>>>>>>>
>>>>>>>>>>> In response to your specific thoughts:
>>>>>>>>>>>
>>>>>>>>>>> 1) StateStoreConnector as a name: Really good point about
>> defining
>>>>>>>> the
>>>>>>>>>>> difference between "adding" and "connecting."  Guozhang suggested
>>>>>>>>>>> StateStoreConnector which was definitely an improvement over my
>>>>>>>>>>> StateStoresSupplier, but I think you're right that we need to be
>>>>>>>>> careful
>>>>>>>>>> to
>>>>>>>>>>> make it clear that it's really accomplishing both.  Thinking
>> about
>>>>>> it
>>>>>>>>>> now,
>>>>>>>>>>> one problem with Connector is that the implementer of the
>> interface
>>>>>>>> is
>>>>>>>>>> not
>>>>>>>>>>> really doing any connecting, it's providing/supplying the store
>> that
>>>>>>>>> will
>>>>>>>>>>> be both added and connected.  StoreProvider seems reasonable to
>> me
>>>>>>>> and
>>>>>>>>>>> probably the best candidate at the moment, but it would be nice
>> if
>>>>>>>> the
>>>>>>>>>> name
>>>>>>>>>>> could convey that it's providing the store specifically so the
>>>>>> caller
>>>>>>>>> can
>>>>>>>>>>> add it to the topology and connect it to the associated
>> transformer.
>>>>>>>>>>>
>>>>>>>>>>> In general I think that really calling out what "adding" versus
>>>>>>>>>>> "connecting" is in the documentation will help make the entire
>>>>>>>> purpose
>>>>>>>>> of
>>>>>>>>>>> this feature more clear to the user.
>>>>>>>>>>>
>>>>>>>>>>> 2) Default method vs new interface: The choice of a default
>> method
>>>>>>>> was
>>>>>>>>>>> influenced by Guozhang's fear about API bloat/discoverability.  I can
>>>>>>>>>>> definitely see it both ways.  Would the separate interface be a
>>>>>>>>>>> sub-interface of Processor/TransformerSupplier or standalone?  It
>>>>>>>> seems
>>>>>>>>>>> like you're suggesting standalone and I think that's what I
>> favor.
>>>>>>>> My
>>>>>>>>>> only
>>>>>>>>>>> concern there is that the interface wouldn't actually be a type
>> to
>>>>>>>> any
>>>>>>>>>>> public API which sort of hurts discoverability.  You would have
>> to
>>>>>>>> read
>>>>>>>>>> the
>>>>>>>>>>> javadocs for stream.process/transform() to discover that
>>>>>> implementing
>>>>>>>>> the
>>>>>>>>>>> interface in addition to Processor/TransformerSupplier would add
>> and
>>>>>>>>>>> connect the store for you.  But that added burden actually
>> probably
>>>>>>>>> helps
>>>>>>>>>>> us in terms of making sure people don't mix and match, like you
>>>>>> said.
>>>>>>>>>>>
>>>>>>>>>>> 3) Returning null instead of empty: Seems fair to me.  I always
>>>>>> worry
>>>>>>>>>> about
>>>>>>>>>>> returning null when an empty collection can be used instead, but
>>>>>>>> given
>>>>>>>>>> that
>>>>>>>>>>> the library is the caller rather than the client I think your
>> point
>>>>>>>>> makes
>>>>>>>>>>> sense here.
>>>>>>>>>>>
>>>>>>>>>>> 4) Returning Set instead of Collection: Agreed, don't see why
>> not to
>>>>>>>>> make
>>>>>>>>>>> it more specific.
>>>>>>>>>>>
>>>>>>>>>>> Paul
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Apr 26, 2019 at 2:30 AM Matthias J. Sax <
>>>>>>>> matth...@confluent.io
>>>>>>>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi, sorry for the long pause. Just trying to catch up here.
>>>>>>>>>>>>
>>>>>>>>>>>> I think it is safe to allow `addStateStore()` to be idempotent for the same
>>>>>>>>>>>> `StoreBuilder` object. In fact, the `name` is "hard coded" and thus it's
>>>>>>>>>>>> not really possible to use the same `StoreBuilder` object to create
>>>>>>>>>>>> different stores.
>>>>>>>>>>>>
>>>>>>>>>>>> I also agree with the concern, that only allowing a single store
>>>>>> (as
>>>>>>>>>>>> proposed by Ivan) might be too restrictive.
>>>>>>>>>>>>
>>>>>>>>>>>> Overall, the current KIP version LGTM. I don't have major concerns about
>>>>>>>>>>>> user education for this case, but I agree that we need to document this
>>>>>>>>>>>> clearly.
>>>>>>>>>>>>
>>>>>>>>>>>> Some further comments:
>>>>>>>>>>>>
>>>>>>>>>>>> (1) I am not sure if `StateStoreConnector` is the best name for
>> the
>>>>>>>>> new
>>>>>>>>>>>> interface. Note, that there are two concepts about stores:
>>>>>>>>>>>>
>>>>>>>>>>>>  - adding a store: this makes the store available in the
>> topology
>>>>>> in
>>>>>>>>>>>> general (however, the store is still "dangling", and not used)
>>>>>>>>>>>>  - connecting a store: this allows a processor etc to use a
>> store
>>>>>>>>>>>>
>>>>>>>>>>>> The new interface does both, but its name only indicates the second
>>>>>>>>>>>> part, which might be confusing. It might be especially confusing because
>>>>>>>>>>>> we want to disallow mixing the existing "manually add and connect"
>>>>>>>>>>>> pattern with a new pattern to "auto add+connect". If the new interface
>>>>>>>>>>>> name indicates the connect part only, users might think they need to add
>>>>>>>>>>>> stores manually and can connect automatically.
>>>>>>>>>>>>
>>>>>>>>>>>> Unfortunately, I don't have a much better suggestion for a name
>>>>>>>>> either.
>>>>>>>>>>>> The only idea that came to my mind was `StoreProvider`: to me, a
>>>>>>>>>>>> provider is a "service" interface that does work for us, ie, it
>>>>>> adds
>>>>>>>>> and
>>>>>>>>>>>> connects a store. Not sure if this is too subtle, if we consider
>>>>>>>> that
>>>>>>>>>>>> there is already the `StoreSupplier` interface?
>>>>>>>>>>>>
>>>>>>>>>>>> But maybe somebody else might still have a good idea on how to improve
>>>>>>>>>>>> the name.
>>>>>>>>>>>>
>>>>>>>>>>>> In any case, I would suggest to shorten the name to
>>>>>> `StoreConnector`
>>>>>>>>>>>> instead of `StateStoreConnector`, because we also have
>>>>>>>> `StoreSupplier`
>>>>>>>>>>>> and `StoreBuilder`.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> (2) The KIP proposes to add the new interface to `ProcessorSupplier` etc
>>>>>>>>>>>> and to add a default implementation for the new method. Hence, users
>>>>>>>>>>>> would need to overwrite this default implementation to opt in to the
>>>>>>>>>>>> feature. I am wondering if it might be better to not add the new interface
>>>>>>>>>>>> to `ProcessorSupplier` etc and to just provide a new interface with no
>>>>>>>>>>>> default implementation. Users would opt in by adding the interface
>>>>>>>>>>>> explicitly to their existing `ProcessorSupplier` implementation.
>>>>>>>>>>>> Overwriting a default method and getting different behavior seems to be
>>>>>>>>>>>> a little subtle to me, especially because we don't want to allow
>>>>>>>>>>>> mixing and matching the old and new approaches. Think: I only overwrite a
>>>>>>>>>>>> default method and my code breaks.
>>>>>>>>>>>>
>>>>>>>>>>>> Thoughts?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> (3) If we keep the current default implementation for the new method, I
>>>>>>>>>>>> am wondering if it should return `null` instead of an empty collection?
>>>>>>>>>>>> This might be safer for detecting bugs in user code in which, by accident,
>>>>>>>>>>>> an empty collection could be returned.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> (4) Should the new method return a `Set` instead of a
>> `Collection`
>>>>>>>> to
>>>>>>>>>>>> indicate the semantics clearly (ie, returning the same
>>>>>>>> `StoreBuilder`
>>>>>>>>>>>> multiple times is idempotent and one cannot add+connect to it
>>>>>>>> twice).
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 4/6/19 12:27 PM, Paul Whalen wrote:
>>>>>>>>>>>>> Ivan and Guozhang,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the thoughts!  Ivan's use case is definitely
>>>>>>>> interesting.
>>>>>>>>>> The
>>>>>>>>>>>>> way I see it, if we can achieve the main goal of the KIP
>> (allowing
>>>>>>>>>>>>> Processor/TransformerSuppliers to encapsulate their usage of
>> state
>>>>>>>>>>>> stores),
>>>>>>>>>>>>> we will enable this kind of thing in "user space" very easily.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I will say that I'm not totally sure that most use cases of
>>>>>>>>> transform()
>>>>>>>>>>>> use
>>>>>>>>>>>>> just one state store.  It's hard to know since I haven't seen
>> many
>>>>>>>>>>>> examples
>>>>>>>>>>>>> in public, but my team's usages almost exclusively require
>>>>>> multiple
>>>>>>>>>> state
>>>>>>>>>>>>> stores.  We only reach for the low level processor API when we
>>>>>> need
>>>>>>>>>> that
>>>>>>>>>>>>> complexity, and it's somewhat hard to imagine many use cases
>> that
>>>>>>>>> only
>>>>>>>>>>>> need
>>>>>>>>>>>>> one state store, since the high level DSL can usually
>> accomplish
>>>>>>>>> those
>>>>>>>>>>>>> tasks.  The example Ivan presented for instance looks like a
>>>>>>>>>>>>> stream.groupByKey().reduce(...) to me.  Ivan, I'd be curious
>> what
>>>>>>>>> sort
>>>>>>>>>> of
>>>>>>>>>>>>> other usages you're imagining.
>>>>>>>>>>>>>
>>>>>>>>>>>>> That being said, perhaps the Processor API should really just
>> be
>>>>>>>>>>>> considered
>>>>>>>>>>>>> a separate paradigm in Streams, not just a lower level that we
>>>>>>>> reach
>>>>>>>>> to
>>>>>>>>>>>>> when necessary.  In which case it would be beneficial to make
>> the
>>>>>>>>>> simple
>>>>>>>>>>>>> use cases easier.  I've definitely talked about this with my
>> own
>>>>>>>>> team -
>>>>>>>>>>>> if
>>>>>>>>>>>>> you're less familiar with the kind of functional style that the
>>>>>>>> high
>>>>>>>>>>>> level
>>>>>>>>>>>>> DSL offers, it might be easier to "see" your state and interact
>>>>>>>> with
>>>>>>>>> it
>>>>>>>>>>>>> directly.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Anyway, I've updated the KIP to reflect my current PR with
>>>>>>>> Guozhang's
>>>>>>>>>>>>> suggestions.  It seems like there is at least some interest in
>>>>>> that
>>>>>>>>> on
>>>>>>>>>>>> its
>>>>>>>>>>>>> own and not a ton of pushback, so I think I will try to start a
>>>>>>>> vote.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Mar 30, 2019 at 10:03 AM Ivan Ponomarev <
>>>>>>>> iponoma...@mail.ru>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi all!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I was about to write another KIP, but found out that KIP-401
>>>>>>>>>>>>>> addresses exactly the problem I faced. So let me jump into your
>>>>>>>>>>>>>> discussion and ask you to assess another idea.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I fully agree with KIP-401's motivation part. E.g., in my
>>>>>>>>>>>>>> project I had to invent a wrapper class that hides the details
>>>>>>>>>>>>>> of KeyValueStore management from business logic. Of course this
>>>>>>>>>>>>>> should be done better in the KStreams API.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> But I was about to look at this problem from another side and
>>>>>>>>>>>>>> propose a simple alternative in the high-level DSL that will not
>>>>>>>>>>>>>> fit all the cases, but most of them. Hence my idea does not
>>>>>>>>>>>>>> exclude Paul's proposal.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What if we restrict ourselves to *only one* KeyValueStore and
>>>>>>>>>>>>>> propose a method that resembles the `aggregate` and `reduce`
>>>>>>>>>>>>>> methods, like this:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> stream
>>>>>>>>>>>>>>    .map(...)
>>>>>>>>>>>>>>    .filter(...)
>>>>>>>>>>>>>>    .transform ((k, v, s)->{....}, Transformed.with(....))
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> where
>>>>>>>>>>>>>> * k, v -- input key & value
>>>>>>>>>>>>>> * s -- a KeyValueStore provided as an argument
>>>>>>>>>>>>>> * return value of the lambda should be KeyValue.pair(...)
>>>>>>>>>>>>>> * Transformed.with... is a builder, used in order to define the
>>>>>>>>>>>>>> Transformer and KeyValueStore building parameters. Some of these
>>>>>>>>>>>>>> parameters should be:
>>>>>>>>>>>>>> ** store's KeySerde,
>>>>>>>>>>>>>> ** store's ValueSerde,
>>>>>>>>>>>>>> ** whether the store is persistent or in-memory,
>>>>>>>>>>>>>> ** store's name -- optional parameter, the system should be able
>>>>>>>>>>>>>> to devise the name of the store transparently for the user, if
>>>>>>>>>>>>>> we don't want to devise it ourselves/share the store between
>>>>>>>>>>>>>> processors.
>>>>>>>>>>>>>> ** scheduled punctuation.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Imagine we have a KStream<String, Integer>, and we need to
>>>>>>>>>>>>>> calculate a `derivative` stream, that is, a stream of 'deltas'
>>>>>>>>>>>>>> of the provided integer values.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This could be achieved as simply as
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> stream.transform((key, value, stateStore) -> {
>>>>>>>>>>>>>>         int previousValue =
>>>>>>>>>>>>>>             Optional.ofNullable(stateStore.get(key)).orElse(0);
>>>>>>>>>>>>>>         stateStore.put(key, value);
>>>>>>>>>>>>>>         return KeyValue.pair(key, value - previousValue);
>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>         // we do not need to bother with store name, punctuation
>>>>>>>>>>>>>>         // etc.; maybe even the Serde part can be omitted, since
>>>>>>>>>>>>>>         // we can inherit the serdes from the stream by default
>>>>>>>>>>>>>>         , Transformed.with(Serdes.String(), Serdes.Integer())
>>>>>>>>>>>>>> );
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The hard part of it is that the new `transform` method
>>>>>>>>>>>>>> definition should be parameterized by six type parameters:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> * input/output/KeyValueStore key type,
>>>>>>>>>>>>>> * input/output/KeyValueStore value type.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> However, it seems that all these types can be inferred from the
>>>>>>>>>>>>>> provided lambda and Transformed.with instances.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What do you think about this?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 27.03.2019 20:45, Guozhang Wang wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello Paul,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the uploaded PR and the detailed description! I've
>>>>>>>>>>>>>> made a pass on it and left some comments.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Overall I think I agree with you that passing in the
>>>>>>>>>>>>>> StoreBuilder directly rather than the store name is more
>>>>>>>>>>>>>> convenient, as it does not require another `addStore` call, but
>>>>>>>>>>>>>> we just need to spend some more documentation effort on
>>>>>>>>>>>>>> educating users about the two ways of connecting their stores.
>>>>>>>>>>>>>> I'm slightly concerned about this learning curve, but I can be
>>>>>>>>>>>>>> convinced if most people feel it is worth it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 5:15 PM Paul Whalen <pgwha...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'd like to resurrect this discussion with a cursory,
>>>>>>>>>>>>>> proof-of-concept implementation of the KIP which combines many
>>>>>>>>>>>>>> of our ideas: https://github.com/apache/kafka/pull/6496.  I
>>>>>>>>>>>>>> tried to keep the diff as small as possible for now, just using
>>>>>>>>>>>>>> it to convey the main ideas.  But I'll separately address some
>>>>>>>>>>>>>> of our earlier discussion:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>    - Will there be a new, separate interface for users to
>>>>>>>>>>>>>>    implement for the new functionality? No, to hopefully keep
>>>>>>>>>>>>>>    things simple, all of the Processor/TransformerSupplier
>>>>>>>>>>>>>>    interfaces will just extend StateStoresSupplier, allowing
>>>>>>>>>>>>>>    users to opt in to this functionality by overriding the
>>>>>>>>>>>>>>    default implementation that gives an empty list.
>>>>>>>>>>>>>>    - Will the interface allow users to specify the store name,
>>>>>>>>>>>>>>    or the entire StoreBuilder? The entire StoreBuilder, so the
>>>>>>>>>>>>>>    Processor/TransformerSupplier can completely encapsulate name
>>>>>>>>>>>>>>    and implementation of a state store if desired.
>>>>>>>>>>>>>>    - Will the old way of specifying store names alongside the
>>>>>>>>>>>>>>    supplier when calling stream.process/transform() be
>>>>>>>>>>>>>>    deprecated? No, this is still a legitimate way to wire up
>>>>>>>>>>>>>>    Processors/Transformers and their stores. But I would
>>>>>>>>>>>>>>    recommend not allowing stream.process/transform() calls that
>>>>>>>>>>>>>>    use both store declaration mechanisms (this restriction is
>>>>>>>>>>>>>>    not in the proof of concept)
>>>>>>>>>>>>>>    - How will we handle adding the same state store to the
>>>>>>>>>>>>>>    topology multiple times because different
>>>>>>>>>>>>>>    Processor/TransformerSuppliers declare it?
>>>>>>>>>>>>>>    topology.addStateStore() will be slightly relaxed for
>>>>>>>>>>>>>>    convenience, and will allow adding the same StoreBuilder
>>>>>>>>>>>>>>    multiple times as long as the exact same StoreBuilder
>>>>>>>>>>>>>>    instance is being added for the same store name.  This seems
>>>>>>>>>>>>>>    to prevent in practice the issue of accidentally making two
>>>>>>>>>>>>>>    state stores one by adding with the same name.  For
>>>>>>>>>>>>>>    additional safety, if we wanted to (not in the proof of
>>>>>>>>>>>>>>    concept), we could allow for this relaxation only for
>>>>>>>>>>>>>>    internal callers of topology.addStateStore().
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So, in summary, the use cases look like:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>    - 1 transformer/processor that owns its store: Using the new
>>>>>>>>>>>>>>    StateStoresSupplier interface method to supply its
>>>>>>>>>>>>>>    StoreBuilders that will be added to the topology
>>>>>>>>>>>>>>    automatically.
>>>>>>>>>>>>>>    - Multiple transformer/processors that share the same store:
>>>>>>>>>>>>>>    Either
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>    1. The old way: the StoreBuilder is defined "far away" from
>>>>>>>>>>>>>>    the Transformer/Processor implementations, and is added to
>>>>>>>>>>>>>>    the topology manually by the user
>>>>>>>>>>>>>>    2. The new way: the StoreBuilder is defined closer to the
>>>>>>>>>>>>>>    Transformer/Processor implementations, and the same instance
>>>>>>>>>>>>>>    is returned by all Transformer/ProcessorSuppliers that need
>>>>>>>>>>>>>>    it
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This makes the KIP wiki a bit stale; I'll update if we want to
>>>>>>>>>>>>>> bring this design to a vote.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Dec 16, 2018 at 6:04 PM Guozhang Wang
>>>>>>>>>>>>>> <wangg...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Matthias / Paul,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The concern I had about introducing `StoreBuilderSupplier` is
>>>>>>>>>>>>>> simply because it is another XXSupplier to the public API, so
>>>>>>>>>>>>>> I'd like to ask if we really have to add it :)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The difference between encapsulating the store name and
>>>>>>>>>>>>>> encapsulating the full state store builder is that, in the
>>>>>>>>>>>>>> former:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -----------
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> // Following my proposal, the store name can be passed in and
>>>>>>>>>>>>>> // used for both `listStores` and in `Transformer#init`, so the
>>>>>>>>>>>>>> // Transformer function does not need to get the constant
>>>>>>>>>>>>>> // string name again.
>>>>>>>>>>>>>> // One caveat to admit is that the MyTransformerSupplier logic
>>>>>>>>>>>>>> // may be unique to `store1`, so it cannot be reused with a
>>>>>>>>>>>>>> // different store name anyway.
>>>>>>>>>>>>>> String storeName = "store1";
>>>>>>>>>>>>>> builder.addStore(new MyStoreBuilder(storeName));
>>>>>>>>>>>>>> stream1.transform(new MyTransformerSupplier(storeName));
>>>>>>>>>>>>>> -----------
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> While in the latter:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -----------
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> // The name just indicates that we may have one such supplier
>>>>>>>>>>>>>> // for each store.
>>>>>>>>>>>>>> stream1.transform(new MyTransformerSupplierForStore1());
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -----------
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I understand the latter introduces more convenience in the API,
>>>>>>>>>>>>>> but the cost is that we still cannot completely remove
>>>>>>>>>>>>>> `builder.addStore`; we only reduce its semantic scope to shared
>>>>>>>>>>>>>> state stores. Hence users need to learn two ways of creating
>>>>>>>>>>>>>> state stores for those two patterns.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> My argument is that more public APIs require a longer learning
>>>>>>>>>>>>>> curve for users, and introduce more usage patterns that may
>>>>>>>>>>>>>> confuse users (the proposal I had tries to replace one with
>>>>>>>>>>>>>> another completely).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Dec 16, 2018 at 2:58 PM Paul Whalen <pgwha...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the great thoughts Matthias and Guozhang!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If I'm not mistaken, Guozhang's suggestion is what my second
>>>>>>>>>>>>>> alternative on the KIP is ("Have the added method on the
>>>>>>>>>>>>>> Supplier interfaces only return store names, not builders").  I
>>>>>>>>>>>>>> do think it would be a worthwhile usability improvement on its
>>>>>>>>>>>>>> own, but to Matthias's point, it doesn't achieve the full goal
>>>>>>>>>>>>>> of completely encapsulating a state store and its processor -
>>>>>>>>>>>>>> it encapsulates the name, but not the StateStoreBuilder.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm really intrigued by Matthias's idea that forgoes the default
>>>>>>>>>>>>>> interface method I proposed.  Having smaller, separate
>>>>>>>>>>>>>> interfaces is a powerful idea and I think a cleaner API than
>>>>>>>>>>>>>> what I proposed.  The non-shared store use case is handled well
>>>>>>>>>>>>>> here, and the shared store use case is possible, though maybe
>>>>>>>>>>>>>> still not as graceful as we would like (having to add the
>>>>>>>>>>>>>> StoreBuilderSupplier before the StoreNameSupplier seems maybe
>>>>>>>>>>>>>> too subtle to me).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We're all agreed that one of the big problems with the shared
>>>>>>>>>>>>>> store use case is how to deal with adding the same store to the
>>>>>>>>>>>>>> topology multiple times.  Catching the "store already added"
>>>>>>>>>>>>>> exception is risky.  Here's a maybe radical idea: change
>>>>>>>>>>>>>> `topology.addStateStore()` to be idempotent for adding a given
>>>>>>>>>>>>>> state store name and `StoreBuilder`.  In other words,
>>>>>>>>>>>>>> `addStateStore` would not throw the "store already added"
>>>>>>>>>>>>>> exception if the `StoreBuilder` being added for a given name has
>>>>>>>>>>>>>> the same identity as the one that has already been added.  Does
>>>>>>>>>>>>>> this eliminate all the bugs we're worried about?  Thinking about
>>>>>>>>>>>>>> it for a few minutes, it seems to eliminate most at least (would
>>>>>>>>>>>>>> a user really use the exact same StoreBuilder when they intend
>>>>>>>>>>>>>> there to be two stores?).  It might make the API slightly harder
>>>>>>>>>>>>>> to use if a user isn't immediately aware of that subtlety, but a
>>>>>>>>>>>>>> good error message should ease the pain, and it would happen
>>>>>>>>>>>>>> immediately during development.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> And with regards to Matthias's comment about whether we need to
>>>>>>>>>>>>>> deprecate existing varargs transform methods - I don't think we
>>>>>>>>>>>>>> need to, but it might be nice for there only to be one way to do
>>>>>>>>>>>>>> things, assuming whatever we come up with supports all existing
>>>>>>>>>>>>>> use cases.  I don't feel strongly about this, but if we don't
>>>>>>>>>>>>>> deprecate, I do think it's important to add checks that prevent
>>>>>>>>>>>>>> users from trying to do the same thing in two different ways, as
>>>>>>>>>>>>>> we've discussed.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Dec 16, 2018 at 5:36 AM Matthias J. Sax
>>>>>>>>>>>>>> <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guozhang,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > Regarding the last option to catch "store exist already"
>>>>>>>>>>>>>> > exception and fallback to connect stores, I'm a bit concerned
>>>>>>>>>>>>>> > it may be hiding actual user bugs.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I agree with this concern. From my original email:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > The only disadvantage I see, might be potential bugs about
>>>>>>>>>>>>>> > sharing state if two different stores are named the same by
>>>>>>>>>>>>>> > mistake (this would not be detected).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For your new proposal: I am not sure if it addresses Paul's
>>>>>>>>>>>>>> original idea -- I hope Paul can clarify. From my understanding,
>>>>>>>>>>>>>> the idea was to encapsulate a store and its processor. As many
>>>>>>>>>>>>>> stores are not shared, this seems to be quite useful. Your
>>>>>>>>>>>>>> proposal falls a little short of supporting encapsulation for
>>>>>>>>>>>>>> non-shared stores.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 12/15/18 1:40 AM, Guozhang Wang wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Matthias,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for your feedback.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regarding the last option to catch "store exist already"
>>>>>>>>>>>>>> exception and fallback to connect stores, I'm a bit concerned it
>>>>>>>>>>>>>> may be hiding actual user bugs.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thinking about Paul's proposal and your suggestion again, I'd
>>>>>>>>>>>>>> like to propose another alternative somewhere in the middle of
>>>>>>>>>>>>>> your approaches, i.e. we still let users create sharable state
>>>>>>>>>>>>>> stores via `addStateStore`, and we allow the TransformerSupplier
>>>>>>>>>>>>>> to return a list of state stores that it needs, i.e.:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public interface TransformerSupplier<K, V, R> {
>>>>>>>>>>>>>>     Transformer<K, V, R> get();
>>>>>>>>>>>>>>     default List<String> stateStoreNames() {
>>>>>>>>>>>>>>         return Collections.emptyList();
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> by doing this users can still "consolidate" the references of
>>>>>>>>>>>>>> store names in a single place in the transform call, e.g.:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public class MyTransformerSupplier<K, V, R>
>>>>>>>>>>>>>>         implements TransformerSupplier<K, V, R> {
>>>>>>>>>>>>>>     private String storeName;
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     public class MyTransformer<K, V, R> {
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>        ....
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>        init() {
>>>>>>>>>>>>>>           store = context.getStateStore(storeName);
>>>>>>>>>>>>>>        }
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     // overrides the default from the interface above
>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>     public List<String> stateStoreNames() {
>>>>>>>>>>>>>>         return Collections.singletonList(storeName);
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Basically, we move the parameters from the caller of `transform`
>>>>>>>>>>>>>> to inside the TransformerSuppliers. DSL implementations would
>>>>>>>>>>>>>> not change much, simply calling `connectStateStore` by getting
>>>>>>>>>>>>>> the list of names from the provided function.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Dec 13, 2018 at 7:27 AM Matthias J. Sax
>>>>>>>>>>>>>> <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just a meta comment: do we really need to deprecate existing
>>>>>>>>>>>>>> `transform()` etc methods?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The last argument is a vararg, and thus, just keeping the
>>>>>>>>>>>>>> existing API for this part seems to work too, allowing users to
>>>>>>>>>>>>>> implement both patterns?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also, instead of adding a default method, we could also add a
>>>>>>>>>>>>>> new interface `StoreBuilderSupplier` with method
>>>>>>>>>>>>>> `List<StoreBuilder> stateStores()` -- users could implement
>>>>>>>>>>>>>> `TransformerSupplier` and `StoreBuilderSupplier` at once; and
>>>>>>>>>>>>>> for this case, we require that users don't provide store names
>>>>>>>>>>>>>> in `transform()`.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Similarly, we could add an interface `StoreNameSupplier` with
>>>>>>>>>>>>>> method `List<String> stateStores()`. This allows us to
>>>>>>>>>>>>>> "auto-wire" a transformer to existing stores (to avoid the issue
>>>>>>>>>>>>>> of adding the same store multiple times).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hence, for shared stores, there would be one "main" transformer
>>>>>>>>>>>>>> that implements `StoreBuilderSupplier` and that must be added
>>>>>>>>>>>>>> first to the topology. The other transformers would implement
>>>>>>>>>>>>>> `StoreNameSupplier` and just connect to those stores.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Another possibility to avoid the issue of adding the same stores
>>>>>>>>>>>>>> multiple times would be that the DSL always calls
>>>>>>>>>>>>>> `addStateStore()` but catches a potential "store exists already"
>>>>>>>>>>>>>> exception and falls back to `connectProcessorAndStateStore()`
>>>>>>>>>>>>>> for this case. Thus, we would not need the `StoreNameSupplier`
>>>>>>>>>>>>>> interface and the order in which transformers are added would
>>>>>>>>>>>>>> not matter either. The only disadvantage I see, might be
>>>>>>>>>>>>>> potential bugs about sharing state if two different stores are
>>>>>>>>>>>>>> named the same by mistake (this would not be detected).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just some ideas I wanted to share. What do you think?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 12/11/18 3:46 AM, Paul Whalen wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ah yes of course, this was an oversight, I completely ignored
>>>>>>>>>>>>>> the multiple processors sharing the same state store when
>>>>>>>>>>>>>> writing up the KIP.  Which is funny, because I've actually done
>>>>>>>>>>>>>> this (different processors sharing state stores) a fair amount
>>>>>>>>>>>>>> myself, and I've settled on a pattern where I group the
>>>>>>>>>>>>>> Processors in an enclosing class, and that enclosing class
>>>>>>>>>>>>>> handles as much as possible.  Here's a gist showing the rough
>>>>>>>>>>>>>> structure, just for context:
>>>>>>>>>>>>>> https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65
>>>>>>>>>>>>>> .  Note how it adds the stores to the topology, as well as
>>>>>>>>>>>>>> providing a public method with the store names.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I don't think my proposal completely conflicts with the multiple
>>>>>>>>>>>>>> processors sharing state stores use case, since you can create a
>>>>>>>>>>>>>> supplier that provides the store name you want, somewhat
>>>>>>>>>>>>>> independently of your actual Processor logic.  The issue I do
>>>>>>>>>>>>>> see though, is that topology.addStateStore() can only be called
>>>>>>>>>>>>>> once for a given store.  So for your example, if there was a
>>>>>>>>>>>>>> single TransformerSupplier that was passed into both transform()
>>>>>>>>>>>>>> calls, "store1" would be added (under the hood) to the topology
>>>>>>>>>>>>>> twice, which is no good.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Perhaps this suggests that one of my alternatives on the KIP
>>>>>>>>>>>>>> might be desirable: either not having the suppliers return
>>>>>>>>>>>>>> StoreBuilders (just store names), or not deprecating the old
>>>>>>>>>>>>>> methods that take "String... stateStoreNames". I'll have to
>>>>>>>>>>>>>> think about it a bit.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang
>>>>>>>>>>>>>> <wangg...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello Paul,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the great writeup (very detailed and crystal clear
>>>>>>>>>>>>>> motivation sections!).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is quite an interesting idea and I do like the API
>>>>>>>>>>>>>> cleanness you proposed. The original motivation of letting
>>>>>>>>>>>>>> StreamsTopology add state stores though, is to allow different
>>>>>>>>>>>>>> processors to share the state store. For example:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> builder.addStore("store1");
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> // a path of stream transformations that leads to KStream
>>>>>>>>>>>>>> // stream1.
>>>>>>>>>>>>>> stream1.transform(..., "store1");
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> // another path that generates a KStream stream2.
>>>>>>>>>>>>>> stream2.transform(..., "store1");
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Behind the scenes, Streams will make sure stream1 / stream2
>>>>>>>>>>>>>> transformations will always be grouped together as a single
>>>>>>>>>>>>>> group of tasks, each of which will be executed by a single
>>>>>>>>>>>>>> thread, and hence there are no concurrency issues on accessing
>>>>>>>>>>>>>> the store from different operators within the same task. I'm not
>>>>>>>>>>>>>> sure how common this use case is, but I'd like to hear if you
>>>>>>>>>>>>>> have any thoughts on maintaining this, since the current
>>>>>>>>>>>>>> proposal seems to exclude this possibility.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen <pgwha...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here's KIP-401 for discussion, a minor Kafka Streams API change
>>>>>>>>>>>>>> that I think could greatly increase the usability of the
>>>>>>>>>>>>>> low-level processor API.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have some code written but will wait to see if there is buy in
>>>>>>>>>>>>>> before going all out and creating a pull request.  It seems like
>>>>>>>>>>>>>> most of the work would be in updating documentation and tests.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>
>>
> 
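For reference, the idempotent `topology.addStateStore()` idea quoted
above (Paul's mail from Dec 16, 2018) can be sketched in a few lines.
This is only an illustration under assumptions: `SketchStoreBuilder` and
`SketchTopology` are hypothetical stand-ins and not the Kafka Streams
classes; the point is just the identity check on the builder instance.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a StoreBuilder; only the store name matters here.
final class SketchStoreBuilder {
    private final String name;

    SketchStoreBuilder(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }
}

// Hypothetical stand-in for a topology, reduced to store registration.
final class SketchTopology {
    private final Map<String, SketchStoreBuilder> stores = new HashMap<>();

    // Re-adding the very same builder instance is a no-op. A different
    // instance under an already-used name is still rejected, so two
    // distinct stores that share a name by mistake are detected.
    SketchTopology addStateStore(SketchStoreBuilder builder) {
        SketchStoreBuilder existing = stores.putIfAbsent(builder.name(), builder);
        if (existing != null && existing != builder) {
            throw new IllegalStateException("StateStore " + builder.name()
                + " is already added with a different StoreBuilder.");
        }
        return this;
    }

    int storeCount() {
        return stores.size();
    }
}

final class IdempotentAddDemo {
    public static void main(String[] args) {
        SketchStoreBuilder shared = new SketchStoreBuilder("S");
        SketchTopology topology = new SketchTopology();
        topology.addStateStore(shared); // declared by the supplier of A
        topology.addStateStore(shared); // declared again by the supplier of B
        System.out.println("stores: " + topology.storeCount());
        try {
            // same name, but a different builder instance
            topology.addStateStore(new SketchStoreBuilder("S"));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Comparing by identity (and not by name alone) is what keeps the "two
different stores named the same by mistake" case detectable.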

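Similarly, Ivan's three-argument lambda (quoted above) and his remark
that the six type parameters can be inferred can be tried out without
Kafka. The sketch below is an assumption-laden stand-in: the
`StoreTransformer` interface and `DeltaSketch` class are hypothetical, a
plain `Map` plays the role of the KeyValueStore, and a `List` of entries
plays the role of the stream (Java 9+ for `Map.entry`).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical three-argument transformer: input key, input value, store.
@FunctionalInterface
interface StoreTransformer<K, V, SK, SV, KR, VR> {
    Map.Entry<KR, VR> apply(K key, V value, Map<SK, SV> store);
}

final class DeltaSketch {
    // Applies the transformer to each record in order; the same store
    // instance is passed to every call, so state carries over.
    static <K, V, SK, SV, KR, VR> List<Map.Entry<KR, VR>> transform(
            List<Map.Entry<K, V>> records,
            StoreTransformer<K, V, SK, SV, KR, VR> transformer,
            Map<SK, SV> store) {
        List<Map.Entry<KR, VR>> out = new ArrayList<>();
        for (Map.Entry<K, V> record : records) {
            out.add(transformer.apply(record.getKey(), record.getValue(), store));
        }
        return out;
    }

    // The 'derivative' example: emit the difference to the previous value
    // seen for the same key (0 if the key has not been seen yet).
    static List<Map.Entry<String, Integer>> deltas(
            List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> store = new HashMap<>();
        // No explicit type arguments: all six are inferred by the compiler.
        return transform(records, (key, value, s) -> {
            int previous = Optional.ofNullable(s.get(key)).orElse(0);
            s.put(key, value);
            return Map.entry(key, value - previous);
        }, store);
    }

    public static void main(String[] args) {
        System.out.println(deltas(List.of(
            Map.entry("a", 5), Map.entry("a", 8), Map.entry("b", 3))));
    }
}
```

Here the key and value types come from the input list, the store types
from the store argument, and the output types from the expected return
type, which is roughly the inference Ivan describes.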