Hi Paul,

It's good to hear from you!

I'm glad you're in favor of the direction. Especially when
it comes to public API and usability concerns, I tend to
think that "the folks who matter" are actually the folks who
have to use the APIs to accomplish real tasks. It can be
hard for me to be sure I'm thinking clearly from that
perspective.

Funny story, I also started down this road a couple of times
already and backed out each time before the KIP because I
was afraid of the scope of the proposal. Unfortunately, needing
to make a new ProcessorContext kind of forced my hand.

I see you've called me out about the ChangeLogging stores :)
In fact, I think these are the main/only reason that stores
might really need to invoke "forward()". My secret plan was
to cheat and either accomplish change-logging by a different
mechanism than implementing the store interface, or by just
breaking encapsulation to sneak the "real" ProcessorContext
into the ChangeLogging stores. But those are all
implementation details. I think the key question is whether
anyone else has a store implementation that needs to call
"forward()". It's not what you mentioned, but since you
spoke up, I'll just ask: if you have a use case for calling
"forward()" in a store, please share it.

Regarding the other record-specific context methods, I think
you have a good point, but I also can't quite wrap my head
around how we can actually guarantee it to work in general.
Take, for example, the case you cited, where the implementation
of `KeyValueStore#put(key, value)` uses the context to augment
the record with timestamp information. This relies on the
assumption that you would only call "put()" from inside a
`Processor#process(key, value)` call in which the record
being processed is the same record that you're trying to put
into the store.

If you were to call "put" from a punctuator, or do a
`range()` query and then update one of those records with
`put()`, you'd have a very subtle bug on your hands. Right
now, the Streams component that actually calls the Processor
takes care to set the right record context before invoking
the method, and in the case of caching, etc., it also takes
care to swap out the old context and keep it somewhere safe.
But when it comes to public API Processors calling methods
on StateStores, there's no opportunity for any component to
make sure the context is always correct.
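
To make that concrete, here's a rough sketch in plain Java. The
classes are made-up stand-ins that I'm assuming only for
illustration (`RecordContext`, `ContextStampingStore`); they are
not the real Streams interfaces. The store stamps every write
with whatever timestamp the context currently holds, so an update
issued while a different record is being processed picks up the
wrong timestamp:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration only; not the Kafka Streams API.
class StaleContextSketch {

    /** Holds the timestamp of whatever record the runtime is "currently" processing. */
    static final class RecordContext {
        private long currentTimestamp = -1L;

        void setCurrentTimestamp(long timestamp) {
            currentTimestamp = timestamp;
        }

        long timestamp() {
            return currentTimestamp;
        }
    }

    /** A store that implicitly stamps each write with the context's current timestamp. */
    static final class ContextStampingStore {
        private final RecordContext context;
        private final Map<String, String> values = new HashMap<>();
        private final Map<String, Long> timestamps = new HashMap<>();

        ContextStampingStore(RecordContext context) {
            this.context = context;
        }

        void put(String key, String value) {
            values.put(key, value);
            // The store has no way to know whether this timestamp belongs to "key".
            timestamps.put(key, context.timestamp());
        }

        long timestampOf(String key) {
            return timestamps.get(key);
        }
    }

    public static void main(String[] args) {
        RecordContext context = new RecordContext();
        ContextStampingStore store = new ContextStampingStore(context);

        // The runtime sets the context, then the processor stores that same record.
        context.setCurrentTimestamp(100L);
        store.put("a", "v1");

        // Later, while a different record (timestamp 200) is being processed,
        // the processor updates "a", for example after a range scan.
        context.setCurrentTimestamp(200L);
        store.put("a", "v1-updated");

        // "a" is now stamped with the unrelated record's timestamp.
        System.out.println(store.timestampOf("a")); // prints 200
    }
}
```

Nothing in the store can tell the two `put()` calls apart, which
is the part I can't see how to guarantee in general.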

In the face of that situation, it seemed better to just move
in the direction of a "normal" data store. I.e., when you
use a HashMap or RocksDB or other "state stores", you don't
expect them to automatically know extra stuff about the
record you're storing. If you need them to know something,
you just put it in the value.
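
As a minimal sketch of what I mean (again plain Java, with a
hypothetical `Timestamped` wrapper and `updateValue` helper that
I'm assuming just for this example), the caller carries the
timestamp in the value, so an update keeps the record's own
timestamp no matter what else is being processed at the time:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical names for illustration only; not the Kafka Streams API.
class ExplicitTimestampSketch {

    /** Value wrapper: the caller supplies the timestamp explicitly. */
    static final class Timestamped<V> {
        final V value;
        final long timestamp;

        Timestamped(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Replaces the value for a key while keeping the timestamp already stored with it. */
    static <V> Timestamped<V> updateValue(Map<String, Timestamped<V>> store,
                                          String key,
                                          V newValue) {
        Timestamped<V> existing = store.get(key);
        if (existing == null) {
            throw new IllegalArgumentException("no entry for key " + key);
        }
        Timestamped<V> updated = new Timestamped<>(newValue, existing.timestamp);
        store.put(key, updated);
        return updated;
    }

    public static void main(String[] args) {
        // A "normal" data store: it knows only what is in the value.
        Map<String, Timestamped<String>> store = new HashMap<>();
        store.put("a", new Timestamped<>("v1", 100L));

        // An update made later, from any call site, still carries a's own timestamp.
        updateValue(store, "a", "v1-updated");

        System.out.println(store.get("a").timestamp); // prints 100
    }
}
```

The cost is that the writer has to pass the timestamp along, which
is exactly the responsibility you'd prefer the client not to have.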

All of that said, I'm just reasoning from first principles
here. To really know if this is a mistake or not, I need to
be in your place. So please push back if you think what I
said is nonsense. My personal plan was to keep an eye out
during the period where the old API was still present, but
deprecated, to see if people were struggling to use the new
API. If so, then we'd have a chance to address it before
dropping the old API. But it's even better if you can help
think it through now.

It did also cross my mind to _not_ add the
StateStoreContext, and instead to keep punting on the
question by dropping the new ProcessorContext into the
new init method. If StateStoreContext seems too bold, we can
go that direction. But if we actually add some methods to
StateStoreContext, I'd like to be able to ensure they would
be well defined. I think the current situation was more of
an oversight than a choice.

Thanks again for your reply,
-John


On Wed, 2020-09-09 at 21:23 -0500, Paul Whalen wrote:
> John,
> 
> It's exciting to see this KIP head in this direction!  In the last year or
> so I've tried to sketch out some usability improvements for custom state
> stores, and I also ended up splitting out the StateStoreContext from the
> ProcessorContext in an attempt to facilitate what I was doing.  I sort of
> abandoned it when I realized how large the ideal change might have to be,
> but it's great to see that there is other interest in moving in this
> direction (from the folks that matter :) ).
> 
> Having taken a stab at it myself, I have a comment/question on this bullet
> about StateStoreContext:
> 
> It does *not*  include anything processor- or record- specific, like
> > `forward()` or any information about the "current" record, which is only
> > well-defined in the context of the Processor. Processors process one record
> > at a time, but state stores may be used to store and fetch many records, so
> > there is no "current record".
> > 
> 
> I totally agree that record-specific or processor-specific context in a
> state store is often not well-defined and it would be good to separate that
> out, but sometimes it (at least record-specific context) is actually
> useful, for example, passing the record's timestamp through to the
> underlying storage (or changelog topic):
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121
> 
> You could have the writer client of the state store pass this through, but
> it would be nice to be able to write state stores where the client did not
> have this responsibility.  I'm not sure if the solution is to add some
> things back to StateStoreContext, or make yet another context that
> represents record-specific context while inside a state store.
> 
> Best,
> Paul
> 
> On Wed, Sep 9, 2020 at 5:43 PM John Roesler <[email protected]> wrote:
> 
> > Hello all,
> > 
> > I've been slowly pushing KIP-478 forward over the last year,
> > and I'm happy to say that we're making good progress now.
> > However, several issues with the original design have come
> > to light.
> > 
> > The major changes:
> > 
> > We discovered that the original plan of just adding generic
> > parameters to ProcessorContext was too disruptive, so we are
> > now adding a new api.ProcessorContext.
> > 
> > That choice forces us to add a new StateStore.init method
> > for the new context, but ProcessorContext really isn't ideal
> > for state stores to begin with, so I'm proposing a new
> > StateStoreContext for this purpose. In a nutshell, there are
> > quite a few methods in ProcessorContext that actually should
> > never be called from inside a StateStore.
> > 
> > Also, since there is a new ProcessorContext interface, we
> > need a new MockProcessorContext implementation in the test-
> > utils module.
> > 
> > 
> > 
> > The changeset for the KIP document is here:
> > 
> > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10
> > 
> > And the KIP itself is here:
> > 
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API
> > 
> > 
> > If you have any concerns, please let me know!
> > 
> > Thanks,
> > -John
> > 
> > 
