Thanks John. I like the proposal. Btw, I was just going over the KIP and realized that we add new methods to `StreamsBuilder`, `Topology`, and `KStream` that take the new `ProcessorSupplier` class -- should we also deprecate the corresponding existing methods that take the old `ProcessorSupplier`?
-Matthias On 9/30/20 7:46 AM, John Roesler wrote: > Thanks Paul and Sophie, > > Your feedback certainly underscores the need to be explicit > in the javadoc about why that parameter is Optional. Getting > this kind of feedback before the release is exactly the kind > of outcome we hope to get from the KIP process! > > Thanks, > -John > > On Tue, 2020-09-29 at 22:32 -0500, Paul Whalen wrote: >> John, I totally agree that adding a method to Processor is cumbersome and >> not a good path. I was imagining maybe a separate interface that could be >> used in the appropriate context, but I don't think that makes too much >> sense - it's just too far away from what Kafka Streams is. I was >> originally more interested in the "why" Optional than the "how" (I think my >> original reply overplayed the "optional as an argument" concern). But >> you've convinced me that there is a perfectly legitimate "why". We should >> make sure that it's clear why it's Optional, but I suppose that goes >> without saying. It's a nice opportunity to make the API reflect more what >> is actually going on under the hood. >> >> Thanks! >> Paul >> >> On Tue, Sep 29, 2020 at 10:05 PM Sophie Blee-Goldman <sop...@confluent.io> >> wrote: >> >>> FWIW, while I'm really not a fan of Optional in general, I agree that its >>> usage >>> here seems appropriate. Even for those rare software developers who >>> carefully >>> read all the docs several times over, I think it wouldn't be too hard to >>> miss a >>> note about the RecordMetadata possibly being null. >>> >>> Especially because it's not that obvious why at first glance, and takes a >>> bit of >>> thinking to realize that records originating from a Punctuator wouldn't >>> have a >>> "current record". This is something that has definitely confused users >>> today. 
>>> >>> It's on us to improve the education here -- and an Optional<RecordMetadata> >>> would naturally raise awareness of this subtlety >>> >>> On Tue, Sep 29, 2020 at 7:40 PM Sophie Blee-Goldman <sop...@confluent.io> >>> wrote: >>> >>>> Does my reply address your concerns? >>>> >>>> >>>> Yes; also, I definitely misread part of the proposal earlier and thought >>>> you had put >>>> the timestamp field in RecordMetadata. Sorry for not giving things a >>>> closer look >>>> before responding! I'm not sure my original message made much sense given >>>> the misunderstanding, but thanks for responding anyway :P >>>> >>>> Having given the proposal a second pass, I agree, it's very elegant. +1 >>>> >>>> On Tue, Sep 29, 2020 at 6:50 PM John Roesler <vvcep...@apache.org> >>> wrote: >>>>> Thanks for the reply, Sophie, >>>>> >>>>> I think I may have summarized too much in my prior reply. >>>>> >>>>> In the currently proposed KIP, any caller of forward() must >>>>> supply a Record, which consists of: >>>>> * key >>>>> * value >>>>> * timestamp >>>>> * headers (with a convenience constructor that sets empty >>>>> headers) >>>>> >>>>> These aren't what I was referring to as potentially being >>>>> undefined downstream, since thanks to the introduction of >>>>> Record, they are, as you're advocating, required to be >>>>> defined everywhere, even when forwarding from a punctuator. >>>>> >>>>> So to be clear, the intent of this change is actually to >>>>> _enforce_ that timestamp would never be undefined (which it >>>>> currently can be). Also, since punctuators _are_ going to >>>>> have to "make up" a timestamp going forward, we should note >>>>> that the "punctuate" method currently passes in a good >>>>> timestamp that they can use: for system-time punctuations, >>>>> they receive the current system time, and for stream-time >>>>> punctuations, they get the current stream time. 
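To make the last point concrete, here is a minimal sketch. It uses hypothetical stand-in types (`Record`, `Punctuator`, `CountEmitter`, and a list in place of `ProcessorContext#forward`), not the real Kafka Streams classes. Because a record cannot be built without a timestamp, a punctuator stamps what it forwards with the time it receives in `punctuate(long)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the proposed Record: the timestamp is a required field.
final class Record<K, V> {
    private final K key;
    private final V value;
    private final long timestamp;

    Record(K key, V value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

    K key() { return key; }
    V value() { return value; }
    long timestamp() { return timestamp; }
}

// Hypothetical punctuator callback: the argument is the current system time
// or stream time, depending on how the punctuation was scheduled.
interface Punctuator {
    void punctuate(long timestamp);
}

final class CountEmitter implements Punctuator {
    // Stand-in for ProcessorContext#forward(Record): just collects what is forwarded.
    final List<Record<String, Long>> forwarded = new ArrayList<>();
    private long count = 0;

    void increment() { count++; }

    @Override
    public void punctuate(long timestamp) {
        // No input record is being processed, so there is no timestamp to inherit;
        // stamp the output with the time handed to punctuate() instead.
        forwarded.add(new Record<String, Long>("count", count, timestamp));
    }

    public static void main(String[] args) {
        CountEmitter emitter = new CountEmitter();
        emitter.increment();
        emitter.increment();
        emitter.punctuate(1_000L);
        Record<String, Long> out = emitter.forwarded.get(0);
        System.out.println(out.key() + "=" + out.value() + " @" + out.timestamp());
    }
}
```

Running `main` prints `count=2 @1000`. The forwarded record carries the punctuation time rather than a dummy value.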
>>>>> >>>>> The potentially undefined RecordMetadata only contains these >>>>> fields: >>>>> * topic >>>>> * partition >>>>> * offset >>>>> >>>>> These fields aren't required (or even used) in a Sink, and >>>>> it doesn't seem like they would be important to many >>>>> applications. Furthermore, it doesn't _seem_ like you'd even >>>>> want to set these fields. They seem purely informational and >>>>> only useful in the context when you are actually processing >>>>> a real input record. It doesn't sound like you were asking >>>>> for it, but just to put it on the record, I think if we were >>>>> to require values for the metadata from punctuators, people >>>>> would mostly just make up their own dummy values, to no >>>>> one's benefit. >>>>> >>>>> I should also note that with the current >>>>> Record/RecordMetadata split, we will have the freedom to >>>>> move fields into the Record class (or even add new fields) >>>>> if we want them to become "data" as opposed to "metadata" in >>>>> the future. >>>>> >>>>> Thanks for your reply; I was similarly floored when I >>>>> realized the true nature of the current situation. Does my >>>>> reply address your concerns? >>>>> >>>>> Thanks, >>>>> -John >>>>> >>>>> On Tue, 2020-09-29 at 18:34 -0700, Sophie Blee-Goldman >>>>> wrote: >>>>>>> However, the record metadata is only defined when the parent >>> forwards >>>>>>> while processing a >>>>>> >>>>>> real record, not when it calls forward from the punctuator >>>>>> >>>>>> >>>>>> Can we take a step back for a second...why wouldn't you be required to >>>>> set >>>>>> the RecordContext >>>>>> yourself when calling forward from a Punctuator? I think I agree with >>>>> Paul >>>>>> here, it seems kind of >>>>>> absurd not to enforce that the RecordContext be present inside the >>>>>> process() method. 
>>>>>> >>>>>> The original problem with Punctuators, as I understood it, was that >>> all >>>>> of >>>>>> the RecordContext >>>>>> fields were exposed automatically to both the Processor and any >>>>> Punctuator, >>>>>> due to being >>>>>> direct methods on the ProcessorContext. We can't control which >>>>>> ProcessorContext methods >>>>>> someone will call from within a Punctuator vs. from a Processor. The best >>>>> we >>>>>> could do was >>>>>> set these "nonsense" fields to null when inside a Punctuator, or set >>>>> them >>>>>> to some dummy >>>>>> values as you pointed out. >>>>>> >>>>>> But then you proposed the solution of a separate RecordContext which >>> is >>>>> not >>>>>> attached to the >>>>>> ProcessorContext at all. This seemed to solve the above problem very >>>>>> neatly: we only pass >>>>>> in the RecordContext to the process() method, so we don't have to >>> worry >>>>>> about people trying >>>>>> to access these fields from within a Punctuator. The fields aren't >>>>>> accessible unless they're >>>>>> defined. >>>>>> >>>>>> So what happens when someone wants to forward something from within a >>>>>> Punctuator? I >>>>>> don't think it's reasonable to let the timestamp field be undefined, >>>>> ever. >>>>>> What if the Punctuator >>>>>> forwards directly to a sink, or directly to some windowing logic? Are >>> we >>>>>> supposed to add >>>>>> handling for the RecordContext == null case to every processor? Or are >>>>> we >>>>>> just going to >>>>>> assume the implicit restriction that users will only forward records >>>>> from a >>>>>> Punctuator to >>>>>> downstream processors that know how to handle and/or set the >>>>> RecordContext >>>>>> if it's >>>>>> undefined? That seems to throw away a lot of the awesome safety added >>> in >>>>>> this KIP. >>>>>> >>>>>> Apologies for the rant. But I feel pretty strongly that allowing users to >>>>> forward >>>>>> records from a >>>>>> Punctuator without a defined RecordContext would be asking for >>> trouble.
>>>>>> Imo, if you >>>>>> want to forward from a Punctuator, you need to store the info you need >>>>> in >>>>>> order to >>>>>> set the timestamp, or make one up yourself >>>>>> >>>>>> (the one alternative I can think of here is that maybe we could pass >>> in >>>>> the >>>>>> current >>>>>> partition time, so users can at least put in a reasonable estimate for >>>>> the >>>>>> timestamp >>>>>> that won't cause it to get dropped and won't potentially lurch the >>>>>> streamtime far into >>>>>> the future. This would be similar to what we do in the >>>>> TimestampExtractor) >>>>>> On Tue, Sep 29, 2020 at 6:06 PM John Roesler <vvcep...@apache.org> >>>>> wrote: >>>>>>> Oh, I guess one other thing I should have mentioned is that I’ve >>>>> recently >>>>>>> discovered that in cases where the context is undefined, we >>> currently >>>>> just >>>>>>> fill in dummy values for the context. So there’s a good chance that >>>>> real >>>>>>> applications in use are depending on undefined context without even >>>>>>> realizing it. What I’m hoping to do is just make the situation >>>>> explicit and >>>>>>> get rid of the dummy values. >>>>>>> >>>>>>> Thanks, >>>>>>> John >>>>>>> >>>>>>> On Tue, Sep 29, 2020, at 20:01, John Roesler wrote: >>>>>>>> Thanks for the review, Paul! >>>>>>>> >>>>>>>> I had read some of that debate before. There seems to be some >>>>> subtext >>>>>>>> there, because they advise against using Optional in cases like >>>>> this, >>>>>>>> but there doesn’t seem to be a specific reason why it’s >>>>> inappropriate. >>>>>>>> I got the impression they were just afraid that people would go >>>>>>>> overboard and make everything Optional. >>>>>>>> >>>>>>>> I could also make two methods, but it seemed like it might be an >>>>>>>> unfortunate way to handle the issue, since Processor is just >>> about a >>>>>>>> Function as-is, but the two-method approach would require people >>> to >>>>>>>> implement both methods. 
>>>>>>>> >>>>>>>> To your question, this is something that has only recently become >>>>> clear >>>>>>>> to me. Imagine you have a parent processor that calls forward both >>>>> from >>>>>>>> process and a punctuator. The child will have process() invoked in >>>>> both >>>>>>>> cases, and won’t be able to distinguish them. However, the record >>>>>>>> metadata is only defined when the parent forwards while >>> processing a >>>>>>>> real record, not when it calls forward from the punctuator. >>>>>>>> >>>>>>>> This is why I wanted to make the metadata Optional, to advertise >>>>> that >>>>>>>> the metadata might be undefined if any ancestor processor ever >>> calls >>>>>>>> forward from a punctuator. We could remove the Optional and >>> instead >>>>>>>> just document that the argument might be null. >>>>>>>> >>>>>>>> With that context in place, what’s your take? >>>>>>>> >>>>>>>> Thanks, >>>>>>>> John >>>>>>>> >>>>>>>> On Tue, Sep 29, 2020, at 19:09, Paul Whalen wrote: >>>>>>>>> Looks pretty good to me, though the Processor#process(Record, >>>>>>>>> Optional<RecordMetadata>) signature caught my eye. There's some >>>>>>> debate >>>>>>>>> ( >>>>>>>>> >>> https://stackoverflow.com/questions/31922866/why-should-java-8s-optional-not-be-used-in-arguments >>>>>>> ) >>>>>>>>> about whether to use Optionals in arguments, and while that's a >>>>> bit of >>>>>>> a >>>>>>>>> religious debate in the abstract, it did make me wonder whether >>> it >>>>>>> makes >>>>>>>>> sense in this specific case. When is it actually not present? >>> I >>>>> was >>>>>>>>> under >>>>>>>>> the impression that we should always have access to it in >>>>> process(), >>>>>>> and >>>>>>>>> that the concern about metadata being undefined was about having >>>>>>> access >>>>>>>>> to >>>>>>>>> record metadata in the ProcessorContext held for use inside a >>>>>>>>> Punctuator.
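A minimal sketch of the situation John describes, assuming hypothetical stand-ins for `Record`, `RecordMetadata`, and `Processor`, plus a toy `ForwardingContext` in place of the framework. The parent forwards once while processing a real record and once from a punctuator, and the child only learns which case it is in through the `Optional`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins for the proposed types (not the Kafka Streams classes).
final class RecordMetadata {
    final String topic;
    final int partition;
    final long offset;

    RecordMetadata(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

final class Record {
    final String key;
    final String value;
    final long timestamp;

    Record(String key, String value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Shape of the proposed Processor#process(Record, Optional<RecordMetadata>).
// Still a single abstract method, so a lambda can implement it.
interface Processor {
    void process(Record record, Optional<RecordMetadata> metadata);
}

// Hypothetical framework side: it knows the metadata of the input record
// currently being processed, if any, and passes it along on every forward.
final class ForwardingContext {
    private final Processor child;
    private Optional<RecordMetadata> current = Optional.empty();

    ForwardingContext(Processor child) { this.child = child; }

    void forward(Record record) {
        child.process(record, current);
    }

    // Simulates delivering a real input record to the parent processor.
    void runProcess(Processor parent, Record record, RecordMetadata metadata) {
        current = Optional.of(metadata);
        try {
            parent.process(record, current);
        } finally {
            current = Optional.empty();
        }
    }

    // Simulates a punctuation firing: there is no input record, hence no metadata.
    void runPunctuate(Runnable punctuator) {
        current = Optional.empty();
        punctuator.run();
    }
}

final class MetadataDemo {
    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        // The child cannot tell the two cases apart except through the Optional.
        Processor child = (record, metadata) -> seen.add(
            metadata.map(m -> record.key + " from " + m.topic + "-" + m.partition + "@" + m.offset)
                    .orElse(record.key + " from a punctuator (no metadata)"));

        ForwardingContext ctx = new ForwardingContext(child);
        // The parent forwards both from process() and from its punctuator.
        Processor parent = (record, metadata) -> ctx.forward(record);
        ctx.runProcess(parent, new Record("k1", "v1", 10L), new RecordMetadata("input", 0, 5L));
        ctx.runPunctuate(() -> ctx.forward(new Record("tick", "flush", 20L)));

        seen.forEach(System.out::println);
    }
}
```

The child prints `k1 from input-0@5` for the first forward and `tick from a punctuator (no metadata)` for the second. Passing `null` instead of an empty `Optional` would behave the same at runtime, but the absent case would then be visible only in the documentation.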
>>>>>>>>> >>>>>>>>> If that's not the case and it is truly optional in process(), is >>>>> there >>>>>>> an >>>>>>>>> opportunity for an alternate interface for the cases when we >>>>> don't get >>>>>>> it, >>>>>>>>> rather than force the branching on implementers of the >>> interface? >>>>>>>>> Apologies if I've missed something; I took a look at the PR and >>> I >>>>>>> didn't >>>>>>>>> see any spots where I thought it would be empty. Perhaps an >>>>> example >>>>>>> of a >>>>>>>>> Punctuator using (and not using) the new API would clear things >>>>> up. >>>>>>>>> Best, >>>>>>>>> Paul >>>>>>>>> >>>>>>>>> On Tue, Sep 29, 2020 at 4:10 PM John Roesler < >>> vvcep...@apache.org >>>>>>> wrote: >>>>>>>>>> Hello again, all, >>>>>>>>>> >>>>>>>>>> Thanks for the latest round of discussion. I've taken the >>>>>>>>>> recent feedback and come up with an updated KIP that seems >>>>>>>>>> actually quite a bit nicer than the prior proposal. >>>>>>>>>> >>>>>>>>>> The specific diff on the KIP is here: >>>>>>>>>> >>>>>>>>>> >>> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=15&selectedPageVersions=14 >>>>>>>>>> These changes are implemented in this POC PR: >>>>>>>>>> https://github.com/apache/kafka/pull/9346 >>>>>>>>>> >>>>>>>>>> The basic idea is that, building on the recent conversation, >>>>>>>>>> we would transition away from the current API where we get >>>>>>>>>> only key/value in the process() method and other "data" >>>>>>>>>> comes in the ProcessorContext along with the "metadata".
>>>>>>>>>> >>>>>>>>>> Instead, we formalize what is "data" and what is "metadata", >>>>>>>>>> and pass it all into the process method: >>>>>>>>>> Processor#process(Record, Optional<RecordMetadata>) >>>>>>>>>> >>>>>>>>>> Also, you forward the whole data class instead of mutating >>>>>>>>>> the ProcessorContext fields and also calling forward: >>>>>>>>>> ProcessorContext#forward(Record) >>>>>>>>>> >>>>>>>>>> The Record class itself ships with methods like >>>>>>>>>> record#withValue(NewV newValue) >>>>>>>>>> that make a shallow copy of the input Record, enabling >>>>>>>>>> Processors to safely handle the record without polluting the >>>>>>>>>> context of their parents and siblings. >>>>>>>>>> >>>>>>>>>> This proposal has a number of key benefits: >>>>>>>>>> * As we've discovered in KAFKA-9584, it's unsafe to mutate >>>>>>>>>> the Headers via the ProcessorContext. This proposal offers a >>>>>>>>>> way to safely forward changes only to downstream processors. >>>>>>>>>> * The new API has symmetry (each processor's input is the >>>>>>>>>> output of its parent processor) >>>>>>>>>> * The API makes clear that the record metadata isn't always >>>>>>>>>> defined (for example, in a punctuation, there is no current >>>>>>>>>> topic/partition/offset) >>>>>>>>>> * The API enables punctuators to forward well-defined >>>>>>>>>> headers downstream, which is currently not possible. >>>>>>>>>> >>>>>>>>>> Unless there are objections, I'll go ahead and re-finalize >>>>>>>>>> this KIP and update that PR to a mergeable state. >>>>>>>>>> >>>>>>>>>> Thanks, all, >>>>>>>>>> -John >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Thu, 2020-09-24 at 09:41 -0700, Matthias J. Sax wrote: >>>>>>>>>>> Interesting proposal. However, I am not totally convinced, >>>>> because >>>>>>> I see >>>>>>>>>>> a fundamental difference between "data" and "metadata". >>>>>>>>>>> >>>>>>>>>>> Topic/partition/offset are "metadata" in the strong sense >>> and >>>>> they >>>>>>> are >>>>>>>>>>> immutable.
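What a shallow copy means in practice can be sketched with hypothetical stand-in `Record` and `Headers` classes. These are not the Kafka types, although Kafka's own `Headers` is mutable too. `withValue` returns a new `Record` object that still shares the original's key, value, and headers references, so keeping a header change local requires giving the new record its own headers.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical mutable header container; sharing one instance between
// records is what makes in-place header changes leak.
final class Headers {
    final Map<String, String> entries = new LinkedHashMap<>();

    Headers copy() {
        Headers h = new Headers();
        h.entries.putAll(entries);
        return h;
    }
}

// Hypothetical immutable Record: each withX method returns a new Record, but
// the copy is shallow, so the key, value, and Headers objects are shared.
final class Record<K, V> {
    private final K key;
    private final V value;
    private final long timestamp;
    private final Headers headers;

    Record(K key, V value, long timestamp, Headers headers) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = headers;
    }

    <NewV> Record<K, NewV> withValue(NewV newValue) {
        return new Record<K, NewV>(key, newValue, timestamp, headers);
    }

    Record<K, V> withHeaders(Headers newHeaders) {
        return new Record<K, V>(key, value, timestamp, newHeaders);
    }

    K key() { return key; }
    V value() { return value; }
    long timestamp() { return timestamp; }
    Headers headers() { return headers; }
}

final class ShallowCopyDemo {
    public static void main(String[] args) {
        Record<String, String> input = new Record<>("k", "v", 0L, new Headers());

        // withValue alone: a new Record object, but the same Headers instance,
        // so this mutation is also visible through the parent's record.
        Record<String, Integer> shared = input.withValue(1);
        shared.headers().entries.put("a", "1");
        System.out.println(input.headers().entries);

        // Giving the downstream record its own Headers copy keeps changes local.
        Record<String, Integer> isolated = input.withValue(2).withHeaders(input.headers().copy());
        isolated.headers().entries.put("b", "2");
        System.out.println(input.headers().entries);
    }
}
```

Both `println` calls print `{a=1}`: the mutation through `shared` shows up on `input`, while the one through `isolated` does not.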
>>>>>>>>>>> >>>>>>>>>>> On the other hand, there is "primary" data like key and >>> value, >>>>> as >>>>>>> well as >>>>>>>>>>> "secondary" data like timestamp and headers. The issue seems >>>>> that >>>>>>> we >>>>>>>>>>> treat "secondary data" more like metadata atm? >>>>>>>>>>> >>>>>>>>>>> Thus, promoting timestamp and headers to a first-class >>>>> citizen >>>>>>> role >>>>>>>>>>> makes sense to me (my original proposal about `RecordContext` >>>>> would >>>>>>> still >>>>>>>>>>> fall short in this regard). However, putting both (data >>> and >>>>>>> metadata) >>>>>>>>>>> into a `Record` abstraction might go too far? >>>>>>>>>>> >>>>>>>>>>> I am also a little bit concerned about `Record.copy()` >>>>> because it >>>>>>> might >>>>>>>>>>> be a trap: Users might assume it does a full deep copy of >>> the >>>>>>> record, >>>>>>>>>>> however, it would not. It would only create a new `Record` >>>>> object >>>>>>> as a >>>>>>>>>>> wrapper that points to the same key/value/header objects as >>>>> the >>>>>>> input >>>>>>>>>>> record. >>>>>>>>>>> >>>>>>>>>>> With the current `context.forward(key, value)` we don't have >>>>> this >>>>>>> "deep >>>>>>>>>>> copy" issue -- it's pretty clear what is happening. >>>>>>>>>>> >>>>>>>>>>> Instead of `To.all().withTimestamp()` we could also add >>>>>>>>>>> `context.forward(key, value, timestamp)` etc (just wondering >>>>> about >>>>>>> the >>>>>>>>>>> explosion in overloads)? >>>>>>>>>>> >>>>>>>>>>> Also, `Record.withValue` etc sounds odd? Should a record not >>>>> be >>>>>>>>>>> immutable? So, we could have something like >>>>>>>>>>> >>>>>>>>>>> >>> `RecordFactory.withKeyValue(...).withTimestamp(...).withHeaders(...).build()`. >>>>>>>>>>> But it looks rather verbose? >>>>>>>>>>> >>>>>>>>>>> The other question is of course, to what extent do we want >>> to >>>>> keep >>>>>>> the >>>>>>>>>>> distinction between "primary" and "secondary" data? To me, >>>>> it's a >>>>>>>>>>> question of ease of use?
>>>>>>>>>>> >>>>>>>>>>> Just putting all this out to move the discussion forward. >>>>> Don't >>>>>>> have a >>>>>>>>>>> concrete proposal atm. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> -Matthias >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On 9/14/20 9:24 AM, John Roesler wrote: >>>>>>>>>>>> Thanks for this thought, Matthias! >>>>>>>>>>>> >>>>>>>>>>>> To be honest, it's bugged me quite a bit that _all_ the >>>>>>>>>>>> record information hasn't been an argument to `process`. I >>>>>>>>>>>> suppose I was trying to be conservative in this proposal, >>>>>>>>>>>> but then again, if we're adding new Processor and >>>>>>>>>>>> ProcessorContext interfaces, then this is the time to make >>>>>>>>>>>> such a change. >>>>>>>>>>>> >>>>>>>>>>>> To be unambiguous, I think this is what we're talking >>> about: >>>>>>>>>>>> ProcessorContext: >>>>>>>>>>>> * applicationId >>>>>>>>>>>> * taskId >>>>>>>>>>>> * appConfigs >>>>>>>>>>>> * appConfigsWithPrefix >>>>>>>>>>>> * keySerde >>>>>>>>>>>> * valueSerde >>>>>>>>>>>> * stateDir >>>>>>>>>>>> * metrics >>>>>>>>>>>> * schedule >>>>>>>>>>>> * commit >>>>>>>>>>>> * forward >>>>>>>>>>>> >>>>>>>>>>>> StateStoreContext: >>>>>>>>>>>> * applicationId >>>>>>>>>>>> * taskId >>>>>>>>>>>> * appConfigs >>>>>>>>>>>> * appConfigsWithPrefix >>>>>>>>>>>> * keySerde >>>>>>>>>>>> * valueSerde >>>>>>>>>>>> * stateDir >>>>>>>>>>>> * metrics >>>>>>>>>>>> * register >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> RecordContext >>>>>>>>>>>> * topic >>>>>>>>>>>> * partition >>>>>>>>>>>> * offset >>>>>>>>>>>> * timestamp >>>>>>>>>>>> * headers >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Your proposal sounds good to me as-is. Just to cover the >>>>>>>>>>>> bases, though, I'm wondering if we should push the idea >>> just >>>>>>>>>>>> a little farther. 
Instead of decomposing >>> key,value,context, >>>>>>>>>>>> we could just keep them all in one object, like this: >>>>>>>>>>>> >>>>>>>>>>>> Record: >>>>>>>>>>>> * key >>>>>>>>>>>> * value >>>>>>>>>>>> * topic >>>>>>>>>>>> * partition >>>>>>>>>>>> * offset >>>>>>>>>>>> * timestamp >>>>>>>>>>>> * headers >>>>>>>>>>>> >>>>>>>>>>>> Then, we could have: >>>>>>>>>>>> Processor#process(Record) >>>>>>>>>>>> ProcessorContext#forward(Record, To) >>>>>>>>>>>> >>>>>>>>>>>> Viewed from this perspective, a record has three >>> properties >>>>>>>>>>>> that people may specify in their processors: key, value, >>> and >>>>>>>>>>>> timestamp. >>>>>>>>>>>> >>>>>>>>>>>> We could deprecate `To#withTimestamp` and enable people to >>>>>>>>>>>> specify the timestamp along with the key and value when >>> they >>>>>>>>>>>> forward a record. >>>>>>>>>>>> >>>>>>>>>>>> E.g., >>>>>>>>>>>> RecordBuilder toForward = RecordBuilder.copy(record) >>>>>>>>>>>> toForward.withKey(newKey) >>>>>>>>>>>> toForward.withValue(newValue) >>>>>>>>>>>> toForward.withTimestamp(newTimestamp) >>>>>>>>>>>> Record newRecord = toForward.build() >>>>>>>>>>>> context.forward(newRecord, To.child("child1")) >>>>>>>>>>>> >>>>>>>>>>>> Or, the more compact common case: >>>>>>>>>>>> current: >>>>>>>>>>>> context.forward(key, "newValue") >>>>>>>>>>>> proposed: >>>>>>>>>>>> >>> context.forward(copy(record).withValue("newValue").build()) >>>>>>>>>>>> >>>>>>>>>>>> It's slightly more verbose, but also more extensible. This >>>>>>>>>>>> would give us a clean path to add header support in PAPI >>> as >>>>>>>>>>>> well, simply by adding `withHeaders` in RecordBuilder. >>>>>>>>>>>> >>>>>>>>>>>> It's also more symmetrical, since the recipient of >>> `forward` >>>>>>>>>>>> would just get the sent `Record`. Today, by contrast, the >>> sender >>>>>>>>>>>> puts the timestamp in `To`, but the recipient gets it in >>> its >>>>>>>>>>>> own `ProcessorContext`. >>>>>>>>>>>> >>>>>>>>>>>> WDYT?
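A runnable sketch of the `RecordBuilder` flow in this proposal. It assumes hypothetical `Record`, `RecordBuilder`, and `To` classes and a static `forward` stand-in; none of these are the Kafka Streams API. `copy` seeds the builder from the input record, the `withX` setters override individual fields, and the timestamp travels with the built record rather than in `To`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical immutable record: key, value, and timestamp travel together.
final class Record {
    final String key;
    final String value;
    final long timestamp;

    Record(String key, String value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical builder: copy() seeds every field from an existing record, the
// withX setters override one field and return the builder (so they work both
// as separate statements and chained), and build() creates a new Record.
final class RecordBuilder {
    private String key;
    private String value;
    private long timestamp;

    static RecordBuilder copy(Record record) {
        RecordBuilder b = new RecordBuilder();
        b.key = record.key;
        b.value = record.value;
        b.timestamp = record.timestamp;
        return b;
    }

    RecordBuilder withKey(String newKey) { key = newKey; return this; }
    RecordBuilder withValue(String newValue) { value = newValue; return this; }
    RecordBuilder withTimestamp(long newTimestamp) { timestamp = newTimestamp; return this; }

    Record build() { return new Record(key, value, timestamp); }
}

// Hypothetical stand-in for To: only names the target child.
final class To {
    final String child;

    private To(String child) { this.child = child; }

    static To child(String name) { return new To(name); }
}

final class BuilderDemo {
    static final List<String> sent = new ArrayList<>();

    // Stand-in for ProcessorContext#forward(Record, To); the timestamp rides on the record.
    static void forward(Record r, To to) {
        sent.add(to.child + " <- " + r.key + "=" + r.value + " @" + r.timestamp);
    }

    public static void main(String[] args) {
        Record record = new Record("k", "v", 100L);

        // Long form from the proposal.
        RecordBuilder toForward = RecordBuilder.copy(record);
        toForward.withKey("newKey");
        toForward.withValue("newValue");
        toForward.withTimestamp(200L);
        Record newRecord = toForward.build();
        forward(newRecord, To.child("child1"));

        // Compact common case: only the value changes; key and timestamp are inherited.
        forward(RecordBuilder.copy(record).withValue("newValue").build(), To.child("child1"));

        sent.forEach(System.out::println);
    }
}
```

It prints `child1 <- newKey=newValue @200` for the long form and `child1 <- k=newValue @100` for the compact form, where the unchanged key and timestamp come from the input record.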
>>>>>>>>>>>> -John >>>>>>>>>>>> >>>>>>>>>>>> On Fri, 2020-09-11 at 12:30 -0700, Matthias J. Sax wrote: >>>>>>>>>>>>> I think separating the different contexts makes sense. >>>>>>>>>>>>> >>>>>>>>>>>>> In fact, we could even go one step further and remove >>> the >>>>>>> record >>>>>>>>>> context >>>>>>>>>>>>> from the processor context completely and add a third >>>>>>> parameter to >>>>>>>>>>>>> `process(key, value, recordContext)`. This would make it >>>>> clear >>>>>>> that >>>>>>>>>> the >>>>>>>>>>>>> context is for the input record only and it's not >>>>> possible to >>>>>>> pass >>>>>>>>>> it to >>>>>>>>>>>>> a `punctuate` callback. >>>>>>>>>>>>> >>>>>>>>>>>>> For the stores and changelogging: I think there are two >>>>> cases. >>>>>>> (1) >>>>>>>>>> You >>>>>>>>>>>>> use a plain key-value store. For this case, it seems you >>>>> do >>>>>>> not care >>>>>>>>>>>>> about the timestamp and thus do not care what >>> timestamp >>>>> is >>>>>>> set in >>>>>>>>>> the >>>>>>>>>>>>> changelog records. (We can set anything we want, as it's >>>>> not >>>>>>>>>> relevant at >>>>>>>>>>>>> all -- the timestamp is ignored on read anyway.) (2) The >>>>> other >>>>>>> case >>>>>>>>>> is >>>>>>>>>>>>> that one does care about timestamps, and in this case one >>>>> should >>>>>>> use >>>>>>>>>>>>> TimestampedKeyValueStore. The passed timestamp will be >>>>> set on >>>>>>> the >>>>>>>>>>>>> changelog records for this case. >>>>>>>>>>>>> >>>>>>>>>>>>> Thus, for both cases, accessing the record context does >>>>> not >>>>>>> seem to >>>>>>>>>> be >>>>>>>>>>>>> a requirement. And providing access to the processor >>>>> context >>>>>>> to, e.g., >>>>>>>>>>>>> `forward()` or similar seems safe. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> -Matthias >>>>>>>>>>>>> >>>>>>>>>>>>> On 9/10/20 7:25 PM, John Roesler wrote: >>>>>>>>>>>>>> Thanks for the reply, Paul!
>>>>>>>>>>>>>> >>>>>>>>>>>>>> I certainly intend to make sure that the changelogging >>>>> layer >>>>>>>>>>>>>> continues to work the way it does now, by hook or by >>>>> crook. >>>>>>>>>>>>>> I think the easiest path for me is to just "cheat" and >>>>> get >>>>>>>>>>>>>> the real ProcessorContext into the ChangeLoggingStore >>>>>>>>>>>>>> implementation somehow. I'll tag you on the PR when I >>>>> create >>>>>>>>>>>>>> it, so you have an opportunity to express a preference >>>>> about >>>>>>>>>>>>>> the implementation choice, and maybe even compile/test >>>>>>>>>>>>>> against it to make sure your stuff still works. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Regarding this: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> we have an interest in making a state store with a >>>>> richer >>>>>>>>>>>>>>> way of querying its data (like perhaps getting all >>>>> values >>>>>>>>>>>>>>> associated with a secondary key), while still >>>>> ultimately >>>>>>>>>>>>>>> writing to the changelog topic for later >>> restoration. >>>>>>>>>>>>>> This is very intriguing to me. On the side, I've been >>>>>>>>>>>>>> preparing a couple of ideas related to this topic. I >>>>> don't >>>>>>>>>>>>>> think I have a coherent enough thought to even express >>>>> it in >>>>>>>>>>>>>> a Jira right now, but when I do, I'll tag you on it >>>>> also to >>>>>>>>>>>>>> see what you think. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Whenever you're ready to share the usability >>> improvement >>>>>>>>>>>>>> ideas, I'm very interested to see what you've come up >>>>> with. >>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>> -John >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen wrote: >>>>>>>>>>>>>>>> when you use a HashMap or RocksDB or other "state >>>>>>> stores", you >>>>>>>>>> don't >>>>>>>>>>>>>>>> expect them to automatically know extra stuff >>> about >>>>> the >>>>>>> record >>>>>>>>>> you're >>>>>>>>>>>>>>>> storing. 
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> So, I don't think there is any reason we *can't* >>>>> retain the >>>>>>>>>> record context >>>>>>>>>>>>>>>> in the StateStoreContext, and if any users came >>>>> along >>>>>>> with a >>>>>>>>>> clear use case >>>>>>>>>>>>>>>> I'd find that convincing. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I agree with the principle of being conservative >>> with >>>>> the >>>>>>>>>> StateStoreContext >>>>>>>>>>>>>>> API. Regarding user expectations or a clear use >>>>> case, the >>>>>>> only >>>>>>>>>>>>>>> counterpoint I would offer is that we sort of have >>>>> that >>>>>>> use case >>>>>>>>>> already, >>>>>>>>>>>>>>> which is the example I gave of the change logging >>>>> store >>>>>>> using the >>>>>>>>>>>>>>> timestamp. I am curious if this functionality will >>> be >>>>>>> retained >>>>>>>>>> when using >>>>>>>>>>>>>>> built in state stores, or will a low-level processor >>>>> get a >>>>>>>>>> KeyValueStore >>>>>>>>>>>>>>> that no longer writes to the changelog topic with >>> the >>>>>>> record's >>>>>>>>>> timestamp. >>>>>>>>>>>>>>> While I personally don't care much about that >>>>> functionality >>>>>>>>>> specifically, I >>>>>>>>>>>>>>> have a general desire for custom state stores to >>>>> easily do >>>>>>> the >>>>>>>>>> things that >>>>>>>>>>>>>>> built in state stores do. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> It genuinely did not occur to me that users might be >>>>>>> looking up >>>>>>>>>> and/or >>>>>>>>>>>>>>>> updating records of other keys from within a >>>>> Processor. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I'm glad you said this Sophie, because it gives me >>> an >>>>>>>>>> opportunity to say >>>>>>>>>>>>>>> that this is actually a *huge* use case for my team. 
>>>>> The >>>>>>> state >>>>>>>>>> store >>>>>>>>>>>>>>> usability improvements I was referring to in my >>>>> previous >>>>>>> message >>>>>>>>>> were about >>>>>>>>>>>>>>> enabling the user to write custom stores while still >>>>> easily >>>>>>>>>> hooking into >>>>>>>>>>>>>>> the ability to write to a changelog topic. I think >>>>> that is >>>>>>>>>> technically >>>>>>>>>>>>>>> possible now, but I don't think it's trivial. >>>>>>> Specifically, we >>>>>>>>>> have an >>>>>>>>>>>>>>> interest in making a state store with a richer way >>> of >>>>>>> querying >>>>>>>>>> its data >>>>>>>>>>>>>>> (like perhaps getting all values associated with a >>>>>>> secondary >>>>>>>>>> key), while >>>>>>>>>>>>>>> still ultimately writing to the changelog topic for >>>>> later >>>>>>>>>> restoration. >>>>>>>>>>>>>>> We recognize that this use case throws away some of >>>>> what >>>>>>> kafka >>>>>>>>>> streams >>>>>>>>>>>>>>> (especially the DSL) is good at - easy >>>>> parallelizability by >>>>>>>>>> partitioning >>>>>>>>>>>>>>> all processing by key - and that our business logic >>>>> would >>>>>>>>>> completely fall >>>>>>>>>>>>>>> apart if we were consuming from multi-partition >>>>> topics with >>>>>>>>>> multiple >>>>>>>>>>>>>>> consumers. But we have found that using the low >>> level >>>>>>> processor >>>>>>>>>> API is >>>>>>>>>>>>>>> good for the very simple stream processing >>> primitives >>>>> it >>>>>>>>>> provides: handling >>>>>>>>>>>>>>> the plumbing of consuming from multiple kafka topics >>>>> and >>>>>>>>>> potentially >>>>>>>>>>>>>>> updating persistent local state in a reliable way. >>>>> That in >>>>>>>>>> itself has >>>>>>>>>>>>>>> proven to be a worthwhile programming model. 
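The secondary-key use case can be sketched roughly as follows. Everything here is hypothetical and deliberately not the Kafka Streams `StateStore` API: a primary map plus a secondary index, each write appended to a list standing in for the changelog topic, and restoration done by replaying that list. In this example the secondary key is the part of the value before the colon.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

// Hypothetical custom store (not the Kafka Streams StateStore API): a primary
// key-value map plus a secondary index, with every write appended to a list
// that stands in for the changelog topic.
final class SecondaryIndexStore {
    private final Map<String, String> primary = new HashMap<>();
    private final Map<String, Set<String>> bySecondary = new HashMap<>();
    private final Function<String, String> secondaryKeyOf;
    private final List<String[]> changelog; // entries are {key, value}; null value = delete

    SecondaryIndexStore(Function<String, String> secondaryKeyOf, List<String[]> changelog) {
        this.secondaryKeyOf = secondaryKeyOf;
        this.changelog = changelog;
    }

    void put(String key, String value) {
        apply(key, value);
        changelog.add(new String[] {key, value});
    }

    void delete(String key) { put(key, null); }

    // The "richer query": all values whose secondary key matches, sorted.
    Set<String> valuesForSecondaryKey(String secondaryKey) {
        Set<String> result = new TreeSet<>();
        for (String key : bySecondary.getOrDefault(secondaryKey, Collections.<String>emptySet())) {
            result.add(primary.get(key));
        }
        return result;
    }

    // Replays the changelog into a fresh store, rebuilding both maps.
    static SecondaryIndexStore restore(Function<String, String> secondaryKeyOf, List<String[]> changelog) {
        SecondaryIndexStore store = new SecondaryIndexStore(secondaryKeyOf, changelog);
        for (String[] entry : changelog) {
            store.apply(entry[0], entry[1]);
        }
        return store;
    }

    private void apply(String key, String value) {
        String old = primary.remove(key);
        if (old != null) {
            Set<String> keys = bySecondary.get(secondaryKeyOf.apply(old));
            keys.remove(key);
            if (keys.isEmpty()) {
                bySecondary.remove(secondaryKeyOf.apply(old));
            }
        }
        if (value != null) {
            primary.put(key, value);
            bySecondary.computeIfAbsent(secondaryKeyOf.apply(value), k -> new HashSet<>()).add(key);
        }
    }

    public static void main(String[] args) {
        List<String[]> changelog = new ArrayList<>();
        Function<String, String> customerOf = v -> v.substring(0, v.indexOf(':'));
        SecondaryIndexStore store = new SecondaryIndexStore(customerOf, changelog);

        store.put("o1", "alice:book");
        store.put("o2", "alice:pen");
        store.put("o3", "bob:cup");
        store.put("o2", "bob:pen"); // re-keys o2 from alice to bob in the index

        System.out.println(store.valuesForSecondaryKey("alice"));
        SecondaryIndexStore restored = SecondaryIndexStore.restore(customerOf, changelog);
        System.out.println(restored.valuesForSecondaryKey("bob"));
    }
}
```

The first line prints `[alice:book]` because `o2` moved to `bob`. The restored store prints `[bob:cup, bob:pen]`, showing that replaying plain key/value changelog entries is enough to rebuild the index in this toy setup.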
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Since I got off track a bit, let me summarize: I >>> don't >>>>>>>>>> particularly care >>>>>>>>>>>>>>> about the record context being available to state >>>>> store >>>>>>>>>> implementations, >>>>>>>>>>>>>>> and I think this KIP is headed in the right >>> direction >>>>> in >>>>>>> that >>>>>>>>>> regard. But >>>>>>>>>>>>>>> more generally, I wanted to express the importance >>> of >>>>>>>>>> maintaining a >>>>>>>>>>>>>>> powerful and flexible StateStore interface. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks! >>>>>>>>>>>>>>> Paul >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Thu, Sep 10, 2020 at 6:11 PM Sophie Blee-Goldman >>> < >>>>>>>>>> sop...@confluent.io> >>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Aha, I did misinterpret the example in your >>> previous >>>>>>> response >>>>>>>>>> regarding the >>>>>>>>>>>>>>>> range query after all. I thought you just meant a >>>>>>> time-range >>>>>>>>>> query inside a >>>>>>>>>>>>>>>> punctuator. It genuinely did not occur to me that >>>>> users >>>>>>> might >>>>>>>>>> be looking up >>>>>>>>>>>>>>>> and/or updating records of other keys from within >>> a >>>>>>> Processor. >>>>>>>>>> Sorry for >>>>>>>>>>>>>>>> being closed-minded. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I won't drag out this discussion any further by >>>>> asking >>>>>>> whether >>>>>>>>>> that might >>>>>>>>>>>>>>>> be >>>>>>>>>>>>>>>> a valid use case or just a lurking bug in itself >>> :) >>>>>>>>>>>>>>>> Thanks for humoring me. The current proposal for >>>>> KIP-478 >>>>>>>>>> sounds good to me. >>>>>>>>>>>>>>>> On Thu, Sep 10, 2020 at 3:43 PM John Roesler < >>>>>>>>>> vvcep...@apache.org> wrote: >>>>>>>>>>>>>>>>> Ah, thanks Sophie, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I'm sorry for misinterpreting your response. Yes, >>>>> we >>>>>>>>>>>>>>>>> absolutely can and should clear the context >>> before >>>>>>>>>>>>>>>>> punctuating. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> My secondary concern is maybe more far-fetched.
>>> I >>>>> was >>>>>>>>>>>>>>>>> thinking that inside process(key,value), a >>>>> Processor >>>>>>> might >>>>>>>>>>>>>>>>> do a get/put of a _different_ key. Consider, for >>>>>>> example, >>>>>>>>>>>>>>>>> the way that Suppress processors work. When they >>>>> get a >>>>>>>>>>>>>>>>> record, they add it to the store and then do a >>>>> range >>>>>>> scan >>>>>>>>>>>>>>>>> and possibly forward a _different_ record. Of >>>>> course, >>>>>>> this >>>>>>>>>>>>>>>>> is an operation that is deeply coupled to the >>>>>>> internals, and >>>>>>>>>>>>>>>>> the Suppress processor accordingly actually does >>>>> get >>>>>>> access >>>>>>>>>>>>>>>>> to the internal context so that it can set the >>>>> context >>>>>>>>>>>>>>>>> before forwarding. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Still, it seems like I've had a handful of >>>>>>> conversations >>>>>>>>>>>>>>>>> with people over the years in which they tell me >>>>> they >>>>>>> are >>>>>>>>>>>>>>>>> using state stores in a way that transcends the >>>>> "get >>>>>>> and put >>>>>>>>>>>>>>>>> the currently processing record" access >>> pattern. I >>>>>>> doubt >>>>>>>>>>>>>>>>> that those folks would even have considered the >>>>>>> possibility >>>>>>>>>>>>>>>>> that the currently processing record's _context_ >>>>> could >>>>>>>>>>>>>>>>> pollute their state store operations, as I >>> myself >>>>>>> never gave >>>>>>>>>>>>>>>>> it a second thought until the current >>> conversation >>>>>>> began. In >>>>>>>>>>>>>>>>> cases like that, we have actually set a trap for >>>>> these >>>>>>>>>>>>>>>>> people, and it seems better to dismantle the >>> trap. >>>>>>>>>>>>>>>>> As you noted, really the only people who would >>> be >>>>>>> negatively >>>>>>>>>>>>>>>>> impacted are people who implement their own >>> state >>>>>>> stores. >>>>>>>>>>>>>>>>> These folks will get the deprecation warning and >>>>> try to >>>>>>>>>>>>>>>>> adapt their stores to the new interface.
If they >>>>> needed >>>>>>>>>>>>>>>>> access to the record context, they would find >>>>> it's now >>>>>>>>>>>>>>>>> missing. They'd ask us about it, and we'd have >>> the >>>>>>> ability >>>>>>>>>>>>>>>>> to explain the lurking bug that they have had in >>>>> their >>>>>>>>>>>>>>>>> stores all along, as well as the new recommended >>>>>>> pattern >>>>>>>>>>>>>>>>> (just pass everything you need in the value). If >>>>> that's >>>>>>>>>>>>>>>>> unsatisfying, _then_ we should consider amending >>>>> the >>>>>>> API. >>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>> -John >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Thu, 2020-09-10 at 15:21 -0700, Sophie >>>>> Blee-Goldman >>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>> Regarding your first sentence, "...the >>>>> processor >>>>>>> would >>>>>>>>>> null >>>>>>>>>>>>>>>>>>> out the record context...", this is not >>>>> possible, >>>>>>> since >>>>>>>>>> the >>>>>>>>>>>>>>>>>>> processor doesn't have write access to the >>>>>>> context. We >>>>>>>>>> could >>>>>>>>>>>>>>>>>>> add it, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Sorry, this was poorly phrased, I definitely >>>>> did not >>>>>>> mean >>>>>>>>>> to imply that >>>>>>>>>>>>>>>>> we >>>>>>>>>>>>>>>>>> should make the context modifiable by the >>>>> Processors >>>>>>>>>> themselves. I >>>>>>>>>>>>>>>> meant >>>>>>>>>>>>>>>>>> this should be handled by the internal >>>>> processing >>>>>>>>>> framework that deals >>>>>>>>>>>>>>>>> with >>>>>>>>>>>>>>>>>> passing records from one Processor to the >>> next, >>>>>>> setting >>>>>>>>>> the record >>>>>>>>>>>>>>>>> context >>>>>>>>>>>>>>>>>> when a new record is picked up, invoking the >>>>>>> punctuators, >>>>>>>>>> etc. I >>>>>>>>>>>>>>>> believe >>>>>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>>> all currently happens in the StreamTask? 
It already can and does overwrite the record context as new records are processed, and is also responsible for calling the punctuators, so it doesn't seem like a huge leap to just say "null out the current record before punctuating."

To clarify, I was never advocating or even considering giving the Processors write access to the record context. Sorry if my last message (or all of them) was misleading. I just wanted to point out that the punctuator concern is orthogonal to the question of whether we should include the record context in the StateStoreContext. It's definitely a real problem, but it's a problem that exists at the Processor level and not just the StateStore.

So, I don't think there is any reason we *can't* retain the record context in the StateStoreContext, and if any users came along with a clear use case I'd find that convincing. In the absence of any examples, the conservative approach sounds good to me.
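Sophie's suggestion can be sketched as a toy, single-threaded model. Everything here (`ToyTask`, `ToyContext`, `ToyRecordMetadata`) is hypothetical and much simpler than the real StreamTask. The task is the only thing that writes the current-record metadata. It sets the metadata just before handing a record to user code and clears it before a punctuation. User code can only read the metadata, through an `Optional` that is empty whenever there is no current record.

```java
import java.util.Optional;
import java.util.function.Consumer;

/** Hypothetical metadata for the record currently being processed. */
final class ToyRecordMetadata {
    final String topic;
    final int partition;
    final long offset;

    ToyRecordMetadata(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

/** Hypothetical read-only view that user code (processors, punctuators) receives. */
interface ToyContext {
    Optional<ToyRecordMetadata> recordMetadata();
}

/** Toy stand-in for the task loop; only this class ever writes `current`. */
final class ToyTask implements ToyContext {
    private ToyRecordMetadata current;

    @Override
    public Optional<ToyRecordMetadata> recordMetadata() {
        return Optional.ofNullable(current);
    }

    /** Sets the metadata for exactly one record, runs user code, then clears it. */
    void process(ToyRecordMetadata metadata, Consumer<ToyContext> processor) {
        current = metadata;
        try {
            processor.accept(this);
        } finally {
            current = null;
        }
    }

    /** A punctuation is not triggered by a record, so there is no current record. */
    void punctuate(Consumer<ToyContext> punctuator) {
        current = null; // "null out the current record before punctuating"
        punctuator.accept(this);
    }
}
```

Because `process` clears the metadata in a `finally` block, a punctuator that runs afterwards sees `Optional.empty()` instead of whichever record happened to be processed last.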

If it turns out that someone did need the record context in their custom state store, I'm sure they'll submit a politely worded bug report alerting us that we broke their application.

On Thu, Sep 10, 2020 at 3:05 PM John Roesler <vvcep...@apache.org> wrote:

Thanks, Sophie,

Yes, now that you point it out, I can see that the record context itself should be nulled out by Streams before invoking punctuators. From that perspective, we don't need to think about the second-order problem of what's in the context for the state store when called from a punctuator.

Regarding your first sentence, "...the processor would null out the record context...", this is not possible, since the processor doesn't have write access to the context. We could add it, but then all kinds of strange effects would ensue when downstream processors execute but the context is empty, etc. Better to just let the framework manage the record context and keep it read-only for Processors.

Reading between the lines of your last reply, it sounds like the disconnect may just have been a mutual misunderstanding about whether or not Processors currently have access to set the record context. Since they do not, if we wanted to add the record context to StateStoreContext in a well-defined way, we'd also have to add the ability for Processors to manipulate it. But then, we're just creating a side-channel for Processors to pass some information in arguments to "put()" and other information implicitly through the context. It seems better just to go for a single channel for now.

It sounds like you're basically in favor of the conservative approach, and you just wanted to understand the blockers that I implied. Does my clarification make sense?

Thanks,
-John

On Thu, 2020-09-10 at 10:54 -0700, Sophie Blee-Goldman wrote:

I was just thinking that the processor would null out the record context after it finished processing the record, so I'm not sure I follow why this would not be possible?
AFAIK we never call a punctuator in the middle of processing a record through the topology, and even if we did, we still know when it is about to be called and could set it to null beforehand.

I'm not trying to advocate for it here; I'm in agreement that anything you want to access within the store can and should be accessed within the calling Processor/Punctuator before reaching the store. The "we can always add it later if necessary" argument is also pretty convincing. Just trying to understand why this wouldn't be possible.

FWIW, the question of "what is the current record in the context of a Punctuator" exists independently of whether we want to add this to the StateStoreContext or not. The full ProcessorContext, including the current record context, is already available within a Punctuator, so removing the current record context from the StateStoreContext does not solve the problem.
Users can, and have (see KAFKA-9584 <https://issues.apache.org/jira/browse/KAFKA-9584>), hit such subtle bugs without ever invoking a StateStore from their punctuator.

Again, I think I do agree that we should leave the current record context off of the StateStoreContext, but I don't think the Punctuator argument against it is very convincing. It sounds to me like we need to disallow access to the current record context from within the Punctuator, independent of anything to do with state stores.

On Thu, Sep 10, 2020 at 7:12 AM John Roesler <vvcep...@apache.org> wrote:

Thanks for the thoughts, Sophie.

I agree that the extra information could be useful. My only concern is that it doesn’t seem like we can actually supply that extra information correctly. So, then we have a situation where the system offers useful API calls that are only correct in a narrow range of use cases. Outside of those use cases, you get incorrect behavior.
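Here is a toy example of code that touches records other than the current one. It is loosely modeled on the Suppress behavior John describes earlier in the thread: buffer the incoming record, range-scan the buffer, and forward possibly different records. `BufferedRecord` and `ToySuppressBuffer` are hypothetical and far simpler than the real Suppress processor. In particular, the incoming record's timestamp stands in for stream time. Each buffered entry keeps its own timestamp, because the record that triggers an emission is generally not the record being emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** A buffered record that carries its own timestamp (hypothetical toy type). */
final class BufferedRecord {
    final String key;
    final String value;
    final long timestamp;

    BufferedRecord(String key, String value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

/** Toy suppression-style buffer; not the actual Kafka Streams Suppress processor. */
final class ToySuppressBuffer {
    // Ordered by timestamp so a range scan can find every entry that is old enough.
    private final TreeMap<Long, List<BufferedRecord>> byTime = new TreeMap<>();
    private final long graceMs;

    ToySuppressBuffer(long graceMs) {
        this.graceMs = graceMs;
    }

    /**
     * Buffers the incoming record, then returns ("forwards") every buffered record
     * whose timestamp is at or before (incoming timestamp - grace). The returned
     * records are generally NOT the record currently being processed.
     */
    List<BufferedRecord> process(String key, String value, long timestamp) {
        byTime.computeIfAbsent(timestamp, t -> new ArrayList<>())
              .add(new BufferedRecord(key, value, timestamp));
        NavigableMap<Long, List<BufferedRecord>> expired =
                byTime.headMap(timestamp - graceMs, true);
        List<BufferedRecord> emitted = new ArrayList<>();
        for (List<BufferedRecord> records : expired.values()) {
            emitted.addAll(records);
        }
        expired.clear(); // the view is backed by byTime, so this evicts them
        return emitted;
    }
}
```

Suppose the buffer receives `a@100`, then `b@105`, then `c@112`, with a 10 ms grace period. Only `a` is emitted, and it is still stamped 100. If the emitted record took its timestamp from the current record's context instead, it would be stamped 112.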

If it were possible to null out the context before you put a record to which the context doesn’t apply, then the concern would be mitigated. But it would still be pretty weird from the perspective of the store that sometimes the context is populated and other times, it’s null.

But that seems moot, since it doesn’t seem possible to null out the context. Only the Processor could know whether it’s about to put a record different from the context or not. And it would be inappropriate to offer a public ProcessorContext API to manage the record context.

Ultimately, it still seems like if you want to store headers, you can store them explicitly, right? That doesn’t seem onerous to me, and it kind of seems better than relying on undefined or asymmetrical behavior in the store itself.

Anyway, I’m not saying that we couldn’t solve these problems. Just that it seems like we can be conservative and avoid them for now.
If it turns out we really need to solve them, we can always do it later.

Thanks,
John

On Wed, Sep 9, 2020, at 22:46, Sophie Blee-Goldman wrote:

> If you were to call "put" from a punctuator, or do a `range()` query and then update one of those records with `put()`, you'd have a very subtle bug on your hands.

Can you elaborate on this a bit? I agree that the punctuator case is an obvious exception to the assumption that store invocations always have a corresponding "current record", but I don't understand the second example. Are you envisioning a scenario where the #process method performs a range query and then updates records? Or were you just giving another example of the punctuator case?

I only bring it up because I agree that the current record information could still be useful within the context of the store.
As a non-user, my input on this definitely has limited value, but it just isn't striking me as obvious that we should remove access to the current record context from the state stores. If there is no current record, as in the punctuator case, we should just set the record context to null (or Optional.empty, etc.).

That said, the put() always has to come from somewhere, and that somewhere is always going to be either a Processor or a Punctuator, both of which will still have access to the full context. So additional info such as the timestamp can and should probably be supplied to the store before calling put(), rather than looked up by the store. But I can see some other things being useful, for example the current record's headers. Maybe if/when we add better (or any) support for headers in state stores this will be less true.
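Sophie and John both point toward the same alternative: the caller supplies the timestamp (and, per John's earlier suggestion, the headers), and the store does not look them up. That could look roughly like this. All types are hypothetical, and headers are simplified to a `Map<String, String>` rather than Kafka's `Headers`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical value wrapper: the timestamp and headers travel in the value
 * instead of being looked up from an ambient "current record" context.
 * Headers are simplified to a String map, not Kafka's Headers type.
 */
final class StoredValue {
    final String payload;
    final long timestamp;
    final Map<String, String> headers;

    StoredValue(String payload, long timestamp, Map<String, String> headers) {
        this.payload = payload;
        this.timestamp = timestamp;
        this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(headers));
    }
}

/** A "normal" store: it knows only what the caller hands it. */
final class ExplicitStore {
    private final Map<String, StoredValue> data = new HashMap<>();

    void put(String key, StoredValue value) {
        data.put(key, value);
    }

    StoredValue get(String key) {
        return data.get(key);
    }
}

/** Hypothetical punctuator-style caller: it decides which timestamp applies. */
final class RefreshingPunctuator {
    static void refresh(ExplicitStore store, String key, long punctuationTime) {
        StoredValue old = store.get(key);
        if (old != null) {
            // Keep the stored payload and headers, but stamp the caller-chosen time.
            store.put(key, new StoredValue(old.payload, punctuationTime, old.headers));
        }
    }
}
```

Because the punctuator passes the time explicitly, it is always clear which record's context applies. The same holds when a processor updates a key it found through a range scan.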

Of course, as John has made clear, it's pretty hard to judge without examples and more insight as to what actually goes on within a custom state store.

On Wed, Sep 9, 2020 at 8:07 PM John Roesler <vvcep...@apache.org> wrote:

Hi Paul,

It's good to hear from you!

I'm glad you're in favor of the direction. Especially when it comes to public API and usability concerns, I tend to think that "the folks who matter" are actually the folks who have to use the APIs to accomplish real tasks. It can be hard for me to be sure I'm thinking clearly from that perspective.

Funny story, I also started down this road a couple of times already and backed them out before the KIP because I was afraid of the scope of the proposal. Unfortunately, needing to make a new ProcessorContext kind of forced my hand.

I see you've called me out about the ChangeLogging stores :) In fact, I think these are the main/only reason that stores might really need to invoke "forward()".
My secret plan was to cheat and either accomplish change-logging by a different mechanism than implementing the store interface, or by just breaking encapsulation to sneak the "real" ProcessorContext into the ChangeLogging stores. But those are all implementation details. I think the key question is whether anyone else has a store implementation that needs to call "forward()". It's not what you mentioned, but since you spoke up, I'll just ask: if you have a use case for calling "forward()" in a store, please share it.

Regarding the other record-specific context methods, I think you have a good point, but I also can't quite wrap my head around how we can actually guarantee it to work in general. For example, take the case you cited, where the implementation of `KeyValueStore#put(key, value)` uses the context to augment the record with timestamp information. This relies on the assumption that you would only call "put()" from inside a `Processor#process(key, value)` call in which the record being processed is the same record that you're trying to put into the store.
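The assumption John describes can be made concrete with a toy store whose `put()` stamps each entry with whatever timestamp the ambient context currently holds. The classes are hypothetical and do not correspond to any real Kafka Streams store.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical ambient context: the framework sets the current record's timestamp. */
final class AmbientContext {
    long currentRecordTimestamp;
}

/**
 * Toy store that silently stamps every put() with the ambient timestamp.
 * This is only correct when `key` belongs to the record currently being processed.
 */
final class ContextStampingStore {
    private final AmbientContext context;
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> timestamps = new HashMap<>();

    ContextStampingStore(AmbientContext context) {
        this.context = context;
    }

    void put(String key, String value) {
        values.put(key, value);
        timestamps.put(key, context.currentRecordTimestamp); // looked up, not passed in
    }

    String get(String key) {
        return values.get(key);
    }

    Long timestampOf(String key) {
        return timestamps.get(key);
    }
}
```

Suppose the framework is processing a record `b` with timestamp 200, and the processor updates key `a` (say, one it found through a range scan). Then `a` is silently re-stamped with 200, even though its own record carried 100. Nothing fails; the stored metadata is simply wrong, which is why the bug is subtle.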

If you were to call "put" from a punctuator, or do a `range()` query and then update one of those records with `put()`, you'd have a very subtle bug on your hands. Right now, the Streams component that actually calls the Processor takes care to set the right record context before invoking the method, and in the case of caching, etc., it also takes care to swap out the old context and keep it somewhere safe. But when it comes to public API Processors calling methods on StateStores, there's no opportunity for any component to make sure the context is always correct.

In the face of that situation, it seemed better to just move in the direction of a "normal" data store. I.e., when you use a HashMap or RocksDB or other "state stores", you don't expect them to automatically know extra stuff about the record you're storing. If you need them to know something, you just put it in the value.

All of that said, I'm just reasoning from first principles here. To really know if this is a mistake or not, I need to be in your place.
So please push back if you think what I said is nonsense. My personal plan was to keep an eye out during the period where the old API was still present, but deprecated, to see if people were struggling to use the new API. If so, then we'd have a chance to address it before dropping the old API. But it's even better if you can help think it through now.

It did also cross my mind to _not_ add the StateStoreContext, but just to continue to punt on the question by just dropping in the new ProcessorContext to the new init method. If StateStoreContext seems too bold, we can go that direction. But if we actually add some methods to StateStoreContext, I'd like to be able to ensure they would be well defined. I think the current situation was more of an oversight than a choice.

Thanks again for your reply,
-John

On Wed, 2020-09-09 at 21:23 -0500, Paul Whalen wrote:

John,

It's exciting to see this KIP head in this direction!
In the last year or so I've tried to sketch out some usability improvements for custom state stores, and I also ended up splitting out the StateStoreContext from the ProcessorContext in an attempt to facilitate what I was doing. I sort of abandoned it when I realized how large the ideal change might have to be, but it's great to see that there is other interest in moving in this direction (from the folks that matter :) ).

Having taken a stab at it myself, I have a comment/question on this bullet about StateStoreContext:

> It does *not* include anything processor- or record-specific, like `forward()` or any information about the "current" record, which is only well-defined in the context of the Processor.
> Processors process one record at a time, but state stores may be used to store and fetch many records, so there is no "current record".

I totally agree that record-specific or processor-specific context in a state store is often not well-defined and it would be good to separate that out, but sometimes it (at least record-specific context) is actually useful, for example, passing the record's timestamp through to the underlying storage (or changelog topic):
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121

You could have the writer client of the state store pass this through, but it would be nice to be able to write state stores where the client did not have this responsibility.
I'm not sure if the solution is to add some things back to StateStoreContext, or make yet another context that represents record-specific context while inside a state store.

Best,
Paul

On Wed, Sep 9, 2020 at 5:43 PM John Roesler <j...@vvcephei.org> wrote:

Hello all,

I've been slowly pushing KIP-478 forward over the last year, and I'm happy to say that we're making good progress now. However, several issues with the original design have come to light.

The major changes:

We discovered that the original plan of just adding generic parameters to ProcessorContext was too disruptive, so we are now adding a new api.ProcessorContext.

That choice forces us to add a new StateStore.init method for the new context, but ProcessorContext really isn't ideal for state stores to begin with, so I'm proposing a new StateStoreContext for this purpose.
In a nutshell, there are quite a few methods in ProcessorContext that actually should never be called from inside a StateStore.

Also, since there is a new ProcessorContext interface, we need a new MockProcessorContext implementation in the test-utils module.

The changeset for the KIP document is here:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10

And the KIP itself is here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API

If you have any concerns, please let me know!

Thanks,
-John
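The context split John proposes can be illustrated with hypothetical, heavily simplified interfaces. The names and method sets are illustrative only; they are not the KIP-478 or Kafka Streams definitions. A store is initialized with a narrow context that carries setup information, with no `forward()` and no current-record accessors. Record-level operations live only on the processor-facing context.

```java
import java.io.File;
import java.util.Map;
import java.util.Optional;

/*
 * Hypothetical, heavily simplified interfaces illustrating the split; they are
 * NOT the actual KIP-478 / Kafka Streams definitions.
 */

/** What a store is initialized with: setup information only. */
interface SketchStateStoreContext {
    String applicationId();
    File stateDir();
    Map<String, Object> appConfigs();
    // Deliberately absent: forward(...) and any "current record" accessors.
}

/** What a processor sees: record-level operations live here, not on the store context. */
interface SketchProcessorContext<KOut, VOut> {
    void forward(KOut key, VOut value);
    Optional<Long> currentRecordTimestamp(); // empty outside of process()
}

/** A store only ever receives the narrower context, so it cannot forward records. */
interface SketchStateStore {
    void init(SketchStateStoreContext context);
}

/** Toy store: init() may read setup information but has no way to call forward(). */
final class InMemorySketchStore implements SketchStateStore {
    private String applicationId;

    @Override
    public void init(SketchStateStoreContext context) {
        this.applicationId = context.applicationId();
    }

    String applicationId() {
        return applicationId;
    }
}
```

In this sketch the type system enforces the design choice. Since `init` takes only the narrow context, a store that needs record metadata has to receive it from its caller, for example in the value, as discussed in the thread.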