I am not sure if I fully understand, hence, let me try to rephrase:

> I can't think of an example that would require both ways, or would even be more readable using both ways.

Example: There are two processors, A and B, one store S that both need to access, and one store S_b that only B needs to access.

If we don't allow mixing both approaches, it would be required to write the following code:

    Topology t = new Topology();
    t.addProcessor("A", ...); // does not add any store
    t.addProcessor("B", ...); // does not add any store
    t.addStateStore(..., "A", "B"); // adds S and connects it to A and B
    t.addStateStore(..., "B"); // adds S_b and connects it to B

    // DSL example:
    StreamsBuilder b = new StreamsBuilder();
    b.addStateStore() // adds S
    b.addStateStore() // adds S_b
    stream1.process(..., "S") // adds A and connects S
    stream2.process(..., "S", "S_b") // adds B and connects S and S_b

If we allow mixing both approaches, the code could be simplified to:

    Topology t = new Topology();
    t.addProcessor("A", ...); // does not add any store
    t.addProcessor("B", ...); // adds/connects S_b implicitly
    t.addStateStore(..., "A", "B"); // adds S and connects it to A and B

    // DSL example
    StreamsBuilder b = new StreamsBuilder();
    b.addStateStore() // adds S
    stream1.process(..., "S") // adds A and connects S
    stream2.process(..., "S") // adds B and connects S; adds/connects S_b implicitly

The fact that B has a "private store" could be encapsulated and I don't see why this would be bad?

> If you can do both ways, the actual full set of state stores being connected could be in wildly different places in the code, which could create confusion.

Ie, I don't see why the second version would be confusing, or why the first version would be more readable (I don't argue it's less readable either though; I think both are equally readable)?

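For illustration only, here is a minimal, self-contained sketch of the bookkeeping that the mixed approach relies on. `ToyTopology`, `ToyStoreBuilder`, and `MixedWiringDemo` are hypothetical stand-ins invented for this sketch; they are not Kafka Streams classes, and the real `Topology` does more validation (for example, it checks that a processor exists before connecting a store to it). The sketch assumes the rule discussed in this thread: adding the same builder instance again is a no-op, while a different builder with an already-used name is rejected.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a store builder; only the store name matters here. */
final class ToyStoreBuilder {
    final String name;

    ToyStoreBuilder(String name) {
        this.name = name;
    }
}

/** Toy model of add/connect bookkeeping. This is an assumption-laden sketch, not Kafka Streams code. */
final class ToyTopology {
    private final Map<String, ToyStoreBuilder> stores = new HashMap<>();
    private final Map<String, Set<String>> connections = new HashMap<>();

    /** Explicit style: add a store and connect it to the named processors. */
    void addStateStore(ToyStoreBuilder builder, String... processorNames) {
        ToyStoreBuilder existing = stores.putIfAbsent(builder.name, builder);
        // Same name with a different builder instance is treated as a user bug.
        if (existing != null && existing != builder) {
            throw new IllegalStateException(
                "Store " + builder.name + " was already added with a different builder");
        }
        for (String processorName : processorNames) {
            connect(processorName, builder.name);
        }
    }

    /** Implicit style: the processor brings its own ("private") stores along. */
    void addProcessor(String processorName, ToyStoreBuilder... ownedStores) {
        connections.computeIfAbsent(processorName, k -> new LinkedHashSet<>());
        for (ToyStoreBuilder owned : ownedStores) {
            addStateStore(owned, processorName);
        }
    }

    /** Connecting is a set insertion, so repeating it changes nothing. */
    private void connect(String processorName, String storeName) {
        connections.computeIfAbsent(processorName, k -> new LinkedHashSet<>()).add(storeName);
    }

    Set<String> storesOf(String processorName) {
        return connections.getOrDefault(processorName, new LinkedHashSet<>());
    }
}

final class MixedWiringDemo {
    public static void main(String[] args) {
        ToyStoreBuilder s = new ToyStoreBuilder("S");
        ToyStoreBuilder sB = new ToyStoreBuilder("S_b");

        ToyTopology t = new ToyTopology();
        t.addProcessor("A");          // does not add any store
        t.addProcessor("B", sB);      // adds/connects S_b implicitly
        t.addStateStore(s, "A", "B"); // adds S and connects it to A and B
        t.addStateStore(s, "B");      // same instance again: a no-op

        System.out.println("A -> " + t.storesOf("A"));
        System.out.println("B -> " + t.storesOf("B"));
    }
}
```

In this toy model, processor B ends up connected to both S and S_b even though the two stores were wired through different styles, which is the situation the "mixed" variant above describes.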
Or do you argue that we should allow the following:

> Shared stores can be passed from the outside in an anonymous ProcessorSupplier if desired, making it effectively the same as passing the stateStoreNames var args

    Topology t = new Topology();
    t.addProcessor("A", ...); // adds/connects S implicitly
    t.addProcessor("B", ...); // adds/connects S and S_b implicitly

    // DSL example
    StreamsBuilder b = new StreamsBuilder();
    stream1.process(...) // adds A and adds/connects S implicitly
    stream2.process(...) // adds B and adds/connects S and S_b implicitly

For this case, the second implicit adding of S would require returning the same `StoreBuilder` instance to make it idempotent, which seems hard to achieve, because both `ProcessorSuppliers` now have a cross dependency to use the same object. Hence, I don't think this would be a good approach.

Also, because we require that the same `StoreBuilder` instance is always passed for a given store name, we actually have good protection against user bugs that add two stores with the same name but different builders.

I also do not feel super strong about it, but I see some advantages to allowing the mixed approach, and I don't see disadvantages. It would be good to get input from others, too.

-Matthias

On 8/7/19 7:29 PM, Paul Whalen wrote:
> My thinking is that restricting the API to enforce only one way of connecting stores would make it simpler to use and lead to more readable code. I can't think of an example that would require both ways, or would even be more readable using both ways. Shared stores can be passed from the outside in an anonymous ProcessorSupplier if desired, making it effectively the same as passing the stateStoreNames var args. If you can do both ways, the actual full set of state stores being connected could be in wildly different places in the code, which could create confusion. I personally can't imagine a case in which that would be useful.
>
> All that being said, I don't feel terribly strongly about it. I'm just trying to make the API as straightforward as possible. Admittedly a runtime check doesn't make for a great API, but I see it more as an opportunity to educate the user to make it clear that "connecting" a state store is a thing that can be done in two different ways, but there is no reason to mix both. If it seems like there's a compelling reason to mix them then I would abandon the idea in a heartbeat.
>
> Paul
>
> On Wed, Aug 7, 2019 at 5:48 PM Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Sorry for the long silence on this KIP Paul! I guess the 2.3 release distracted us somewhat.
>>
>> Overall, I am +1.
>>
>> With regard to John's point about owned vs shared state stores, I think it describes a valid use case, and throwing an exception if people want to mix both features might be too restrictive?
>>
>> We could of course later relax the restriction, but atm I am not sure what the main argument for adding the restriction is?
>>
>> (a) In the current API, one could connect the same store multiple times to the same processor without getting an exception, because the operation is idempotent.
>>
>> (b) The KIP also suggests relaxing the current restriction on adding the same store twice, as long as store name and `StoreBuilder` instance are the same, because it's an idempotent (hence, safe) operation too.
>>
>> Because we already have (a) and (b) and consider both as safe, it seems we could also treat the case of mixing both patterns as idempotent and hence safe. And if we do this, we implicitly enable mixing both patterns for different stores.
>>
>> Thoughts?
>>
>> -Matthias
>>
>> On 6/17/19 2:31 PM, John Roesler wrote:
>>> Hey, all,
>>>
>>> Sorry I'm late to the party. I meant to read into this KIP before, but didn't get around to it. I was just reminded when Paul mentioned it in a different thread.
>>> Please feel free to bump a discussion any time it stalls!
>>>
>>> I've just read through the whole discussion so far, and, to echo the earlier sentiments, the motivation seems very clear. I remember how hard it was to figure out how to actually wire up a stateful processor properly the first couple of times. Not a very good user experience.
>>>
>>> I looked over the whole conversation to date, as well as the KIP and the latest PR (https://github.com/apache/kafka/pull/6824). FWIW, the current approach looks good to me. I was concerned about the "cheat codes"-style mixin interface. Discoverability would have been a problem, and it's also not a very normal pattern for Java APIs. It actually looks a little more like something you'd do with an annotation.
>>>
>>> So the current approach seems good:
>>> * The new interface with a default to return `null` is effectively shipping the feature flagged "off" (which is nice and safe)
>>> * Shared stores are "supported" the same way they always have been, by connecting them externally. This makes sense, since those stores aren't "owned" by any of the connected processors.
>>> * Processors that do own their stores can configure them in the same file they use them, which decreases the probability of cast exceptions when they get the stores from the context.
>>> * Stateful processors that own their stores are available for one-shot definition of the stores and the processor all in the same file (this is the main point of the KIP)
>>>
>>> The runtime check that stores can't be both defined in the processor and referenced by name might be a little restrictive (since we already have the restriction that same-name stores can't be registered), but it would also be easy to remove it later.
>>> I'm just thinking that if I have a processor that owns one store and shares another, it would be pretty obvious how to hook it up in the proposed API, except for that check.
>>>
>>> One last thought, regarding the all-important interface name: If you wanted to indicate more that the stores are available for Streams to connect, rather than that they are already connected, you could call it ConnectableStoreProvider (similar to AutoCloseable).
>>>
>>> I just thought I'd summarize the current state, since it's been a while and no one has voted yet. I'll go ahead and vote now on the voting thread, since I'm +1 on the current proposal.
>>>
>>> Thanks,
>>> -John
>>>
>>> On Mon, May 27, 2019 at 1:59 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>
>>>> It wasn't much of a lift changing option B to work for option C, so I closed that PR and made a new one, which should be identical to the KIP right now: https://github.com/apache/kafka/pull/6824. There are a few todos still which I will hold off until the KIP is accepted.
>>>>
>>>> I created a voting thread about a month ago, so I'll bump that now that we're nearly there.
>>>>
>>>> Paul
>>>>
>>>> On Sun, May 26, 2019 at 2:21 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>
>>>>> Per Matthias's suggestion from a while ago, I actually implemented a good amount of option B to get a sense of the user experience and documentation requirements. For a few reasons mentioned below, I think it's not my favorite option, and I prefer option C. But since I did the work and it can help discussion, I may as well share: https://github.com/apache/kafka/pull/6821.
>>>>>
>>>>> Things I learned along the way implementing Option B:
>>>>> - For the name of the interface, I like ConnectedStoreProvider. It isn't perfect but it seems to capture the general gist without being overly verbose.
>>>>>   I get that from a strict standpoint it's not "providing connected stores" but is instead "providing stores to be connected," but I think that in context and with documentation, the risk of someone being confused by that is low.
>>>>> - I definitely felt the discoverability issue while trying to write clear documentation; you really have to make sure to connect the dots for the user when the interface isn't connected to anything.
>>>>> - Another problem with a separate interface found while writing tests/examples: defining a ProcessorSupplier that also implements ConnectedStoreProvider cannot be done anonymously, since you can't define an anonymous class in Java that implements multiple interfaces. I actually consider this a fairly major usability issue - it means a user always has to have a custom class rather than doing it inline. We could provide an abstract class that implements the two, but at that point, we're not that far from option A or C anyway.
>>>>>
>>>>> I updated the KIP with my current thinking, which as mentioned is Matthias's option C. Once again for clarity, that *is not* what is in the linked pull request. The current KIP is my proposal.
>>>>>
>>>>> Thanks everyone for the input!
>>>>>
>>>>> P.S. What do folks use to edit the HTML documentation, e.g. processor-api.html? I looked at doing it by hand but it kind of looked like agony with all the small tags required for formatting code, so I'm sort of assuming there's tooling for it.
>>>>>
>>>>> On Fri, May 24, 2019 at 12:49 AM Matthias J.
>>>>> Sax <matth...@confluent.io> wrote:
>>>>>
>>>>>> I think the discussion mixed approaches a little bit, hence, let me rephrase my understanding:
>>>>>>
>>>>>> A) Add a new method with a default implementation to `ProcessorSupplier`:
>>>>>>
>>>>>> For this case, we don't add a new interface, but only add a new method to `ProcessorSupplier` -- to keep backward compatibility, we need to add a default implementation. Users opt into the new feature by overriding the default implementation.
>>>>>>
>>>>>> B) We add a new interface with a new method:
>>>>>>
>>>>>> For this case, the `ProcessorSupplier` interface is not changed and it does also _not_ extend the new interface. Because `ProcessorSupplier` is not changed, it's naturally backward compatible. Users opt into the new feature by adding the new interface to their ProcessorSupplier implementation, and they need to implement the new method because there is no default implementation. Kafka Streams can use `instanceof` to detect if the new interface is used or not and thus do the right thing.
>>>>>>
>>>>>> What was also discussed is a mix of both:
>>>>>>
>>>>>> C) We add a new interface with a new method and let `ProcessorSupplier` extend the new interface:
>>>>>>
>>>>>> Here, we need to add a default implementation to preserve backward compatibility. Similar to (A), users opt into the feature by overriding the default implementation.
>>>>>>
>>>>>> Option (C) is the same as (A) from a user point of view because a user won't care about the new interface.
>>>>>> It only makes a difference for our code base, as we can share the default implementation of the new method. This is only a small gain, as the implementation is trivial, but also a small drawback, as we add a new public interface that is useless to the user because the user would never implement the interface directly.
>>>>>>
>>>>>> For (A/C), it might be simpler for users to detect the feature. For (B), we have the advantage that users must implement the method if they use the new interface.
>>>>>>
>>>>>> Overall, it seems that (A) might be the best choice because it makes the feature more easily discoverable and does not add a "useless" interface. If you want to go with (C) to share the default implementation code, that's also fine with me. I am convinced now (even if I brought it up), that (B) might be not optimal because feature discoverability seems to be important.
>>>>>>
>>>>>> About `null` vs `emptyList`: I still tend to like `null` better but it's really a detail and not too important. Note, that the question only arises for (A/C), but not for (B) because for (B) we don't need a default implementation.
>>>>>>
>>>>>> @Paul: It's unclear to me atm what your final proposal is because you mentioned that you might want to rename `StateStoreConnector`? It's also unclear to me atm, if you prefer (A), (B), or (C).
>>>>>>
>>>>>> Maybe you can update the KIP if necessary and clearly state what your final proposal is. Beside this, it seems we can move to a VOTE?
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 5/2/19 3:01 PM, Bruno Cadonna wrote:
>>>>>>> Hi Paul,
>>>>>>>
>>>>>>> I will try to express myself a bit clearer.
>>>>>>>
>>>>>>> Ad 1)
>>>>>>> My assumption is that if `StateStoreConnector#stateStores()` returns `null`, Kafka Streams will throw an NPE because on purpose no null check is performed before the loop that calls `StreamsBuilder#addStateStore()`. When the user finally understands the cause of the NPE, she knows that she has to override `StateStoreConnector#stateStores()` in her implementation. My question was: why let the user discover at runtime that she has to override the method, when you could omit the default implementation for `StateStoreConnector#stateStores()` and let the compiler tell the user that the method needs to be overridden? Not providing a default implementation without separating the interfaces implies not being backward-compatible. That means, if we choose to not provide a default implementation and let the compiler signal the necessity to override the method, we have to separate the interfaces in any case.
>>>>>>>
>>>>>>> Ad 2)
>>>>>>> If you check for `null` or empty list in `process` and do not call `addStateStores` in those cases, the advantage that returning `null` makes bugs easier to detect, as mentioned by Matthias, would be lost. But maybe I am missing something here.
>>>>>>>
>>>>>>> Best,
>>>>>>> Bruno
>>>>>>>
>>>>>>> On Wed, May 1, 2019 at 6:27 AM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I definitely don't mind anyone jumping in, Bruno, thanks for the comments!
>>>>>>>>
>>>>>>>> 1) I'm not totally sure I'm clear on your point, but I think we're on the same page - if we're adding a method to the XSupplier interfaces (by making them inherit from a super interface StateStoreConnector) then we definitely need a default implementation to maintain compatibility.
>>>>>>>> Whether the default implementation returns null or an empty list is somewhat of a detail.
>>>>>>>>
>>>>>>>> 2) If stream.process() sees that StateStoreConnector#stateStores() returns either null or an empty list, it would handle that case specifically and not try to call addStateStore at all. Or is this not what you're asking?
>>>>>>>>
>>>>>>>> Separately, I'm still hacking away at the details of the PR and will continue to get something into a discussable state, but I'll share some thoughts I've run into.
>>>>>>>>
>>>>>>>> A) I'm tentatively going the separate interface route (Matthias's suggestion) and naming it ConnectedStoreProvider. Still don't love the name, but there's something nice about the name indicating *why* this thing is providing the store, not just that it is providing it.
>>>>>>>>
>>>>>>>> B) It has occurred to me that topology.addProcessor() could also recognize if ProcessorSupplier implements ConnectedStoreProvider and add and connect stores appropriately. This isn't in the KIP and I think the value-add is lower (if you're reaching that low level, surely the "auto add/connect store" isn't too important to you), but I think it would be confusing if it didn't, and I don't see any real downside.
>>>>>>>>
>>>>>>>> Paul
>>>>>>>>
>>>>>>>> On Tue, Apr 30, 2019 at 4:18 AM Bruno Cadonna <br...@confluent.io> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> @Paul: Thank you for the KIP!
>>>>>>>>>
>>>>>>>>> I hope you do not mind that I jump in.
>>>>>>>>>
>>>>>>>>> I have the following comments:
>>>>>>>>>
>>>>>>>>> 1) `null` vs empty list in the default implementation
>>>>>>>>> IIUC, returning `null` in the default implementation should basically signal that the method `stateStores` was not overridden.
>>>>>>>>> Why then provide a default implementation in the first place? Without a default implementation you would discover the missing implementation already at compile-time and not only at runtime. If you decide not to provide a default implementation, `XSupplier extends StateStoreConnector` would break existing code as Matthias has already pointed out.
>>>>>>>>>
>>>>>>>>> 2) `process` method adding the StoreBuilders to the topology
>>>>>>>>> If the default implementation returned `null` and `XSupplier extends StateStoreConnector`, then existing code would break, because `StreamsBuilder#addStateStore()` would throw an NPE.
>>>>>>>>>
>>>>>>>>> +1 for opening a WIP PR
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Bruno
>>>>>>>>>
>>>>>>>>> On Sun, Apr 28, 2019 at 10:57 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Paul!
>>>>>>>>>>
>>>>>>>>>> I agree with all of that. If we think that the general design is good, refactoring a PR if we want to pick a different name should not be too much additional work (hopefully). Thus, if you want to open a WIP PR and we use it to nail the open details, it might help to find a good conclusion.
>>>>>>>>>>
>>>>>>>>>>>> 2) Default method vs new interface:
>>>>>>>>>>
>>>>>>>>>> This seems to be the hardest tradeoff. I see the point about discoverability... Might be good to get input from others, which version they would prefer.
>>>>>>>>>>
>>>>>>>>>> Just to make clear, my suggestion from the last email was that `Transformer` etc does not extend the new interface. Instead, a user that wants to use this feature would need to implement both interfaces.
>>>>>>>>>>
>>>>>>>>>> If `Transformer extends StoreProvider` (just picking a name here) without a default implementation, existing code would break, and thus it is not an option because of breaking backward compatibility.
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 4/28/19 8:37 PM, Paul Whalen wrote:
>>>>>>>>>>> Great thoughts Matthias, thanks! I think we're all agreed that naming and documentation/education are the biggest hurdles for this KIP, and in light of that, I think it makes sense for me to just take a stab at a full fledged PR with documentation to convince us that it's possible to do it with enough clarity.
>>>>>>>>>>>
>>>>>>>>>>> In response to your specific thoughts:
>>>>>>>>>>>
>>>>>>>>>>> 1) StateStoreConnector as a name: Really good point about defining the difference between "adding" and "connecting." Guozhang suggested StateStoreConnector which was definitely an improvement over my StateStoresSupplier, but I think you're right that we need to be careful to make it clear that it's really accomplishing both. Thinking about it now, one problem with Connector is that the implementer of the interface is not really doing any connecting, it's providing/supplying the store that will be both added and connected. StoreProvider seems reasonable to me and probably the best candidate at the moment, but it would be nice if the name could convey that it's providing the store specifically so the caller can add it to the topology and connect it to the associated transformer.
>>>>>>>>>>>
>>>>>>>>>>> In general I think that really calling out what "adding" versus "connecting" is in the documentation will help make the entire purpose of this feature more clear to the user.
>>>>>>>>>>>
>>>>>>>>>>> 2) Default method vs new interface: The choice of a default method was influenced by Guozhang's fear about API bloat/discoverability. I can definitely see it both ways. Would the separate interface be a sub-interface of Processor/TransformerSupplier or standalone? It seems like you're suggesting standalone and I think that's what I favor. My only concern there is that the interface wouldn't actually be a type to any public API which sort of hurts discoverability. You would have to read the javadocs for stream.process/transform() to discover that implementing the interface in addition to Processor/TransformerSupplier would add and connect the store for you. But that added burden actually probably helps us in terms of making sure people don't mix and match, like you said.
>>>>>>>>>>>
>>>>>>>>>>> 3) Returning null instead of empty: Seems fair to me. I always worry about returning null when an empty collection can be used instead, but given that the library is the caller rather than the client I think your point makes sense here.
>>>>>>>>>>>
>>>>>>>>>>> 4) Returning Set instead of Collection: Agreed, don't see why not to make it more specific.
>>>>>>>>>>>
>>>>>>>>>>> Paul
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Apr 26, 2019 at 2:30 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi, sorry for the long pause.
>>>>>>>>>>>> Just trying to catch up here.
>>>>>>>>>>>>
>>>>>>>>>>>> I think it is safe to allow `addStateStore()` to be idempotent for the same `StoreBuilder` object. In fact, the `name` is "hard coded" and thus it's not really possible to use the same `StoreBuilder` object to create different stores.
>>>>>>>>>>>>
>>>>>>>>>>>> I also agree with the concern that only allowing a single store (as proposed by Ivan) might be too restrictive.
>>>>>>>>>>>>
>>>>>>>>>>>> Overall, the current KIP version LGTM. I don't have major concerns about user education for this case, but I agree that we need to document this clearly.
>>>>>>>>>>>>
>>>>>>>>>>>> Some further comments:
>>>>>>>>>>>>
>>>>>>>>>>>> (1) I am not sure if `StateStoreConnector` is the best name for the new interface. Note that there are two concepts about stores:
>>>>>>>>>>>>
>>>>>>>>>>>>  - adding a store: this makes the store available in the topology in general (however, the store is still "dangling", and not used)
>>>>>>>>>>>>  - connecting a store: this allows a processor etc to use a store
>>>>>>>>>>>>
>>>>>>>>>>>> The new interface does both, but its name only indicates the second part, which might be confusing. It might be especially confusing because we want to disallow mixing the existing "manually add and connect" pattern with a new pattern to "auto add+connect". If the new interface name indicates the connect part only, users might think they need to add stores manually and can connect automatically.
>>>>>>>>>>>>
>>>>>>>>>>>> Unfortunately, I don't have a much better suggestion for a name either.
>>>>>>>>>>>> The only idea that came to my mind was `StoreProvider`: to me, a provider is a "service" interface that does work for us, ie, it adds and connects a store. Not sure if this is too subtle, if we consider that there is already the `StoreSupplier` interface?
>>>>>>>>>>>>
>>>>>>>>>>>> But maybe somebody else might still have a good idea on how to improve the name.
>>>>>>>>>>>>
>>>>>>>>>>>> In any case, I would suggest shortening the name to `StoreConnector` instead of `StateStoreConnector`, because we also have `StoreSupplier` and `StoreBuilder`.
>>>>>>>>>>>>
>>>>>>>>>>>> (2) The KIP proposes to add the new interface to `ProcessorSupplier` etc and to add a default implementation for the new method. Hence, users would need to override this default implementation to opt in to the feature. I am wondering if it might be better to not add the new interface to `ProcessorSupplier` etc and to just provide a new interface with no default implementation. Users would opt in by adding the interface explicitly to their existing `ProcessorSupplier` implementation. Overriding a default method and getting different behavior seems to be a little subtle to me, especially because we don't want to allow mixing and matching the old and new approaches. Think: I only override a default method and my code breaks.
>>>>>>>>>>>>
>>>>>>>>>>>> Thoughts?
>>>>>>>>>>>>
>>>>>>>>>>>> (3) If we keep the current default implementation for the new method, I am wondering if it should return `null` instead of an empty collection?
>>>>>>>>>>>> This might make it easier to detect bugs in user code that returns an empty collection by accident.
>>>>>>>>>>>>
>>>>>>>>>>>> (4) Should the new method return a `Set` instead of a `Collection` to indicate the semantics clearly (ie, returning the same `StoreBuilder` multiple times is idempotent and one cannot add+connect to it twice)?
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>> On 4/6/19 12:27 PM, Paul Whalen wrote:
>>>>>>>>>>>>> Ivan and Guozhang,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the thoughts! Ivan's use case is definitely interesting. The way I see it, if we can achieve the main goal of the KIP (allowing Processor/TransformerSuppliers to encapsulate their usage of state stores), we will enable this kind of thing in "user space" very easily.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I will say that I'm not totally sure that most use cases of transform() use just one state store. It's hard to know since I haven't seen many examples in public, but my team's usages almost exclusively require multiple state stores. We only reach for the low level processor API when we need that complexity, and it's somewhat hard to imagine many use cases that only need one state store, since the high level DSL can usually accomplish those tasks. The example Ivan presented for instance looks like a stream.groupByKey().reduce(...) to me. Ivan, I'd be curious what sort of other usages you're imagining.
>>>>>>>>>>>>>
>>>>>>>>>>>>> That being said, perhaps the Processor API should really just be considered a separate paradigm in Streams, not just a lower level that we reach to when necessary. In which case it would be beneficial to make the simple use cases easier. I've definitely talked about this with my own team - if you're less familiar with the kind of functional style that the high level DSL offers, it might be easier to "see" your state and interact with it directly.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Anyway, I've updated the KIP to reflect my current PR with Guozhang's suggestions. It seems like there is at least some interest in that on its own and not a ton of pushback, so I think I will try to start a vote.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Mar 30, 2019 at 10:03 AM Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi all!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I was about to write another KIP, but found out that KIP-401 addresses exactly the problem I faced. So let me jump into your discussion and ask you to assess another idea.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I fully agree with the KIP-401's motivation part. E.g. in my project I had to invent a wrapper class that hides the details of KeyValueStore management from business logic. Of course this should be done better in KStreams API.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> But I was about to look at this problem from another side and propose a simple alternative in the high-level DSL that will not fit all the cases, but most of them. Hence my idea does not exclude Paul's proposal.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What if we restrict ourselves to *only one* KeyValueStore and propose a method that resembles the `aggregate` and `reduce` methods, like this:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> stream
>>>>>>>>>>>>>>   .map(...)
>>>>>>>>>>>>>>   .filter(...)
>>>>>>>>>>>>>>   .transform((k, v, s) -> {....}, Transformed.with(....))
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> where
>>>>>>>>>>>>>> * k, v -- input key & value
>>>>>>>>>>>>>> * s -- a KeyValueStore provided as an argument
>>>>>>>>>>>>>> * return value of the lambda should be KeyValue.pair(...)
>>>>>>>>>>>>>> * Transformed.with... is a builder, used in order to define the Transformer and KeyValueStore building parameters. Some of these parameters should be:
>>>>>>>>>>>>>> ** store's KeySerde,
>>>>>>>>>>>>>> ** store's ValueSerde,
>>>>>>>>>>>>>> ** whether the store is persistent or in-memory,
>>>>>>>>>>>>>> ** store's name -- optional parameter, the system should be able to devise the name of the store transparently for the user, if we don't want to devise it ourselves/share the store between processors.
>>>>>>>>>>>>>> ** scheduled punctuation.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Imagine we have a KStream<String, Integer>, and we need to calculate a `derivative` stream, that is, a stream of 'deltas' of the provided integer values.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This could be achieved as simply as
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> stream.transform((key, value, stateStore) -> {
>>>>>>>>>>>>>>         int previousValue = Optional.ofNullable(stateStore.get(key)).orElse(0);
>>>>>>>>>>>>>>         stateStore.put(key, value);
>>>>>>>>>>>>>>         return KeyValue.pair(key, value - previousValue);
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>     // we do not need to bother with store name, punctuation etc.
>>>>>>>>>>>>>>     // maybe even the Serde part can be omitted, since we can inherit the serdes from the stream by default
>>>>>>>>>>>>>>     , Transformed.with(Serdes.String(), Serdes.Integer())
>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The hard part of it is that the new `transform` method definition should be parameterized by six type parameters:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> * input/output/KeyValueStore key type,
>>>>>>>>>>>>>> * input/output/KeyValueStore value type.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> However, it seems that all these types can be inferred from the provided lambda and Transformed.with instances.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What do you think about this?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 27.03.2019 20:45, Guozhang Wang wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello Paul,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the uploaded PR and the detailed description! I've made a pass on it and left some comments.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Overall I think I agree with you that passing in the StoreBuilder directly rather than the store name is more convenient, as it does not require another `addStore` call, but we just need to spend some more documentation effort on educating users about the two ways of connecting their stores.
>>>>>>>>>>>>>> I'm slightly concerned about this education curve, but I can be convinced if most people feel it is worthwhile.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 5:15 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'd like to resurrect this discussion with a cursory, proof-of-concept implementation of the KIP which combines many of our ideas: https://github.com/apache/kafka/pull/6496. I tried to keep the diff as small as possible for now, just using it to convey the main ideas. But I'll separately address some of our earlier discussion:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - Will there be a new, separate interface for users to implement for the new functionality? No, to hopefully keep things simple, all of the Processor/TransformerSupplier interfaces will just extend StateStoresSupplier, allowing users to opt in to this functionality by overriding the default implementation that gives an empty list.
>>>>>>>>>>>>>> - Will the interface allow users to specify the store name, or the entire StoreBuilder? The entire StoreBuilder, so the Processor/TransformerSupplier can completely encapsulate name and implementation of a state store if desired.
>>>>>>>>>>>>>> - Will the old way of specifying store names alongside the supplier when calling stream.process/transform() be deprecated? No, this is still a legitimate way to wire up Processors/Transformers and their stores.
>>>>>>>>>>>>>>   But I would recommend not allowing stream.process/transform() calls that use both store declaration mechanisms (this restriction is not in the proof of concept).
>>>>>>>>>>>>>> - How will we handle adding the same state store to the topology multiple times because different Processor/TransformerSuppliers declare it? topology.addStateStore() will be slightly relaxed for convenience, and will allow adding the same StoreBuilder multiple times as long as the exact same StoreBuilder instance is being added for the same store name. This seems to prevent in practice the issue of accidentally making two state stores one by adding with the same name. For additional safety, if we wanted to (not in the proof of concept), we could allow for this relaxation only for internal callers of topology.addStateStore().
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So, in summary, the use cases look like:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - 1 transformer/processor that owns its store: Using the new StateStoresSupplier interface method to supply its StoreBuilders that will be added to the topology automatically.
>>>>>>>>>>>>>> - Multiple transformer/processors that share the same store: Either
>>>>>>>>>>>>>>   1. The old way: the StoreBuilder is defined "far away" from the Transformer/Processor implementations, and is added to the topology manually by the user
>>>>>>>>>>>>>>   2. The new way: the StoreBuilder is defined closer to the Transformer/Processor implementations, and the same instance is returned by all Transformer/ProcessorSuppliers that need it
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This makes the KIP wiki a bit stale; I'll update if we want to bring this design to a vote.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Dec 16, 2018 at 6:04 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Matthias / Paul,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The concern I had about introducing `StoreBuilderSupplier` is simply because it is another XXSupplier to the public API, so I'd like to ask if we really have to add it :)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The difference between encapsulating the store name and encapsulating the full state store builder is that, in the former:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -----------
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> String storeName = "store1";
>>>>>>>>>>>>>> builder.addStore(new MyStoreBuilder(storeName));
>>>>>>>>>>>>>> stream1.transform(new MyTransformerSupplier(storeName));
>>>>>>>>>>>>>> // following my proposal, that the store name can be passed in and used for both
>>>>>>>>>>>>>> // `listStores` and in the `Transformer#init`; so the Transformer function
>>>>>>>>>>>>>> // does not need to get the constant string name again.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> // one caveat to admit is that the MyTransformerSupplier logic may be unique to
>>>>>>>>>>>>>> // `store1`, so it cannot be reused with a different store name anyway.
>>>>>>>>>>>>>> -----------
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> While in the latter:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -----------
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> stream1.transform(new MyTransformerSupplierForStore1());
>>>>>>>>>>>>>> // the name is just indicating that we may have one such supplier for each store.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -----------
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I understand the latter introduces more convenience from the API, but the cost is that we still cannot completely remove `builder.addStore`, but only reduce its semantic scope to shared state stores; hence users need to learn two ways of creating state stores for those two patterns.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> My argument is that more public APIs require a longer learning curve for users, and introduce more usage patterns that may confuse users (the proposal I had tries to replace one with another completely).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Dec 16, 2018 at 2:58 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the great thoughts Matthias and Guozhang!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If I'm not mistaken, Guozhang's suggestion is what my second alternative on the KIP is ("Have the added method on the Supplier interfaces only return store names, not builders"). I do think it would be a worthwhile usability improvement on its own, but to Matthias's point, it doesn't achieve the full goal of completely encapsulating a state store and its processor: it encapsulates the name, but not the StateStoreBuilder.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm really intrigued by Matthias's idea that forgoes the default interface method I proposed. Having smaller, separate interfaces is a powerful idea and I think a cleaner API than what I proposed. The non-shared store use case is handled well here, and the shared store use case is possible, though maybe still not as graceful as we would like (having to add the StoreBuilderSupplier before the StoreNameSupplier seems maybe too subtle to me).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We're all agreed that one of the big problems with the shared store use case is how to deal with adding the same store to the topology multiple times. Catching the "store already added" exception is risky.
>>>>>>>>>>>>>> Here's a maybe radical idea: change `topology.addStateStore()` to be idempotent for adding a given state store name and `StoreBuilder`. In other words, `addStateStore` would not throw the "store already added" exception if the `StoreBuilder` being added for a given name has the same identity as the one that has already been added. Does this eliminate all the bugs we're worried about? Thinking about it for a few minutes, it seems to eliminate most at least (would a user really use the exact same StoreBuilder when they intend there to be two stores?). It might make the API slightly harder to use if a user isn't immediately aware of that subtlety, but a good error message should ease the pain, and it would happen immediately during development.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> And with regards to Matthias's comment about whether we need to deprecate existing varargs transform methods - I don't think we need to, but it might be nice for there only to be one way to do things, assuming whatever we come up with supports all existing use cases.
>>>>>>>>>>>>>> I don't feel strongly about this, but if we don't deprecate, I do think it's important to add checks that prevent users from trying to do the same thing in two different ways, as we've discussed.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Dec 16, 2018 at 5:36 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guozhang,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > Regarding the last option to catch "store exist already" exception and fallback to connect stores, I'm a bit concerned it may be hiding actual user bugs.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I agree with this concern. From my original email:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > The only disadvantage I see, might be potential bugs about sharing state if two different stores are named the same by mistake (this would not be detected).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For your new proposal: I am not sure if it addresses Paul's original idea -- I hope Paul can clarify. From my understanding, the idea was to encapsulate a store and its processor. As many stores are not shared, this seems to be quite useful. Your proposal falls a little short of supporting encapsulation for non-shared stores.
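The identity-based relaxation of `addStateStore` proposed above can be sketched as follows. This is only an illustration under stated assumptions, not the Kafka Streams implementation: `StoreRegistrySketch` and `NamedStoreBuilder` are hypothetical stand-ins for the topology and for `StoreBuilder`. Re-adding the very same builder instance is a no-op, while a different builder under an already-used name is still rejected, so the accidental name clash mentioned above stays detectable.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a topology's store registry; not Kafka Streams code. */
public class StoreRegistrySketch {

    /** Hypothetical stand-in for StoreBuilder: only the store name matters here. */
    public static final class NamedStoreBuilder {
        private final String name;

        public NamedStoreBuilder(String name) {
            this.name = name;
        }

        public String name() {
            return name;
        }
    }

    private final Map<String, NamedStoreBuilder> buildersByName = new HashMap<>();

    /**
     * Registers a builder under its name.
     *
     * @return true if newly registered, false if this exact instance was already registered
     * @throws IllegalStateException if a different instance already uses the same name
     */
    public boolean addStateStore(NamedStoreBuilder builder) {
        NamedStoreBuilder existing = buildersByName.get(builder.name());
        if (existing == null) {
            buildersByName.put(builder.name(), builder);
            return true;
        }
        if (existing == builder) { // reference identity on purpose, not equals()
            return false;          // idempotent no-op
        }
        throw new IllegalStateException(
            "A different StoreBuilder is already registered under name '" + builder.name() + "'");
    }

    public int size() {
        return buildersByName.size();
    }
}
```

With this shape, two suppliers that return the same shared builder instance can both trigger registration safely, whereas the "catch the exception and fall back to connecting" variant would silently merge two unrelated stores that happen to share a name.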
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 12/15/18 1:40 AM, Guozhang Wang wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Matthias,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for your feedback.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regarding the last option to catch "store exist already" exception and fallback to connect stores, I'm a bit concerned it may be hiding actual user bugs.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thinking about Paul's proposal and your suggestion again, I'd like to propose another alternative somewhere in the middle of your approaches, i.e. we still let users create sharable state stores via `addStateStore`, and we allow the TransformerSupplier to return a list of state stores that it needs, i.e.:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public interface TransformerSupplier<K, V, R> {
>>>>>>>>>>>>>>     Transformer<K, V, R> get();
>>>>>>>>>>>>>>     default List<String> stateStoreNames() {
>>>>>>>>>>>>>>         return Collections.emptyList();
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> By doing this users can still "consolidate" the references of store names in a single place in the transform call, e.g.:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public class MyTransformerSupplier<K, V, R> implements TransformerSupplier<K, V, R> {
>>>>>>>>>>>>>>     private String storeName;
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     public class MyTransformer<K, V, R> {
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>         ....
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>         public void init(ProcessorContext context) {
>>>>>>>>>>>>>>             store = context.getStateStore(storeName);
>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>     public List<String> stateStoreNames() {
>>>>>>>>>>>>>>         return Collections.singletonList(storeName);
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Basically, we move the parameters from the caller of `transform` to inside the TransformerSuppliers. DSL implementations would not change much, simply calling `connectStateStore` by getting the list of names from the provided function.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Dec 13, 2018 at 7:27 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just a meta comment: do we really need to deprecate existing `transform()` etc. methods?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The last argument is a vararg, and thus, just keeping the existing API for this part seems to work too, allowing both patterns to be implemented?
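To illustrate the vararg point above: because the store names are a trailing vararg, a single signature accepts calls both with and without names. A minimal sketch, in which `transform` is a hypothetical stand-in that only echoes the names it received and is not the real `KStream#transform`:

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of a trailing-vararg signature; not Kafka Streams code. */
public class VarargsWiringSketch {

    /**
     * Stand-in for a DSL method shaped like transform(supplier, String... stateStoreNames).
     * It simply returns the store names the caller passed explicitly.
     */
    public static List<String> transform(String supplierName, String... stateStoreNames) {
        // A vararg parameter is an array; it is empty when the caller omits it.
        return Arrays.asList(stateStoreNames);
    }
}
```

A call such as `transform("A")` (the supplier would declare its own stores) and a call such as `transform("B", "S", "S_b")` (stores named by the caller) both compile against the same method, which is why no deprecation is strictly needed to support the two patterns.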
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also, instead of adding a default method, we could also add a new interface `StoreBuilderSupplier` with method `List<StoreBuilder> stateStores()` -- users could implement `TransformerSupplier` and `StoreBuilderSupplier` at once; and for this case, we require that users don't provide store names in `transform()`.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Similarly, we could add an interface `StoreNameSupplier` with method `List<String> stateStores()`. This allows us to "auto-wire" a transformer to existing stores (to avoid the issue of adding the same store multiple times).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hence, for shared stores, there would be one "main" transformer that implements `StoreBuilderSupplier` and that must be added first to the topology. The other transformers would implement `StoreNameSupplier` and just connect to those stores.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Another possibility to avoid the issue of adding the same stores multiple times would be that the DSL always calls `addStateStore()` but catches a potential "store exists already" exception and falls back to `connectProcessorAndStateStore()` for this case.
>>>>>>>>>>>>>> Thus, we would not need the `StoreNameSupplier` interface and the order in which transformers are added would not matter either. The only disadvantage I see might be potential bugs about sharing state if two different stores are named the same by mistake (this would not be detected).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just some ideas I wanted to share. What do you think?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 12/11/18 3:46 AM, Paul Whalen wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ah yes of course, this was an oversight, I completely ignored the multiple processors sharing the same state store when writing up the KIP. Which is funny, because I've actually done this (different processors sharing state stores) a fair amount myself, and I've settled on a pattern where I group the Processors in an enclosing class, and that enclosing class handles as much as possible.
>>>>>>>>>>>>>> Here's a gist showing the rough structure, just for context: https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65 . Note how it adds the stores to the topology, as well as providing a public method with the store names.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I don't think my proposal completely conflicts with the multiple processors sharing state stores use case, since you can create a supplier that provides the store name you want, somewhat independently of your actual Processor logic. The issue I do see, though, is that topology.addStateStore() can only be called once for a given store. So for your example, if there was a single TransformerSupplier that was passed into both transform() calls, "store1" would be added (under the hood) to the topology twice, which is no good.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Perhaps this suggests that one of my alternatives on the KIP might be desirable: either not having the suppliers return StoreBuilders (just store names), or not deprecating the old methods that take "String... stateStoreNames".
>>>>>>>>>>>>>> I'll have to think about it a bit.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello Paul,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the great writeup (very detailed and crystal-clear motivation sections!).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is quite an interesting idea and I do like the API cleanness you proposed. The original motivation of letting StreamsTopology add state stores, though, is to allow different processors to share the state store. For example:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> builder.addStore("store1");
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> // a path of stream transformations that leads to KStream stream1.
>>>>>>>>>>>>>> stream1.transform(..., "store1");
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> // another path that generates a KStream stream2.
>>>>>>>>>>>>>> stream2.transform(..., "store1");
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Behind the scenes, Streams will make sure stream1 / stream2 transformations will always be grouped together as a single group of tasks, each of which will be executed by a single thread, and hence there are no concurrency issues on accessing the store from different operators within the same task.
>>>>>>>>>>>>>> I'm not sure how common this use case is, but I'd like to hear if you have any thoughts on maintaining this, since the current proposal seems to exclude this possibility.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here's KIP-401 for discussion, a minor Kafka Streams API change that I think could greatly increase the usability of the low-level processor API. I have some code written but will wait to see if there is buy-in before going all out and creating a pull request. It seems like most of the work would be in updating documentation and tests.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> -- Guozhang