Thanks for the reply, Paul! I certainly intend to make sure that the changelogging layer continues to work the way it does now, by hook or by crook. I think the easiest path for me is to just "cheat" and get the real ProcessorContext into the ChangeLoggingStore implementation somehow. I'll tag you on the PR when I create it, so you have an opportunity to express a preference about the implementation choice, and maybe even compile/test against it to make sure your stuff still works.
Regarding this:

> we have an interest in making a state store with a richer
> way of querying its data (like perhaps getting all values
> associated with a secondary key), while still ultimately
> writing to the changelog topic for later restoration.

This is very intriguing to me. On the side, I've been preparing a couple of ideas related to this topic. I don't think I have a coherent enough thought to even express it in a Jira right now, but when I do, I'll tag you on it also to see what you think.

Whenever you're ready to share the usability improvement ideas, I'm very interested to see what you've come up with.

Thanks,
-John

On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen wrote:
> > when you use a HashMap or RocksDB or other "state stores", you don't
> > expect them to automatically know extra stuff about the record you're
> > storing.
> > So, I don't think there is any reason we *can't* retain the record context
> > in the StateStoreContext, and if any users came along with a clear use case
> > I'd find that convincing.
> >
>
> I agree with the principle of being conservative with the StateStoreContext
> API. Regarding user expectations or a clear use case, the only
> counterpoint I would offer is that we sort of have that use case already,
> which is the example I gave of the change logging store using the
> timestamp. I am curious if this functionality will be retained when using
> built in state stores, or will a low-level processor get a KeyValueStore
> that no longer writes to the changelog topic with the record's timestamp.
> While I personally don't care much about that functionality specifically, I
> have a general desire for custom state stores to easily do the things that
> built in state stores do.
> > It genuinely did not occur to me that users might be looking up and/or
> > updating records of other keys from within a Processor.
> > > > I'm glad you said this Sophie, because it gives me an opportunity to say > that this is actually a *huge* use case for my team. The state store > usability improvements I was referring to in my previous message were about > enabling the user to write custom stores while still easily hooking into > the ability to write to a changelog topic. I think that is technically > possible now, but I don't think it's trivial. Specifically, we have an > interest in making a state store with a richer way of querying its data > (like perhaps getting all values associated with a secondary key), while > still ultimately writing to the changelog topic for later restoration. > > We recognize that this use case throws away some of what kafka streams > (especially the DSL) is good at - easy parallelizability by partitioning > all processing by key - and that our business logic would completely fall > apart if we were consuming from multi-partition topics with multiple > consumers. But we have found that using the low level processor API is > good for the very simple stream processing primitives it provides: handling > the plumbing of consuming from multiple kafka topics and potentially > updating persistent local state in a reliable way. That in itself has > proven to be a worthwhile programming model. > > Since I got off track a bit, let me summarize: I don't particularly care > about the record context being available to state store implementations, > and I think this KIP is headed in the right direction in that regard. But > more generally, I wanted to express the importance of maintaining a > powerful and flexible StateStore interface. > > Thanks! > Paul > > On Thu, Sep 10, 2020 at 6:11 PM Sophie Blee-Goldman <sop...@confluent.io> > wrote: > > > Aha, I did misinterpret the example in your previous response regarding the > > range query after all. I thought you just meant a time-range query inside a > > punctuator. 
It genuinely did not occur to me that users might be looking up > > and/or updating records of other keys from within a Processor. Sorry for > > being closed-minded > > > > I won't drag out this discussion any further by asking whether that might > > be > > a valid use case or just a lurking bug in itself :) > > > > Thanks for humoring me. The current proposal for KIP-478 sounds good to me > > > > On Thu, Sep 10, 2020 at 3:43 PM John Roesler <vvcep...@apache.org> wrote: > > > > > Ah, thanks Sophie, > > > > > > I'm sorry for misinterpreting your response. Yes, we > > > absolutely can and should clear the context before > > > punctuating. > > > > > > My secondary concern is maybe more far-fetched. I was > > > thinking that inside process(key,value), a Processor might > > > do a get/put of a _different_ key. Consider, for example, > > > the way that Suppress processors work. When they get a > > > record, they add it to the store and then do a range scan > > > and possibly forward a _different_ record. Of course, this > > > is an operation that is deeply coupled to the internals, and > > > the Suppress processor accordingly actually does get access > > > to the internal context so that it can set the context > > > before forwarding. > > > > > > Still, it seems like I've had a handful of conversations > > > with people over the years in which they tell me they are > > > using state stores in a way that transcends the "get and put > > > the currently processing record" access pattern. I doubt > > > that those folks would even have considered the possibility > > > that the currently processing record's _context_ could > > > pollute their state store operations, as I myself never gave > > > it a second thought until the current conversation began. In > > > cases like that, we have actually set a trap for these > > > people, and it seems better to dismantle the trap. 
> > > > > > As you noted, really the only people who would be negatively > > > impacted are people who implement their own state stores. > > > These folks will get the deprecation warning and try to > > > adapt their stores to the new interface. If they needed > > > access to the record context, they would find it's now > > > missing. They'd ask us about it, and we'd have the ability > > > to explain the lurking bug that they have had in their > > > stores all along, as well as the new recommended pattern > > > (just pass everything you need in the value). If that's > > > unsatisfying, _then_ we should consider amending the API. > > > > > > Thanks, > > > -John > > > > > > On Thu, 2020-09-10 at 15:21 -0700, Sophie Blee-Goldman > > > wrote: > > > > > Regarding your first sentence, "...the processor would null > > > > > out the record context...", this is not possible, since the > > > > > processor doesn't have write access to the context. We could > > > > > add it, > > > > > > > > > > > > > Sorry, this was poorly phrased, I definitely did not mean to imply that > > > we > > > > should make the context modifiable by the Processors themselves. I > > meant > > > > this should be handled by the internal processing framework that deals > > > with > > > > passing records from one Processor to the next, setting the record > > > context > > > > when a new record is picked up, invoking the punctuators, etc. I > > believe > > > > this > > > > all currently happens in the StreamTask? It already can and does > > > overwrite > > > > the record context as new records are processed, and is also > > responsible > > > > for calling the punctuators, so it doesn't seem like a huge leap to > > just > > > say > > > > "null out the current record before punctuating" > > > > > > > > To clarify, I was never advocating or even considering to give the > > > > Processors > > > > write access to the record context. Sorry if my last message (or all of > > > > them) > > > > was misleading. 
I just wanted to point out that the punctuator concern > > is > > > > orthogonal to the question of whether we should include the record > > > context > > > > in the StateStoreContext. It's definitely a real problem, but it's a > > > > problem > > > > that exists at the Processor level and not just the StateStore. > > > > > > > > So, I don't think there is any reason we *can't* retain the record > > > context > > > > in the > > > > StateStoreContext, and if any users came along with a clear use case > > I'd > > > > find > > > > that convincing. In the absence of any examples, the conservative > > > approach > > > > sounds good to me. > > > > > > > > If it turns out that someone did need the record context in their > > custom > > > > state > > > > store, I'm sure they'll submit a politely worded bug report alerting us > > > > that we > > > > broke their application. > > > > > > > > On Thu, Sep 10, 2020 at 3:05 PM John Roesler <vvcep...@apache.org> > > > wrote: > > > > > Thanks, Sophie, > > > > > > > > > > Yes, now that you point it out, I can see that the record > > > > > context itself should be nulled out by Streams before > > > > > invoking punctuators. From that perspective, we don't need > > > > > to think about the second-order problem of what's in the > > > > > context for the state store when called from a punctuator. > > > > > > > > > > Regarding your first sentence, "...the processor would null > > > > > out the record context...", this is not possible, since the > > > > > processor doesn't have write access to the context. We could > > > > > add it, but then all kinds of strange effects would ensue > > > > > when downstream processors execute but the context is empty, > > > > > etc. Better to just let the framework manage the record > > > > > context and keep it read-only for Processors. 
> > > > > > > > > > Reading between the lines of your last reply, it sounds like > > > > > the disconnect may just have been a mutual misunderstanding > > > > > about whether or not Processors currently have access to set > > > > > the record context. Since they do not, if we wanted to add > > > > > the record context to StateStoreContext in a well-defined > > > > > way, we'd also have to add the ability for Processors to > > > > > manipulate it. But then, we're just creating a side-channel > > > > > for Processors to pass some information in arguments to > > > > > "put()" and other information implicitly through the > > > > > context. It seems better just to go for a single channel for > > > > > now. > > > > > > > > > > It sounds like you're basically in favor of the conservative > > > > > approach, and you just wanted to understand the blockers > > > > > that I implied. Does my clarification make sense? > > > > > > > > > > Thanks, > > > > > -John > > > > > > > > > > On Thu, 2020-09-10 at 10:54 -0700, Sophie Blee-Goldman > > > > > wrote: > > > > > > I was just thinking that the processor would null out the record > > > context > > > > > > after it > > > > > > finished processing the record, so I'm not sure I follow why this > > > would > > > > > not > > > > > > be > > > > > > possible? AFAIK we never call a punctuator in the middle of > > > processing a > > > > > > record through the topology, and even if we did, we still know when > > > it is > > > > > > about > > > > > > to be called and could set it to null beforehand. > > > > > > > > > > > > I'm not trying to advocate for it here, I'm in agreement that > > > anything > > > > > you > > > > > > want > > > > > > to access within the store can and should be accessed within the > > > calling > > > > > > Processor/Punctuator before reaching the store. The "we can always > > > add it > > > > > > later if necessary" argument is also pretty convincing. 
Just trying > > > to > > > > > > understand > > > > > > why this wouldn't be possible. > > > > > > > > > > > > FWIW, the question of "what is the current record in the context > > of a > > > > > > Punctuator" > > > > > > exists independently of whether we want to add this to the > > > > > StateStoreContext > > > > > > or not. The full ProcessorContext, including the current record > > > context, > > > > > is > > > > > > already available within a Punctuator, so removing the current > > record > > > > > > context > > > > > > from the StateStoreContext does not solve the problem. Users can -- > > > and > > > > > have > > > > > > (see KAFKA-9584 <https://issues.apache.org/jira/browse/KAFKA-9584>) > > > -- > > > > > hit > > > > > > such subtle bugs without ever invoking a StateStore > > > > > > from their punctuator. > > > > > > > > > > > > Again, I think I do agree that we should leave the current record > > > context > > > > > > off of > > > > > > the StateStoreContext, but I don't think the Punctuator argument > > > against > > > > > it > > > > > > is > > > > > > very convincing. It sounds to me like we need to disallow access to > > > the > > > > > > current > > > > > > record context from within the Punctuator, independent of anything > > > to do > > > > > > with > > > > > > state stores > > > > > > > > > > > > On Thu, Sep 10, 2020 at 7:12 AM John Roesler <vvcep...@apache.org> > > > > > wrote: > > > > > > > Thanks for the thoughts, Sophie. > > > > > > > > > > > > > > I agree that the extra information could be useful. My only > > > concern is > > > > > > > that it doesn’t seem like we can actually supply that extra > > > information > > > > > > > correctly. So, then we have a situation where the system offers > > > useful > > > > > API > > > > > > > calls that are only correct in a narrow range of use cases. > > > Outside of > > > > > > > those use cases, you get incorrect behavior. 
> > > > > > > > > > > > > > If it were possible to null out the context before you put a > > > document > > > > > to > > > > > > > which the context doesn’t apply, then the concern would be > > > mitigated. > > > > > But > > > > > > > it would still be pretty weird from the perspective of the store > > > that > > > > > > > sometimes the context is populated and other times, it’s null. > > > > > > > > > > > > > > But that seems moot, since it doesn’t seem possible to null out > > the > > > > > > > context. Only the Processor could know whether it’s about to put > > a > > > > > document > > > > > > > different from the context or not. And it would be inappropriate > > to > > > > > offer a > > > > > > > public ProcessorContext api to manage the record context. > > > > > > > > > > > > > > Ultimately, it still seems like if you want to store headers, you > > > can > > > > > > > store them explicitly, right? That doesn’t seem onerous to me, > > and > > > it > > > > > kind > > > > > > > of seems better than relying on undefined or asymmetrical > > behavior > > > in > > > > > the > > > > > > > store itself. > > > > > > > > > > > > > > Anyway, I’m not saying that we couldn’t solve these problems. > > Just > > > > > that it > > > > > > > seems a little that we can be conservative and avoid them for > > now. > > > If > > > > > it > > > > > > > turns out we really need to solve them, we can always do it > > later. > > > > > > > Thanks, > > > > > > > John > > > > > > > > > > > > > > On Wed, Sep 9, 2020, at 22:46, Sophie Blee-Goldman wrote: > > > > > > > > > If you were to call "put" from a punctuator, or do a > > > > > > > > > `range()` query and then update one of those records with > > > > > > > > > `put()`, you'd have a very subtle bug on your hands. > > > > > > > > > > > > > > > > Can you elaborate on this a bit? 
I agree that the punctuator > > > case is > > > > > an > > > > > > > > obvious exemption to the assumption that store invocations > > always > > > > > > > > have a corresponding "current record", but I don't understand > > the > > > > > > > > second example. Are you envisioning a scenario where the > > #process > > > > > > > > method performs a range query and then updates records? Or were > > > > > > > > you just giving another example of the punctuator case? > > > > > > > > > > > > > > > > I only bring it up because I agree that the current record > > > > > information > > > > > > > could > > > > > > > > still be useful within the context of the store. As a non-user > > my > > > > > input > > > > > > > on > > > > > > > > this > > > > > > > > definitely has limited value, but it just isn't striking me as > > > > > obvious > > > > > > > that > > > > > > > > we > > > > > > > > should remove access to the current record context from the > > state > > > > > stores. > > > > > > > > If there is no current record, as in the punctuator case, we > > > should > > > > > just > > > > > > > > set > > > > > > > > the record context to null (or Optional.empty, etc). > > > > > > > > > > > > > > > > That said, the put() always has to come from somewhere, and > > that > > > > > > > > somewhere is always going to be either a Processor or a > > > Punctuator, > > > > > both > > > > > > > > of which will still have access to the full context. So > > > additional > > > > > info > > > > > > > > such as > > > > > > > > the timestamp can and should probably be supplied to the store > > > before > > > > > > > > calling put(), rather than looked up by the store. But I can > > see > > > some > > > > > > > other > > > > > > > > things being useful, for example the current record's headers. > > > Maybe > > > > > > > if/when > > > > > > > > we add better (or any) support for headers in state stores this > > > will > > > > > be > > > > > > > > less true. 
> > > > > > > > > > > > > > > > Of course as John has made clear, it's pretty hard to judge > > > without > > > > > > > > examples > > > > > > > > and more insight as to what actually goes on within a custom > > > state > > > > > store > > > > > > > > On Wed, Sep 9, 2020 at 8:07 PM John Roesler < > > vvcep...@apache.org > > > > > wrote: > > > > > > > > > Hi Paul, > > > > > > > > > > > > > > > > > > It's good to hear from you! > > > > > > > > > > > > > > > > > > I'm glad you're in favor of the direction. Especially when > > > > > > > > > it comes to public API and usability concerns, I tend to > > > > > > > > > think that "the folks who matter" are actually the folks who > > > > > > > > > have to use the APIs to accomplish real tasks. It can be > > > > > > > > > hard for me to be sure I'm thinking clearly from that > > > > > > > > > perspective. > > > > > > > > > > > > > > > > > > Funny story, I also started down this road a couple of times > > > > > > > > > already and backed them out before the KIP because I was > > > > > > > > > afraid of the scope of the proposal. Unfortunately, needing > > > > > > > > > to make a new ProcessorContext kind of forced my hand. > > > > > > > > > > > > > > > > > > I see you've called me out about the ChangeLogging stores :) > > > > > > > > > In fact, I think these are the main/only reason that stores > > > > > > > > > might really need to invoke "forward()". My secret plan was > > > > > > > > > to cheat and either accomplish change-logging by a different > > > > > > > > > mechanism than implementing the store interface, or by just > > > > > > > > > breaking encapsulation to sneak the "real" ProcessorContext > > > > > > > > > into the ChangeLogging stores. But those are all > > > > > > > > > implementation details. I think the key question is whether > > > > > > > > > anyone else has a store implementation that needs to call > > > > > > > > > "forward()". 
It's not what you mentioned, but since you > > > > > > > > > spoke up, I'll just ask: if you have a use case for calling > > > > > > > > > "forward()" in a store, please share it. > > > > > > > > > > > > > > > > > > Regarding the other record-specific context methods, I think > > > > > > > > > you have a good point, but I also can't quite wrap my head > > > > > > > > > around how we can actually guarantee it to work in general. > > > > > > > > > For example, the case you cited, where the implementation of > > > > > > > > > `KeyValueStore#put(key, value)` uses the context to augment > > > > > > > > > the record with timestamp information. This relies on the > > > > > > > > > assumption that you would only call "put()" from inside a > > > > > > > > > `Processor#process(key, value)` call in which the record > > > > > > > > > being processed is the same record that you're trying to put > > > > > > > > > into the store. > > > > > > > > > > > > > > > > > > If you were to call "put" from a punctuator, or do a > > > > > > > > > `range()` query and then update one of those records with > > > > > > > > > `put()`, you'd have a very subtle bug on your hands. Right > > > > > > > > > now, the Streams component that actually calls the Processor > > > > > > > > > takes care to set the right record context before invoking > > > > > > > > > the method, and in the case of caching, etc., it also takes > > > > > > > > > care to swap out the old context and keep it somewhere safe. > > > > > > > > > But when it comes to public API Processors calling methods > > > > > > > > > on StateStores, there's no opportunity for any component to > > > > > > > > > make sure the context is always correct. > > > > > > > > > > > > > > > > > > In the face of that situation, it seemed better to just move > > > > > > > > > in the direction of a "normal" data store. 
I.e., when you > > > > > > > > > use a HashMap or RocksDB or other "state stores", you don't > > > > > > > > > expect them to automatically know extra stuff about the > > > > > > > > > record you're storing. If you need them to know something, > > > > > > > > > you just put it in the value. > > > > > > > > > > > > > > > > > > All of that said, I'm just reasoning from first principles > > > > > > > > > here. To really know if this is a mistake or not, I need to > > > > > > > > > be in your place. So please push back if you think what I > > > > > > > > > said is nonsense. My personal plan was to keep an eye out > > > > > > > > > during the period where the old API was still present, but > > > > > > > > > deprecated, to see if people were struggling to use the new > > > > > > > > > API. If so, then we'd have a chance to address it before > > > > > > > > > dropping the old API. But it's even better if you can help > > > > > > > > > think it through now. > > > > > > > > > > > > > > > > > > It did also cross my mind to _not_ add the > > > > > > > > > StateStoreContext, but just to continue to punt on the > > > > > > > > > question by just dropping in the new ProcessorContext to the > > > > > > > > > new init method. If StateStoreContext seems too bold, we can > > > > > > > > > go that direction. But if we actually add some methods to > > > > > > > > > StateStoreContext, I'd like to be able to ensure they would > > > > > > > > > be well defined. I think the current situation was more of > > > > > > > > > an oversight than a choice. > > > > > > > > > > > > > > > > > > Thanks again for your reply, > > > > > > > > > -John > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, 2020-09-09 at 21:23 -0500, Paul Whalen wrote: > > > > > > > > > > John, > > > > > > > > > > > > > > > > > > > > It's exciting to see this KIP head in this direction! 
In > > the > > > > > last > > > > > > > year > > > > > > > > > or > > > > > > > > > > so I've tried to sketch out some usability improvements for > > > > > custom > > > > > > > state > > > > > > > > > > stores, and I also ended up splitting out the > > > StateStoreContext > > > > > from > > > > > > > the > > > > > > > > > > ProcessorContext in an attempt to facilitate what I was > > > doing. I > > > > > > > sort of > > > > > > > > > > abandoned it when I realized how large the ideal change > > might > > > > > have > > > > > > > to be, > > > > > > > > > > but it's great to see that there is other interest in > > moving > > > in > > > > > this > > > > > > > > > > direction (from the folks that matter :) ). > > > > > > > > > > > > > > > > > > > > Having taken a stab at it myself, I have a comment/question > > > on > > > > > this > > > > > > > > > bullet > > > > > > > > > > about StateStoreContext: > > > > > > > > > > > > > > > > > > > > It does *not* include anything processor- or record- > > > specific, > > > > > like > > > > > > > > > > > `forward()` or any information about the "current" > > record, > > > > > which is > > > > > > > > > only a > > > > > > > > > > > well-defined in the context of the Processor. Processors > > > > > process > > > > > > > one > > > > > > > > > record > > > > > > > > > > > at a time, but state stores may be used to store and > > fetch > > > many > > > > > > > > > records, so > > > > > > > > > > > there is no "current record". 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I totally agree that record-specific or processor-specific > > > > > context > > > > > > > in a > > > > > > > > > > state store is often not well-defined and it would be good > > to > > > > > > > separate > > > > > > > > > that > > > > > > > > > > out, but sometimes it (at least record-specific context) is > > > > > actually > > > > > > > > > > useful, for example, passing the record's timestamp through > > > to > > > > > the > > > > > > > > > > underlying storage (or changelog topic): > > > > > > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121 > > > > > > > > > > You could have the writer client of the state store pass > > this > > > > > > > through, > > > > > > > > > but > > > > > > > > > > it would be nice to be able to write state stores where the > > > > > client > > > > > > > did > > > > > > > > > not > > > > > > > > > > have this responsibility. I'm not sure if the solution is > > > to add > > > > > > > some > > > > > > > > > > things back to StateStoreContext, or make yet another > > context > > > > > that > > > > > > > > > > represents record-specific context while inside a state > > > store. > > > > > > > > > > Best, > > > > > > > > > > Paul > > > > > > > > > > > > > > > > > > > > On Wed, Sep 9, 2020 at 5:43 PM John Roesler < > > > j...@vvcephei.org> > > > > > > > wrote: > > > > > > > > > > > Hello all, > > > > > > > > > > > > > > > > > > > > > > I've been slowly pushing KIP-478 forward over the last > > > year, > > > > > > > > > > > and I'm happy to say that we're making good progress now. > > > > > > > > > > > However, several issues with the original design have > > come > > > > > > > > > > > to light. 
> > > > > > > > > > > > > > > > > > > > > > The major changes: > > > > > > > > > > > > > > > > > > > > > > We discovered that the original plan of just adding > > generic > > > > > > > > > > > parameters to ProcessorContext was too disruptive, so we > > > are > > > > > > > > > > > now adding a new api.ProcessorContext. > > > > > > > > > > > > > > > > > > > > > > That choice forces us to add a new StateStore.init method > > > > > > > > > > > for the new context, but ProcessorContext really isn't > > > ideal > > > > > > > > > > > for state stores to begin with, so I'm proposing a new > > > > > > > > > > > StateStoreContext for this purpose. In a nutshell, there > > > are > > > > > > > > > > > quite a few methods in ProcessorContext that actually > > > should > > > > > > > > > > > never be called from inside a StateStore. > > > > > > > > > > > > > > > > > > > > > > Also, since there is a new ProcessorContext interface, we > > > > > > > > > > > need a new MockProcessorContext implementation in the > > test- > > > > > > > > > > > utils module. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > The changeset for the KIP document is here: > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10 > > > > > > > > > > > And the KIP itself is here: > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API > > > > > > > > > > > If you have any concerns, please let me know! > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > -John > > > > > > > > > > > > > > > > > > > > > >
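
For readers following the thread, here is a minimal sketch of the "just put it in the value" pattern discussed above. It assumes a plain in-memory map as a stand-in for a state store and deliberately uses no Kafka Streams classes. The names `TimestampInValueSketch`, `StampedValue`, and `SimpleStore` are hypothetical and invented for illustration only. The point is that the caller supplies the timestamp explicitly, so the store never needs to consult a "current record" context.

```java
import java.util.HashMap;
import java.util.Map;

public class TimestampInValueSketch {

    // Hypothetical wrapper (not a Kafka Streams class): the caller packs the
    // timestamp into the value instead of the store reading it from a context.
    static final class StampedValue {
        final String value;
        final long timestampMs;

        StampedValue(String value, long timestampMs) {
            this.value = value;
            this.timestampMs = timestampMs;
        }

        @Override
        public String toString() {
            return value + "@" + timestampMs;
        }
    }

    // Hypothetical stand-in for a key-value state store. It only knows what
    // it is given, like a HashMap would.
    static final class SimpleStore {
        private final Map<String, StampedValue> data = new HashMap<>();

        void put(String key, StampedValue value) {
            data.put(key, value);
        }

        StampedValue get(String key) {
            return data.get(key);
        }
    }

    public static void main(String[] args) {
        SimpleStore store = new SimpleStore();

        // As if called while processing record "a": the caller passes the
        // timestamp it considers correct for this record.
        store.put("a", new StampedValue("v1", 1_000L));

        // As if updating a different key (for example after a range scan or
        // from a punctuator): the caller again decides which timestamp applies.
        store.put("b", new StampedValue("v2", 2_000L));

        System.out.println(store.get("a"));
        System.out.println(store.get("b"));
    }
}
```

Because the timestamp travels with the value, a put for a key other than the one currently being processed cannot silently pick up the wrong record's metadata, which is the trap described earlier in the thread.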