Thanks John. I like the proposal.

Btw: I was just going over the KIP and realized that we add new methods
to `StreamBuilder`, `Topology`, and `KStream` that take the new
`ProcessorSupplier` class -- should we also deprecate the corresponding
existing ones that take the old `ProcessorSupplier`?


-Matthias


On 9/30/20 7:46 AM, John Roesler wrote:
> Thanks Paul and Sophie,
> 
> Your feedback certainly underscores the need to be explicit
> in the javadoc about why that parameter is Optional. Getting
> this kind of feedback before the release is exactly the kind
> of outcome we hope to get from the KIP process!
> 
> Thanks,
> -John
> 
> On Tue, 2020-09-29 at 22:32 -0500, Paul Whalen wrote:
>> John, I totally agree that adding a method to Processor is cumbersome and
>> not a good path.  I was imagining maybe a separate interface that could be
>> used in the appropriate context, but I don't think that makes too much
>> sense - it's just too far away from what Kafka Streams is.  I was
>> originally more interested in the "why" Optional than the "how" (I think my
>> original reply overplayed the "optional as an argument" concern).  But
>> you've convinced me that there is a perfectly legitimate "why".  We should
>> make sure that it's clear why it's Optional, but I suppose that goes
>> without saying.  It's a nice opportunity to make the API reflect more what
>> is actually going on under the hood.
>>
>> Thanks!
>> Paul
>>
>> On Tue, Sep 29, 2020 at 10:05 PM Sophie Blee-Goldman <sop...@confluent.io>
>> wrote:
>>
>>> FWIW, while I'm really not a fan of Optional in general, I agree that its
>>> usage
>>> here seems appropriate. Even for those rare software developers who
>>> carefully
>>> read all the docs several times over, I think it wouldn't be too hard to
>>> miss a
>>> note about the RecordMetadata possibly being null.
>>>
>>> Especially because it's not that obvious why at first glance, and takes a
>>> bit of
>>> thinking to realize that records originating from a Punctuator wouldn't
>>> have a
>>> "current record". This  is something that has definitely confused users
>>> today.
>>>
>>> It's on us to improve the education here -- and an Optional<RecordMetadata>
>>> would naturally raise awareness of this subtlety
>>>
>>> On Tue, Sep 29, 2020 at 7:40 PM Sophie Blee-Goldman <sop...@confluent.io>
>>> wrote:
>>>
>>>> Does my reply address your concerns?
>>>>
>>>>
>>>> Yes; also, I definitely misread part of the proposal earlier and thought
>>>> you had put
>>>> the timestamp field in RecordMetadata. Sorry for not giving things a
>>>> closer look
>>>> before responding! I'm not sure my original message made much sense given
>>>> the misunderstanding, but thanks for responding anyway :P
>>>>
>>>> Having given the proposal a second pass, I agree, it's very elegant. +1
>>>>
>>>> On Tue, Sep 29, 2020 at 6:50 PM John Roesler <vvcep...@apache.org>
>>> wrote:
>>>>> Thanks for the reply, Sophie,
>>>>>
>>>>> I think I may have summarized too much in my prior reply.
>>>>>
>>>>> In the currently proposed KIP, any caller of forward() must
>>>>> supply a Record, which consists of:
>>>>> * key
>>>>> * value
>>>>> * timestamp
>>>>> * headers (with a convenience constructor that sets empty
>>>>> headers)
>>>>>
>>>>> These aren't what I was referring to as potentially being
>>>>> undefined downstream, since thanks to the introduction of
>>>>> Record, they are, as you're advocating, required to be
>>>>> defined everywhere, even when forwarding from a punctuator.
>>>>>
>>>>> So to be clear, the intent of this change is actually to
>>>>> _enforce_ that timestamp would never be undefined (which it
>>>>> currently can be). Also, since punctuators _are_ going to
>>>>> have to "make up" a timestamp going forward, we should note
>>>>> that the "punctuate" method currently passes in a good
>>>>> timestamp that they can use: for system-time punctuations,
>>>>> they receive the current system time, and for stream-time
>>>>> punctuations, they get the current stream time.
>>>>>
>>>>> The potentially undefined RecordMetadata only contains these
>>>>> fields:
>>>>> * topic
>>>>> * partition
>>>>> * offset
>>>>>
>>>>> These fields aren't required (or even used) in a Sink, and
>>>>> it doesn't seem like they would be important to many
>>>>> applications. Furthermore, it doesn't _seem_ like you'd even
>>>>> want to set these fields. They seem purely informational and
>>>>> only useful in the context when you are actually processing
>>>>> a real input record. It doesn't sound like you were asking
>>>>> for it, but just to put it on the record, I think if we were
>>>>> to require values for the metadata from punctuators, people
>>>>> would mostly just make up their own dummy values, to no
>>>>> one's benefit.
>>>>>
>>>>> I should also note that with the current
>>>>> Record/RecordMetadata split, we will have the freedom to
>>>>> move fields into the Record class (or even add new fields)
>>>>> if we want them to become "data" as opposed to "metadata" in
>>>>> the future.
>>>>>
>>>>> Thanks for your reply; I was similarly floored when I
>>>>> realized the true nature of the current situation. Does my
>>>>> reply address your concerns?
>>>>>
>>>>> Thanks,
>>>>> -John
>>>>>
>>>>> On Tue, 2020-09-29 at 18:34 -0700, Sophie Blee-Goldman
>>>>> wrote:
>>>>>>> However, the record metadata is only defined when the parent
>>> forwards
>>>>>>> while processing a
>>>>>>
>>>>>> real record, not when it calls forward from the punctuator
>>>>>>
>>>>>>
>>>>>> Can we take a step back for a second...why wouldn't you be required to
>>>>> set
>>>>>> the RecordContext
>>>>>> yourself when calling forward from a Punctuator? I think I agree with
>>>>> Paul
>>>>>> here, it seems kind of
>>>>>> absurd not to enforce that the RecordContext be present inside the
>>>>>> process() method.
>>>>>>
>>>>>> The original problem with Punctuators, as I understood it, was that
>>> all
>>>>> of
>>>>>> the RecordContext
>>>>>> fields were exposed automatically to both the Processor and any
>>>>> Punctuator,
>>>>>> due to being
>>>>>> direct methods on the ProcessorContext. We can't control which
>>>>>> ProcessorContext methods
>>>>>> someone will call from with a Punctuator vs from a Processor. The best
>>>>> we
>>>>>> could do was
>>>>>> set these "nonsense" fields to null when inside a Punctuator, or set
>>>>> them
>>>>>> to some dummy
>>>>>> values as you pointed out.
>>>>>>
>>>>>> But then you proposed the solution of a separate RecordContext which
>>> is
>>>>> not
>>>>>> attached to the
>>>>>> ProcessorContext at all. This seemed to solve the above problem very
>>>>>> neatly: we only pass
>>>>>> in the RecordContext to the process() method, so we don't have to
>>> worry
>>>>>> about people trying
>>>>>> to access these fields from within a Punctuator. The fields aren't
>>>>>> accessible unless they're
>>>>>> defined.
>>>>>>
>>>>>> So what happens when someone wants to forward something from within a
>>>>>> Punctuator? I
>>>>>> don't think it's reasonable to let the timestamp field be undefined,
>>>>> ever.
>>>>>> What if the Punctuator
>>>>>> forwards directly to a sink, or directly to some windowing logic. Are
>>> we
>>>>>> supposed to add
>>>>>> handling for the RecordContext == null case to every processor? Or are
>>>>> we
>>>>>> just going to
>>>>>> assume the implicit restriction that users will only forward records
>>>>> from a
>>>>>> Punctuator to
>>>>>> downstream processors that know how to handle and/or set the
>>>>> RecordContext
>>>>>> if it's
>>>>>> undefined. That seems to throw away a lot of the awesome safety added
>>> in
>>>>>> this KIP
>>>>>>
>>>>>> Apologies for the rant. But I feel pretty strongly that allowing to
>>>>> forward
>>>>>> records from a
>>>>>> Punctuator without a defined RecordContext would be asking for
>>> trouble.
>>>>>> Imo, if you
>>>>>> want to forward from a Punctuator, you need to store the info you need
>>>>> in
>>>>>> order to
>>>>>> set the timestamp, or make one up yourself
>>>>>>
>>>>>> (the one alternative I can think of here is that maybe we could pass
>>> in
>>>>> the
>>>>>> current
>>>>>> partition time, so users can at least put in a reasonable estimate for
>>>>> the
>>>>>> timestamp
>>>>>> that won't cause it to get dropped and won't potentially lurch the
>>>>>> streamtime far into
>>>>>> the future. This would be similar to what we do in the
>>>>> TimestampExtractor)
>>>>>> On Tue, Sep 29, 2020 at 6:06 PM John Roesler <vvcep...@apache.org>
>>>>> wrote:
>>>>>>> Oh, I guess one other thing I should have mentioned is that I’ve
>>>>> recently
>>>>>>> discovered that in cases where the context is undefined, we
>>> currently
>>>>> just
>>>>>>> fill in dummy values for the context. So there’s a good chance that
>>>>> real
>>>>>>> applications in use are depending on undefined context without even
>>>>>>> realizing it. What I’m hoping to do is just make the situation
>>>>> explicit and
>>>>>>> get rid of the dummy values.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> John
>>>>>>>
>>>>>>> On Tue, Sep 29, 2020, at 20:01, John Roesler wrote:
>>>>>>>> Thanks for the review, Paul!
>>>>>>>>
>>>>>>>> I had read some of that debate before. There seems to be some
>>>>> subtext
>>>>>>>> there, because they advise against using Optional in cases like
>>>>> this,
>>>>>>>> but there doesn’t seem to be a specific reason why it’s
>>>>> inappropriate.
>>>>>>>> I got the impression they were just afraid that people would go
>>>>>>>> overboard and make everything Optional.
>>>>>>>>
>>>>>>>> I could also make two methods, but it seemed like it might be an
>>>>>>>> unfortunate way to handle the issue, since Processor is just
>>> about a
>>>>>>>> Function as-is, but the two-method approach would require people
>>> to
>>>>>>>> implement both methods.
>>>>>>>>
>>>>>>>> To your question, this is something that’s only recently became
>>>>> clear
>>>>>>>> to me. Imagine you have a parent processor that calls forward both
>>>>> from
>>>>>>>> process and a punctuator. The child will have process() invoked in
>>>>> both
>>>>>>>> cases, and won’t be able to distinguish them. However, the record
>>>>>>>> metadata is only defined when the parent forwards while
>>> processing a
>>>>>>>> real record, not when it calls forward from the punctuator.
>>>>>>>>
>>>>>>>> This is why I wanted to make the metadata Optional, to advertise
>>>>> that
>>>>>>>> the metadata might be undefined if any ancestor processor ever
>>> calls
>>>>>>>> forward from a punctuator. We could remove the Optional and
>>> instead
>>>>>>>> just document that the argument might be null.
>>>>>>>>
>>>>>>>> With that context in place, what’s your take?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> John
>>>>>>>>
>>>>>>>> On Tue, Sep 29, 2020, at 19:09, Paul Whalen wrote:
>>>>>>>>> Looks pretty good to me, though the Processor#process(Record,
>>>>>>>>> Optional<RecordMetadata>) signature caught my eye.  There's some
>>>>>>> debate
>>>>>>>>> (
>>>>>>>>>
>>> https://stackoverflow.com/questions/31922866/why-should-java-8s-optional-not-be-used-in-arguments
>>>>>>> )
>>>>>>>>> about whether to use Optionals in arguments, and while that's a
>>>>> bit of
>>>>>>> a
>>>>>>>>> religious debate in the abstract, it did make me wonder whether
>>> it
>>>>>>> makes
>>>>>>>>> sense in this specific case.  When is it actually not present?
>>> I
>>>>> was
>>>>>>>>> under
>>>>>>>>> the impression that we should always have access to it in
>>>>> process(),
>>>>>>> and
>>>>>>>>> that the concern about metadata being undefined was about having
>>>>>>> access
>>>>>>>>> to
>>>>>>>>> record metadata in the ProcessorContext held for use inside a
>>>>>>>>> Punctuator.
>>>>>>>>>
>>>>>>>>> If that's not the case and it is truly optional in process(), is
>>>>> there
>>>>>>> an
>>>>>>>>> opportunity for an alternate interface for the cases when we
>>>>> don't get
>>>>>>> it,
>>>>>>>>> rather than force the branching on implementers of the
>>> interface?
>>>>>>>>> Apologies if I've missed something, I took a look at the PR and
>>> I
>>>>>>> didn't
>>>>>>>>> see any spots where I thought it would be empty.  Perhaps an
>>>>> example
>>>>>>> of a
>>>>>>>>> Punctuator using (and not using) the new API would clear things
>>>>> up.
>>>>>>>>> Best,
>>>>>>>>> Paul
>>>>>>>>>
>>>>>>>>> On Tue, Sep 29, 2020 at 4:10 PM John Roesler <
>>> vvcep...@apache.org
>>>>>>> wrote:
>>>>>>>>>> Hello again, all,
>>>>>>>>>>
>>>>>>>>>> Thanks for the latest round of discussion. I've taken the
>>>>>>>>>> recent feedback and come up with an updated KIP that seems
>>>>>>>>>> actually quite a bit nicer than the prior proposal.
>>>>>>>>>>
>>>>>>>>>> The specific diff on the KIP is here:
>>>>>>>>>>
>>>>>>>>>>
>>> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=15&selectedPageVersions=14
>>>>>>>>>> These changes are implemented in this POC PR:
>>>>>>>>>> https://github.com/apache/kafka/pull/9346
>>>>>>>>>>
>>>>>>>>>> The basic idea is that, building on the recent conversaion,
>>>>>>>>>> we would transition away from the current API where we get
>>>>>>>>>> only key/value in the process() method and other "data"
>>>>>>>>>> comes in the ProcessorContext along with the "metadata".
>>>>>>>>>>
>>>>>>>>>> Instead, we formalize what is "data" and what is "metadata",
>>>>>>>>>> and pass it all in to the process method:
>>>>>>>>>> Processor#process(Record, Optional<RecordMetadata>)
>>>>>>>>>>
>>>>>>>>>> Also, you forward the whole data class instead of mutating
>>>>>>>>>> the ProcessorContext fields and also calling forward:
>>>>>>>>>> ProcessorContext#forward(Record)
>>>>>>>>>>
>>>>>>>>>> The Record class itself ships with methods like
>>>>>>>>>> record#withValue(NewV newValue)
>>>>>>>>>> that make a shallow copy of the input Record, enabling
>>>>>>>>>> Processors to safely handle the record without polluting the
>>>>>>>>>> context of their parents and siblings.
>>>>>>>>>>
>>>>>>>>>> This proposal has a number of key benefits:
>>>>>>>>>> * As we've discovered in KAFKA-9584, it's unsafe to mutate
>>>>>>>>>> the Headers via the ProcessorContext. This proposal offers a
>>>>>>>>>> way to safely forward changes only to downstream processors.
>>>>>>>>>> * The new API has symmetry (each processor's input is the
>>>>>>>>>> output of its parent processor)
>>>>>>>>>> * The API makes clear that the record metadata isn't always
>>>>>>>>>> defined (for example, in a punctuation, there is no current
>>>>>>>>>> topic/partition/offset)
>>>>>>>>>> * The API enables punctuators to forward well defined
>>>>>>>>>> headers downstream, which is currently not possible.
>>>>>>>>>>
>>>>>>>>>> Unless their are objections, I'll go ahead and re-finalize
>>>>>>>>>> this KIP and update that PR to a mergeable state.
>>>>>>>>>>
>>>>>>>>>> Thanks, all,
>>>>>>>>>> -John
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, 2020-09-24 at 09:41 -0700, Matthias J. Sax wrote:
>>>>>>>>>>> Interesting proposal. However, I am not totally convinced,
>>>>> because
>>>>>>> I see
>>>>>>>>>>> a fundamental difference between "data" and "metadata".
>>>>>>>>>>>
>>>>>>>>>>> Topic/partition/offset are "metadata" in the strong sense
>>> and
>>>>> they
>>>>>>> are
>>>>>>>>>>> immutable.
>>>>>>>>>>>
>>>>>>>>>>> On the other hand there is "primary" data like key and
>>> value,
>>>>> as
>>>>>>> well as
>>>>>>>>>>> "secondary" data like timestamp and headers. The issue seems
>>>>> that
>>>>>>> we
>>>>>>>>>>> treat "secondary data" more like metadata atm?
>>>>>>>>>>>
>>>>>>>>>>> Thus, promoting timestamp and headers into a first class
>>>>> citizen
>>>>>>> roll
>>>>>>>>>>> make sense to me (my original proposal about `RecordContext`
>>>>> would
>>>>>>> still
>>>>>>>>>>> fall short with this regard). However, putting both (data
>>> and
>>>>>>> metadata)
>>>>>>>>>>> into a `Record` abstraction might go too far?
>>>>>>>>>>>
>>>>>>>>>>> I am also a little bit concerned about `Record.copy()`
>>>>> because it
>>>>>>> might
>>>>>>>>>>> be a trap: Users might assume it does a full deep copy of
>>> the
>>>>>>> record,
>>>>>>>>>>> however, it would not. It would only create a new `Record`
>>>>> object
>>>>>>> as
>>>>>>>>>>> wrapper that points to the same key/value/header objects as
>>>>> the
>>>>>>> input
>>>>>>>>>>> record.
>>>>>>>>>>>
>>>>>>>>>>> With the current `context.forward(key, value)` we don't have
>>>>> this
>>>>>>> "deep
>>>>>>>>>>> copy" issue -- it's pretty clear what is happening.
>>>>>>>>>>>
>>>>>>>>>>> Instead of `To.all().withTimestamp()` we could also add
>>>>>>>>>>> `context.forward(key, value, timestamp)` etc (just wondering
>>>>> about
>>>>>>> the
>>>>>>>>>>> exposition in overload)?
>>>>>>>>>>>
>>>>>>>>>>> Also, `Record.withValue` etc sounds odd? Should a record not
>>>>> be
>>>>>>>>>>> immutable? So, we could have something like
>>>>>>>>>>>
>>>>>>>>>>>
>>> `RecordFactory.withKeyValue(...).withTimestamp(...).withHeaders(...).build()`.
>>>>>>>>>>> But it looks rather verbose?
>>>>>>>>>>>
>>>>>>>>>>> The other question is of course, to what extend to we want
>>> to
>>>>> keep
>>>>>>> the
>>>>>>>>>>> distinction between "primary" and "secondary" data? To me,
>>>>> it's a
>>>>>>>>>>> question of easy of use?
>>>>>>>>>>>
>>>>>>>>>>> Just putting all this out to move the discussion forward.
>>>>> Don't
>>>>>>> have a
>>>>>>>>>>> concrete proposal atm.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 9/14/20 9:24 AM, John Roesler wrote:
>>>>>>>>>>>> Thanks for this thought, Matthias!
>>>>>>>>>>>>
>>>>>>>>>>>> To be honest, it's bugged me quite a bit that _all_ the
>>>>>>>>>>>> record information hasn't been an argument to `process`. I
>>>>>>>>>>>> suppose I was trying to be conservative in this proposal,
>>>>>>>>>>>> but then again, if we're adding new Processor and
>>>>>>>>>>>> ProcessorContext interfaces, then this is the time to make
>>>>>>>>>>>> such a change.
>>>>>>>>>>>>
>>>>>>>>>>>> To be unambiguous, I think this is what we're talking
>>> about:
>>>>>>>>>>>> ProcessorContext:
>>>>>>>>>>>> * applicationId
>>>>>>>>>>>> * taskId
>>>>>>>>>>>> * appConfigs
>>>>>>>>>>>> * appConfigsWithPrefix
>>>>>>>>>>>> * keySerde
>>>>>>>>>>>> * valueSerde
>>>>>>>>>>>> * stateDir
>>>>>>>>>>>> * metrics
>>>>>>>>>>>> * schedule
>>>>>>>>>>>> * commit
>>>>>>>>>>>> * forward
>>>>>>>>>>>>
>>>>>>>>>>>> StateStoreContext:
>>>>>>>>>>>> * applicationId
>>>>>>>>>>>> * taskId
>>>>>>>>>>>> * appConfigs
>>>>>>>>>>>> * appConfigsWithPrefix
>>>>>>>>>>>> * keySerde
>>>>>>>>>>>> * valueSerde
>>>>>>>>>>>> * stateDir
>>>>>>>>>>>> * metrics
>>>>>>>>>>>> * register
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> RecordContext
>>>>>>>>>>>> * topic
>>>>>>>>>>>> * partition
>>>>>>>>>>>> * offset
>>>>>>>>>>>> * timestamp
>>>>>>>>>>>> * headers
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Your proposal sounds good to me as-is. Just to cover the
>>>>>>>>>>>> bases, though, I'm wondering if we should push the idea
>>> just
>>>>>>>>>>>> a little farther. Instead of decomposing
>>> key,value,context,
>>>>>>>>>>>> we could just keep them all in one object, like this:
>>>>>>>>>>>>
>>>>>>>>>>>> Record:
>>>>>>>>>>>> * key
>>>>>>>>>>>> * value
>>>>>>>>>>>> * topic
>>>>>>>>>>>> * partition
>>>>>>>>>>>> * offset
>>>>>>>>>>>> * timestamp
>>>>>>>>>>>> * headers
>>>>>>>>>>>>
>>>>>>>>>>>> Then, we could have:
>>>>>>>>>>>> Processor#process(Record)
>>>>>>>>>>>> ProcessorContext#forward(Record, To)
>>>>>>>>>>>>
>>>>>>>>>>>> Viewed from this perspective, a record has three
>>> properties
>>>>>>>>>>>> that people may specify in their processors: key, value,
>>> and
>>>>>>>>>>>> timestamp.
>>>>>>>>>>>>
>>>>>>>>>>>> We could deprecate `To#withTimestamp` and enable people to
>>>>>>>>>>>> specify the timestamp along with the key and value when
>>> they
>>>>>>>>>>>> forward a record.
>>>>>>>>>>>>
>>>>>>>>>>>> E.g.,
>>>>>>>>>>>> RecordBuilder toForward = RecordBuilder.copy(record)
>>>>>>>>>>>> toForward.withKey(newKey)
>>>>>>>>>>>> toForward.withValue(newValue)
>>>>>>>>>>>> toForward.withTimestamp(newTimestamp)
>>>>>>>>>>>> Record newRecord = toForward.build()
>>>>>>>>>>>> context.forward(newRecord, To.child("child1"))
>>>>>>>>>>>>
>>>>>>>>>>>> Or, the more compact common case:
>>>>>>>>>>>> current:
>>>>>>>>>>>>  context.forward(key, "newValue")
>>>>>>>>>>>> proposed:
>>>>>>>>>>>>
>>> context.forward(copy(record).withValue("newValue").build())
>>>>>>>>>>>>
>>>>>>>>>>>> It's slightly more verbose, but also more extensible. This
>>>>>>>>>>>> would give us a clean path to add header support in PAPI
>>> as
>>>>>>>>>>>> well, simply by adding `withHeaders` in RecordBuilder.
>>>>>>>>>>>>
>>>>>>>>>>>> It's also more symmetrical, since the recipient of
>>> `forward`
>>>>>>>>>>>> would just get the sent `Record`. Whereas today, the
>>> sender
>>>>>>>>>>>> puts the timestamp in `To`, but the recipient gets in in
>>> its
>>>>>>>>>>>> own `ProcessorContext`.
>>>>>>>>>>>>
>>>>>>>>>>>> WDYT?
>>>>>>>>>>>> -John
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, 2020-09-11 at 12:30 -0700, Matthias J. Sax wrote:
>>>>>>>>>>>>> I think separating the different contexts make sense.
>>>>>>>>>>>>>
>>>>>>>>>>>>> In fact, we could even go one step further and remove
>>> the
>>>>>>> record
>>>>>>>>>> context
>>>>>>>>>>>>> from the processor context completely and we add a third
>>>>>>> parameter to
>>>>>>>>>>>>> `process(key, value, recordContext)`. This would make it
>>>>> clear
>>>>>>> that
>>>>>>>>>> the
>>>>>>>>>>>>> context is for the input record only and it's not
>>>>> possible to
>>>>>>> pass
>>>>>>>>>> it to
>>>>>>>>>>>>> a `punctuate` callback.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For the stores and changelogging: I think there are two
>>>>> cases.
>>>>>>> (1)
>>>>>>>>>> You
>>>>>>>>>>>>> use a plain key-value store. For this case, it seems you
>>>>> do
>>>>>>> not care
>>>>>>>>>>>>> about the timestamp and thus does not care what
>>> timestamp
>>>>> is
>>>>>>> set in
>>>>>>>>>> the
>>>>>>>>>>>>> changelog records. (We can set anything we want, as it's
>>>>> not
>>>>>>>>>> relevant at
>>>>>>>>>>>>> all -- the timestamp is ignored on read anyway.) (2) The
>>>>> other
>>>>>>> case
>>>>>>>>>> is,
>>>>>>>>>>>>> that one does care about timestamps, and for this case
>>>>> should
>>>>>>> use
>>>>>>>>>>>>> TimestampedKeyValueStore. The passed timestamp will be
>>>>> set on
>>>>>>> the
>>>>>>>>>>>>> changelog records for this case.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thus, for both cases, accessing the record context does
>>>>> not
>>>>>>> seems to
>>>>>>>>>> be
>>>>>>>>>>>>> a requirement. And providing access to the processor
>>>>> context
>>>>>>> to, eg.,
>>>>>>>>>>>>> `forward()` or similar seems safe.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 9/10/20 7:25 PM, John Roesler wrote:
>>>>>>>>>>>>>> Thanks for the reply, Paul!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I certainly intend to make sure that the changelogging
>>>>> layer
>>>>>>>>>>>>>> continues to work the way it does now, by hook or by
>>>>> crook.
>>>>>>>>>>>>>> I think the easiest path for me is to just "cheat" and
>>>>> get
>>>>>>>>>>>>>> the real ProcessorContext into the ChangeLoggingStore
>>>>>>>>>>>>>> implementation somehow. I'll tag you on the PR when I
>>>>> create
>>>>>>>>>>>>>> it, so you have an opportunity to express a preference
>>>>> about
>>>>>>>>>>>>>> the implementation choice, and maybe even compile/test
>>>>>>>>>>>>>> against it to make sure your stuff still works.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regarding this:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> we have an interest in making a state store with a
>>>>> richer
>>>>>>>>>>>>>>> way of querying its data (like perhaps getting all
>>>>> values
>>>>>>>>>>>>>>> associated with a secondary key), while still
>>>>> ultimately
>>>>>>>>>>>>>>> writing to the changelog topic for later
>>> restoration.
>>>>>>>>>>>>>> This is very intriguing to me. On the side, I've been
>>>>>>>>>>>>>> preparing a couple of ideas related to this topic. I
>>>>> don't
>>>>>>>>>>>>>> think I have a coherent enough thought to even express
>>>>> it in
>>>>>>>>>>>>>> a Jira right now, but when I do, I'll tag you on it
>>>>> also to
>>>>>>>>>>>>>> see what you think.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Whenever you're ready to share the usability
>>> improvement
>>>>>>>>>>>>>> ideas, I'm very interested to see what you've come up
>>>>> with.
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen wrote:
>>>>>>>>>>>>>>>> when you use a HashMap or RocksDB or other "state
>>>>>>> stores", you
>>>>>>>>>> don't
>>>>>>>>>>>>>>>> expect them to automatically know extra stuff
>>> about
>>>>> the
>>>>>>> record
>>>>>>>>>> you're
>>>>>>>>>>>>>>>> storing.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So, I don't think there is any reason we *can't*
>>>>> retain the
>>>>>>>>>> record context
>>>>>>>>>>>>>>>> in the StateStoreContext, and if any users came
>>>>> along
>>>>>>> with a
>>>>>>>>>> clear use case
>>>>>>>>>>>>>>>> I'd find that convincing.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I agree with the principle of being conservative
>>> with
>>>>> the
>>>>>>>>>> StateStoreContext
>>>>>>>>>>>>>>> API.  Regarding user expectations or a clear use
>>>>> case, the
>>>>>>> only
>>>>>>>>>>>>>>> counterpoint I would offer is that we sort of have
>>>>> that
>>>>>>> use case
>>>>>>>>>> already,
>>>>>>>>>>>>>>> which is the example I gave of the change logging
>>>>> store
>>>>>>> using the
>>>>>>>>>>>>>>> timestamp.  I am curious if this functionality will
>>> be
>>>>>>> retained
>>>>>>>>>> when using
>>>>>>>>>>>>>>> built in state stores, or will a low-level processor
>>>>> get a
>>>>>>>>>> KeyValueStore
>>>>>>>>>>>>>>> that no longer writes to the changelog topic with
>>> the
>>>>>>> record's
>>>>>>>>>> timestamp.
>>>>>>>>>>>>>>> While I personally don't care much about that
>>>>> functionality
>>>>>>>>>> specifically, I
>>>>>>>>>>>>>>> have a general desire for custom state stores to
>>>>> easily do
>>>>>>> the
>>>>>>>>>> things that
>>>>>>>>>>>>>>> built in state stores do.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It genuinely did not occur to me that users might be
>>>>>>> looking up
>>>>>>>>>> and/or
>>>>>>>>>>>>>>>> updating records of other keys from within a
>>>>> Processor.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm glad you said this Sophie, because it gives me
>>> an
>>>>>>>>>> opportunity to say
>>>>>>>>>>>>>>> that this is actually a *huge* use case for my team.
>>>>> The
>>>>>>> state
>>>>>>>>>> store
>>>>>>>>>>>>>>> usability improvements I was referring to in my
>>>>> previous
>>>>>>> message
>>>>>>>>>> were about
>>>>>>>>>>>>>>> enabling the user to write custom stores while still
>>>>> easily
>>>>>>>>>> hooking into
>>>>>>>>>>>>>>> the ability to write to a changelog topic.  I think
>>>>> that is
>>>>>>>>>> technically
>>>>>>>>>>>>>>> possible now, but I don't think it's trivial.
>>>>>>> Specifically, we
>>>>>>>>>> have an
>>>>>>>>>>>>>>> interest in making a state store with a richer way
>>> of
>>>>>>> querying
>>>>>>>>>> its data
>>>>>>>>>>>>>>> (like perhaps getting all values associated with a
>>>>>>> secondary
>>>>>>>>>> key), while
>>>>>>>>>>>>>>> still ultimately writing to the changelog topic for
>>>>> later
>>>>>>>>>> restoration.
>>>>>>>>>>>>>>> We recognize that this use case throws away some of
>>>>> what
>>>>>>> kafka
>>>>>>>>>> streams
>>>>>>>>>>>>>>> (especially the DSL) is good at - easy
>>>>> parallelizability by
>>>>>>>>>> partitioning
>>>>>>>>>>>>>>> all processing by key - and that our business logic
>>>>> would
>>>>>>>>>> completely fall
>>>>>>>>>>>>>>> apart if we were consuming from multi-partition
>>>>> topics with
>>>>>>>>>> multiple
>>>>>>>>>>>>>>> consumers.  But we have found that using the low
>>> level
>>>>>>> processor
>>>>>>>>>> API is
>>>>>>>>>>>>>>> good for the very simple stream processing
>>> primitives
>>>>> it
>>>>>>>>>> provides: handling
>>>>>>>>>>>>>>> the plumbing of consuming from multiple kafka topics
>>>>> and
>>>>>>>>>> potentially
>>>>>>>>>>>>>>> updating persistent local state in a reliable way.
>>>>> That in
>>>>>>>>>> itself has
>>>>>>>>>>>>>>> proven to be a worthwhile programming model.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Since I got off track a bit, let me summarize: I
>>> don't
>>>>>>>>>> particularly care
>>>>>>>>>>>>>>> about the record context being available to state
>>>>> store
>>>>>>>>>> implementations,
>>>>>>>>>>>>>>> and I think this KIP is headed in the right
>>> direction
>>>>> in
>>>>>>> that
>>>>>>>>>> regard.  But
>>>>>>>>>>>>>>> more generally, I wanted to express the importance
>>> of
>>>>>>>>>> maintaining a
>>>>>>>>>>>>>>> powerful and flexible StateStore interface.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Sep 10, 2020 at 6:11 PM Sophie Blee-Goldman
>>> <
>>>>>>>>>> sop...@confluent.io>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Aha, I did misinterpret the example in your
>>> previous
>>>>>>> response
>>>>>>>>>> regarding the
>>>>>>>>>>>>>>>> range query after all. I thought you just meant a
>>>>>>> time-range
>>>>>>>>>> query inside a
>>>>>>>>>>>>>>>> punctuator. It genuinely did not occur to me that
>>>>> users
>>>>>>> might
>>>>>>>>>> be looking up
>>>>>>>>>>>>>>>> and/or updating records of other keys from within
>>> a
>>>>>>> Processor.
>>>>>>>>>> Sorry for
>>>>>>>>>>>>>>>> being closed minded
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I won't drag out this discussion any further by
>>>>> asking
>>>>>>> whether
>>>>>>>>>> that might
>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>> a valid use case or just a lurking bug in itself
>>> :)
>>>>>>>>>>>>>>>> Thanks for humoring me. The current proposal for
>>>>> KIP-478
>>>>>>>>>> sounds good to me
>>>>>>>>>>>>>>>> On Thu, Sep 10, 2020 at 3:43 PM John Roesler <
>>>>>>>>>> vvcep...@apache.org> wrote:
>>>>>>>>>>>>>>>>> Ah, thanks Sophie,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm sorry for misinterpreting your resonse. Yes,
>>>>> we
>>>>>>>>>>>>>>>>> absolutely can and should clear the context
>>> before
>>>>>>>>>>>>>>>>> punctuating.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> My secondary concern is maybe more far-fetched.
>>> I
>>>>> was
>>>>>>>>>>>>>>>>> thinking that inside process(key,value), a
>>>>> Processor
>>>>>>> might
>>>>>>>>>>>>>>>>> do a get/put of a _different_ key. Consider, for
>>>>>>> example,
>>>>>>>>>>>>>>>>> the way that Suppress processors work. When they
>>>>> get a
>>>>>>>>>>>>>>>>> record, they add it to the store and then do a
>>>>> range
>>>>>>> scan
>>>>>>>>>>>>>>>>> and possibly forward a _different_ record. Of
>>>>> course,
>>>>>>> this
>>>>>>>>>>>>>>>>> is an operation that is deeply coupled to the
>>>>>>> internals, and
>>>>>>>>>>>>>>>>> the Suppress processor accordingly actually does
>>>>> get
>>>>>>> access
>>>>>>>>>>>>>>>>> to the internal context so that it can set the
>>>>> context
>>>>>>>>>>>>>>>>> before forwarding.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Still, it seems like I've had a handful of
>>>>>>> conversations
>>>>>>>>>>>>>>>>> with people over the years in which they tell me
>>>>> they
>>>>>>> are
>>>>>>>>>>>>>>>>> using state stores in a way that transcends the
>>>>> "get
>>>>>>> and put
>>>>>>>>>>>>>>>>> the currently processing record" access
>>> pattern. I
>>>>>>> doubt
>>>>>>>>>>>>>>>>> that those folks would even have considered the
>>>>>>> possiblity
>>>>>>>>>>>>>>>>> that the currently processing record's _context_
>>>>> could
>>>>>>>>>>>>>>>>> pollute their state store operations, as I
>>> myself
>>>>>>> never gave
>>>>>>>>>>>>>>>>> it a second thought until the current
>>> conversation
>>>>>>> began. In
>>>>>>>>>>>>>>>>> cases like that, we have actually set a trap for
>>>>> these
>>>>>>>>>>>>>>>>> people, and it seems better to dismantle the
>>> trap.
>>>>>>>>>>>>>>>>> As you noted, really the only people who would
>>> be
>>>>>>> negatively
>>>>>>>>>>>>>>>>> impacted are people who implement their own
>>> state
>>>>>>> stores.
>>>>>>>>>>>>>>>>> These folks will get the deprecation warning and
>>>>> try to
>>>>>>>>>>>>>>>>> adapt their stores to the new interface. If they
>>>>> needed
>>>>>>>>>>>>>>>>> access to the record context, they would find
>>>>> it's now
>>>>>>>>>>>>>>>>> missing. They'd ask us about it, and we'd have
>>> the
>>>>>>> ability
>>>>>>>>>>>>>>>>> to explain the lurking bug that they have had in
>>>>> their
>>>>>>>>>>>>>>>>> stores all along, as well as the new recommended
>>>>>>> pattern
>>>>>>>>>>>>>>>>> (just pass everything you need in the value). If
>>>>> that's
>>>>>>>>>>>>>>>>> unsatisfying, _then_ we should consider amending
>>>>> the
>>>>>>> API.
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, 2020-09-10 at 15:21 -0700, Sophie
>>>>> Blee-Goldman
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>> Regarding your first sentence, "...the
>>>>> processor
>>>>>>> would
>>>>>>>>>> null
>>>>>>>>>>>>>>>>>>> out the record context...", this is not
>>>>> possible,
>>>>>>> since
>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> processor doesn't have write access to the
>>>>>>> context. We
>>>>>>>>>> could
>>>>>>>>>>>>>>>>>>> add it,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Sorry, this was poorly phrased, I definitely
>>>>> did not
>>>>>>> mean
>>>>>>>>>> to imply that
>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>> should make the context modifiable by the
>>>>> Processors
>>>>>>>>>> themselves. I
>>>>>>>>>>>>>>>> meant
>>>>>>>>>>>>>>>>>> this should be handled by the internal
>>>>> processing
>>>>>>>>>> framework that deals
>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>> passing records from one Processor to the
>>> next,
>>>>>>> setting
>>>>>>>>>> the record
>>>>>>>>>>>>>>>>> context
>>>>>>>>>>>>>>>>>> when a new record is picked up, invoking the
>>>>>>> punctuators,
>>>>>>>>>> etc. I
>>>>>>>>>>>>>>>> believe
>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>> all currently happens in the StreamTask? It
>>>>> already
>>>>>>> can
>>>>>>>>>> and does
>>>>>>>>>>>>>>>>> overwrite
>>>>>>>>>>>>>>>>>> the record context as new records are
>>>>> processed, and
>>>>>>> is
>>>>>>>>>> also
>>>>>>>>>>>>>>>> responsible
>>>>>>>>>>>>>>>>>> for calling the punctuators, so it doesn't
>>> seem
>>>>> like
>>>>>>> a
>>>>>>>>>> huge leap to
>>>>>>>>>>>>>>>> just
>>>>>>>>>>>>>>>>> say
>>>>>>>>>>>>>>>>>> "null out the current record before
>>> punctuating"
>>>>>>>>>>>>>>>>>> To clarify, I was never advocating or even
>>>>>>> considering to
>>>>>>>>>> give the
>>>>>>>>>>>>>>>>>> Processors
>>>>>>>>>>>>>>>>>> write access to the record context. Sorry if
>>> my
>>>>> last
>>>>>>>>>> message (or all of
>>>>>>>>>>>>>>>>>> them)
>>>>>>>>>>>>>>>>>> was misleading. I just wanted to point out
>>> that
>>>>> the
>>>>>>>>>> punctuator concern
>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>> orthogonal to the question of whether we
>>> should
>>>>>>> include
>>>>>>>>>> the record
>>>>>>>>>>>>>>>>> context
>>>>>>>>>>>>>>>>>> in the StateStoreContext. It's definitely a
>>> real
>>>>>>> problem,
>>>>>>>>>> but it's a
>>>>>>>>>>>>>>>>>> problem
>>>>>>>>>>>>>>>>>> that exists at the Processor level and not
>>> just
>>>>> the
>>>>>>>>>> StateStore.
>>>>>>>>>>>>>>>>>> So, I don't think there is any reason we
>>> *can't*
>>>>>>> retain
>>>>>>>>>> the record
>>>>>>>>>>>>>>>>> context
>>>>>>>>>>>>>>>>>> in the
>>>>>>>>>>>>>>>>>> StateStoreContext, and if any users came along
>>>>> with a
>>>>>>>>>> clear use case
>>>>>>>>>>>>>>>> I'd
>>>>>>>>>>>>>>>>>> find
>>>>>>>>>>>>>>>>>> that convincing. In the absence of any
>>>>> examples, the
>>>>>>>>>> conservative
>>>>>>>>>>>>>>>>> approach
>>>>>>>>>>>>>>>>>> sounds good to me.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> If it turns out that someone did need the
>>> record
>>>>>>> context
>>>>>>>>>> in their
>>>>>>>>>>>>>>>> custom
>>>>>>>>>>>>>>>>>> state
>>>>>>>>>>>>>>>>>> store, I'm sure they'll submit a politely
>>>>> worded bug
>>>>>>>>>> report alerting us
>>>>>>>>>>>>>>>>>> that we
>>>>>>>>>>>>>>>>>> broke their application.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Sep 10, 2020 at 3:05 PM John Roesler <
>>>>>>>>>> vvcep...@apache.org>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>> Thanks, Sophie,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Yes, now that you point it out, I can see
>>>>> that the
>>>>>>> record
>>>>>>>>>>>>>>>>>>> context itself should be nulled out by
>>> Streams
>>>>>>> before
>>>>>>>>>>>>>>>>>>> invoking punctuators. From that perspective,
>>>>> we
>>>>>>> don't
>>>>>>>>>> need
>>>>>>>>>>>>>>>>>>> to think about the second-order problem of
>>>>> what's
>>>>>>> in the
>>>>>>>>>>>>>>>>>>> context for the state store when called
>>> from a
>>>>>>>>>> punctuator.
>>>>>>>>>>>>>>>>>>> Regarding your first sentence, "...the
>>>>> processor
>>>>>>> would
>>>>>>>>>> null
>>>>>>>>>>>>>>>>>>> out the record context...", this is not
>>>>> possible,
>>>>>>> since
>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> processor doesn't have write access to the
>>>>>>> context. We
>>>>>>>>>> could
>>>>>>>>>>>>>>>>>>> add it, but then all kinds of strange
>>> effects
>>>>>>> would ensue
>>>>>>>>>>>>>>>>>>> when downstream processors execute but the
>>>>> context
>>>>>>> is
>>>>>>>>>> empty,
>>>>>>>>>>>>>>>>>>> etc. Better to just let the framework manage
>>>>> the
>>>>>>> record
>>>>>>>>>>>>>>>>>>> context and keep it read-only for
>>> Processors.
>>>>>>>>>>>>>>>>>>> Reading between the lines of your last
>>> reply,
>>>>> it
>>>>>>> sounds
>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>> the disconnect may just have been a mutual
>>>>>>>>>> misunderstanding
>>>>>>>>>>>>>>>>>>> about whether or not Processors currently
>>> have
>>>>>>> access to
>>>>>>>>>> set
>>>>>>>>>>>>>>>>>>> the record context. Since they do not, if we
>>>>>>> wanted to
>>>>>>>>>> add
>>>>>>>>>>>>>>>>>>> the record context to StateStoreContext in a
>>>>>>> well-defined
>>>>>>>>>>>>>>>>>>> way, we'd also have to add the ability for
>>>>>>> Processors to
>>>>>>>>>>>>>>>>>>> manipulate it. But then, we're just
>>> creating a
>>>>>>>>>> side-channel
>>>>>>>>>>>>>>>>>>> for Processors to pass some information in
>>>>>>> arguments to
>>>>>>>>>>>>>>>>>>> "put()" and other information implicitly
>>>>> through
>>>>>>> the
>>>>>>>>>>>>>>>>>>> context. It seems better just to go for a
>>>>> single
>>>>>>> channel
>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>> now.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> It sounds like you're basically in favor of
>>>>> the
>>>>>>>>>> conservative
>>>>>>>>>>>>>>>>>>> approach, and you just wanted to understand
>>>>> the
>>>>>>> blockers
>>>>>>>>>>>>>>>>>>> that I implied. Does my clarification make
>>>>> sense?
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thu, 2020-09-10 at 10:54 -0700, Sophie
>>>>>>> Blee-Goldman
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>> I was just thinking that the processor
>>> would
>>>>>>> null out
>>>>>>>>>> the record
>>>>>>>>>>>>>>>>> context
>>>>>>>>>>>>>>>>>>>> after it
>>>>>>>>>>>>>>>>>>>> finished processing the record, so I'm not
>>>>> sure I
>>>>>>>>>> follow why this
>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>> possible? AFAIK we never call a punctuator
>>>>> in the
>>>>>>>>>> middle of
>>>>>>>>>>>>>>>>> processing a
>>>>>>>>>>>>>>>>>>>> record through the topology, and even if
>>> we
>>>>> did,
>>>>>>> we
>>>>>>>>>> still know when
>>>>>>>>>>>>>>>>> it is
>>>>>>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>>>>>> to be called and could set it to null
>>>>> beforehand.
>>>>>>>>>>>>>>>>>>>> I'm not trying to advocate for it here,
>>> I'm
>>>>> in
>>>>>>>>>> agreement that
>>>>>>>>>>>>>>>>> anything
>>>>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>>>> want
>>>>>>>>>>>>>>>>>>>> to access within the store can and should
>>> be
>>>>>>> accessed
>>>>>>>>>> within the
>>>>>>>>>>>>>>>>> calling
>>>>>>>>>>>>>>>>>>>> Processor/Punctuator before reaching the
>>>>> store.
>>>>>>> The
>>>>>>>>>> "we can always
>>>>>>>>>>>>>>>>> add it
>>>>>>>>>>>>>>>>>>>> later if necessary" argument is also
>>> pretty
>>>>>>>>>> convincing. Just trying
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>> understand
>>>>>>>>>>>>>>>>>>>> why this wouldn't be possible.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> FWIW, the question of "what is the current
>>>>>>> record in
>>>>>>>>>> the context
>>>>>>>>>>>>>>>> of a
>>>>>>>>>>>>>>>>>>>> Punctuator"
>>>>>>>>>>>>>>>>>>>> exists independently of whether we want to
>>>>> add
>>>>>>> this to
>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> StateStoreContext
>>>>>>>>>>>>>>>>>>>> or not. The full ProcessorContext,
>>>>> including the
>>>>>>>>>> current record
>>>>>>>>>>>>>>>>> context,
>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>> already available within a Punctuator, so
>>>>>>> removing the
>>>>>>>>>> current
>>>>>>>>>>>>>>>> record
>>>>>>>>>>>>>>>>>>>> context
>>>>>>>>>>>>>>>>>>>> from the StateStoreContext does not solve
>>>>> the
>>>>>>> problem.
>>>>>>>>>> Users can --
>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>>> (see KAFKA-9584 <
>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9584
>>>>>>>>>>>>>>>>> ;;)
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>> hit
>>>>>>>>>>>>>>>>>>>> such subtle bugs without ever invoking a
>>>>>>> StateStore
>>>>>>>>>>>>>>>>>>>> from their punctuator.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Again, I think I do agree that we should
>>>>> leave
>>>>>>> the
>>>>>>>>>> current record
>>>>>>>>>>>>>>>>> context
>>>>>>>>>>>>>>>>>>>> off of
>>>>>>>>>>>>>>>>>>>> the StateStoreContext, but I don't think
>>> the
>>>>>>>>>> Punctuator argument
>>>>>>>>>>>>>>>>> against
>>>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>> very convincing. It sounds to me like we
>>>>> need to
>>>>>>>>>> disallow access to
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> current
>>>>>>>>>>>>>>>>>>>> record context from within the Punctuator,
>>>>>>> independent
>>>>>>>>>> of anything
>>>>>>>>>>>>>>>>> to do
>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>> state stores
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, Sep 10, 2020 at 7:12 AM John
>>>>> Roesler <
>>>>>>>>>> vvcep...@apache.org>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>> Thanks for the thoughts, Sophie.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I agree that the extra information could
>>>>> be
>>>>>>> useful.
>>>>>>>>>> My only
>>>>>>>>>>>>>>>>> concern is
>>>>>>>>>>>>>>>>>>>>> that it doesn’t seem like we can
>>> actually
>>>>>>> supply
>>>>>>>>>> that extra
>>>>>>>>>>>>>>>>> information
>>>>>>>>>>>>>>>>>>>>> correctly. So, then we have a situation
>>>>> where
>>>>>>> the
>>>>>>>>>> system offers
>>>>>>>>>>>>>>>>> useful
>>>>>>>>>>>>>>>>>>> API
>>>>>>>>>>>>>>>>>>>>> calls that are only correct in a narrow
>>>>> range
>>>>>>> of use
>>>>>>>>>> cases.
>>>>>>>>>>>>>>>>> Outside of
>>>>>>>>>>>>>>>>>>>>> those use cases, you get incorrect
>>>>> behavior.
>>>>>>>>>>>>>>>>>>>>> If it were possible to null out the
>>>>> context
>>>>>>> before
>>>>>>>>>> you put a
>>>>>>>>>>>>>>>>> document
>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>> which the context doesn’t apply, then
>>> the
>>>>>>> concern
>>>>>>>>>> would be
>>>>>>>>>>>>>>>>> mitigated.
>>>>>>>>>>>>>>>>>>> But
>>>>>>>>>>>>>>>>>>>>> it would still be pretty weird from the
>>>>>>> perspective
>>>>>>>>>> of the store
>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>> sometimes the context is populated and
>>>>> other
>>>>>>> times,
>>>>>>>>>> it’s null.
>>>>>>>>>>>>>>>>>>>>> But that seems moot, since it doesn’t
>>> seem
>>>>>>> possible
>>>>>>>>>> to null out
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> context. Only the Processor could know
>>>>> whether
>>>>>>> it’s
>>>>>>>>>> about to put
>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>> document
>>>>>>>>>>>>>>>>>>>>> different from the context or not. And
>>> it
>>>>>>> would be
>>>>>>>>>> inappropriate
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> offer a
>>>>>>>>>>>>>>>>>>>>> public ProcessorContext api to manage
>>> the
>>>>>>> record
>>>>>>>>>> context.
>>>>>>>>>>>>>>>>>>>>> Ultimately, it still seems like if you
>>>>> want to
>>>>>>> store
>>>>>>>>>> headers, you
>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>> store them explicitly, right? That
>>>>> doesn’t seem
>>>>>>>>>> onerous to me,
>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>> kind
>>>>>>>>>>>>>>>>>>>>> of seems better than relying on
>>> undefined
>>>>> or
>>>>>>>>>> asymmetrical
>>>>>>>>>>>>>>>> behavior
>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> store itself.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Anyway, I’m not saying that we couldn’t
>>>>> solve
>>>>>>> these
>>>>>>>>>> problems.
>>>>>>>>>>>>>>>> Just
>>>>>>>>>>>>>>>>>>> that it
>>>>>>>>>>>>>>>>>>>>> seems a little that we can be
>>>>> conservative and
>>>>>>> avoid
>>>>>>>>>> them for
>>>>>>>>>>>>>>>> now.
>>>>>>>>>>>>>>>>> If
>>>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>>> turns out we really need to solve them,
>>>>> we can
>>>>>>>>>> always do it
>>>>>>>>>>>>>>>> later.
>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>> John
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Wed, Sep 9, 2020, at 22:46, Sophie
>>>>>>> Blee-Goldman
>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>> If you were to call "put" from a
>>>>>>> punctuator, or
>>>>>>>>>> do a
>>>>>>>>>>>>>>>>>>>>>>> `range()` query and then update one
>>> of
>>>>>>> those
>>>>>>>>>> records with
>>>>>>>>>>>>>>>>>>>>>>> `put()`, you'd have a very subtle
>>> bug
>>>>> on
>>>>>>> your
>>>>>>>>>> hands.
>>>>>>>>>>>>>>>>>>>>>> Can you elaborate on this a bit? I
>>> agree
>>>>>>> that the
>>>>>>>>>> punctuator
>>>>>>>>>>>>>>>>> case is
>>>>>>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>>>>>> obvious exemption to the assumption
>>> that
>>>>>>> store
>>>>>>>>>> invocations
>>>>>>>>>>>>>>>> always
>>>>>>>>>>>>>>>>>>>>>> have a corresponding "current record",
>>>>> but I
>>>>>>> don't
>>>>>>>>>> understand
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> second example. Are you envisioning a
>>>>>>> scenario
>>>>>>>>>> where the
>>>>>>>>>>>>>>>> #process
>>>>>>>>>>>>>>>>>>>>>> method performs a range query and then
>>>>>>> updates
>>>>>>>>>> records? Or were
>>>>>>>>>>>>>>>>>>>>>> you just giving another example of the
>>>>>>> punctuator
>>>>>>>>>> case?
>>>>>>>>>>>>>>>>>>>>>> I only bring it up because I agree
>>> that
>>>>> the
>>>>>>>>>> current record
>>>>>>>>>>>>>>>>>>> information
>>>>>>>>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>>>>>>> still be useful within the context of
>>>>> the
>>>>>>> store.
>>>>>>>>>> As a non-user
>>>>>>>>>>>>>>>> my
>>>>>>>>>>>>>>>>>>> input
>>>>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>> definitely has limited value, but it
>>>>> just
>>>>>>> isn't
>>>>>>>>>> striking me as
>>>>>>>>>>>>>>>>>>> obvious
>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>> should remove access to the current
>>>>> record
>>>>>>> context
>>>>>>>>>> from the
>>>>>>>>>>>>>>>> state
>>>>>>>>>>>>>>>>>>> stores.
>>>>>>>>>>>>>>>>>>>>>> If there is no current record, as in
>>> the
>>>>>>>>>> punctuator case, we
>>>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>> just
>>>>>>>>>>>>>>>>>>>>>> set
>>>>>>>>>>>>>>>>>>>>>> the record context to null (or
>>>>>>> Optional.empty,
>>>>>>>>>> etc).
>>>>>>>>>>>>>>>>>>>>>> That said, the put() always has to
>>> come
>>>>> from
>>>>>>>>>> somewhere, and
>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>> somewhere is always going to be
>>> either a
>>>>>>> Processor
>>>>>>>>>> or a
>>>>>>>>>>>>>>>>> Punctuator,
>>>>>>>>>>>>>>>>>>> both
>>>>>>>>>>>>>>>>>>>>>> of which will still have access to the
>>>>> full
>>>>>>>>>> context. So
>>>>>>>>>>>>>>>>> additional
>>>>>>>>>>>>>>>>>>> info
>>>>>>>>>>>>>>>>>>>>>> such as
>>>>>>>>>>>>>>>>>>>>>> the timestamp can and should probably
>>> be
>>>>>>> supplied
>>>>>>>>>> to the store
>>>>>>>>>>>>>>>>> before
>>>>>>>>>>>>>>>>>>>>>> calling put(), rather than looked up
>>> by
>>>>> the
>>>>>>> store.
>>>>>>>>>> But I can
>>>>>>>>>>>>>>>> see
>>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>>> other
>>>>>>>>>>>>>>>>>>>>>> things being useful, for example the
>>>>> current
>>>>>>>>>> record's headers.
>>>>>>>>>>>>>>>>> Maybe
>>>>>>>>>>>>>>>>>>>>> if/when
>>>>>>>>>>>>>>>>>>>>>> we add better (or any) support for
>>>>> headers in
>>>>>>>>>> state stores this
>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>> less true.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Of course as John has made clear, it's
>>>>>>> pretty hard
>>>>>>>>>> to judge
>>>>>>>>>>>>>>>>> without
>>>>>>>>>>>>>>>>>>>>>> examples
>>>>>>>>>>>>>>>>>>>>>> and more insight as to what actually
>>>>> goes on
>>>>>>>>>> within a custom
>>>>>>>>>>>>>>>>> state
>>>>>>>>>>>>>>>>>>> store
>>>>>>>>>>>>>>>>>>>>>> On Wed, Sep 9, 2020 at 8:07 PM John
>>>>> Roesler <
>>>>>>>>>>>>>>>> vvcep...@apache.org
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>> Hi Paul,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> It's good to hear from you!
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I'm glad you're in favor of the
>>>>> direction.
>>>>>>>>>> Especially when
>>>>>>>>>>>>>>>>>>>>>>> it comes to public API and usability
>>>>>>> concens, I
>>>>>>>>>> tend to
>>>>>>>>>>>>>>>>>>>>>>> think that "the folks who matter"
>>> are
>>>>>>> actually
>>>>>>>>>> the folks who
>>>>>>>>>>>>>>>>>>>>>>> have to use the APIs to accomplish
>>>>> real
>>>>>>> tasks.
>>>>>>>>>> It can be
>>>>>>>>>>>>>>>>>>>>>>> hard for me to be sure I'm thinking
>>>>>>> clearly from
>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>> perspective.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Funny story, I also started down
>>> this
>>>>> road
>>>>>>> a
>>>>>>>>>> couple of times
>>>>>>>>>>>>>>>>>>>>>>> already and backed them out before
>>>>> the KIP
>>>>>>>>>> because I was
>>>>>>>>>>>>>>>>>>>>>>> afraid of the scope of the proposal.
>>>>>>>>>> Unfortunately, needing
>>>>>>>>>>>>>>>>>>>>>>> to make a new ProcessorContext kind
>>> of
>>>>>>> forced my
>>>>>>>>>> hand.
>>>>>>>>>>>>>>>>>>>>>>> I see you've called me out about the
>>>>>>>>>> ChangeLogging stores :)
>>>>>>>>>>>>>>>>>>>>>>> In fact, I think these are the
>>>>> main/only
>>>>>>> reason
>>>>>>>>>> that stores
>>>>>>>>>>>>>>>>>>>>>>> might really need to invoke
>>>>> "forward()". My
>>>>>>>>>> secret plan was
>>>>>>>>>>>>>>>>>>>>>>> to cheat and either accomplish
>>>>>>> change-logging by
>>>>>>>>>> a different
>>>>>>>>>>>>>>>>>>>>>>> mechanism than implementing the
>>> store
>>>>>>> interface,
>>>>>>>>>> or by just
>>>>>>>>>>>>>>>>>>>>>>> breaking encapsulation to sneak the
>>>>> "real"
>>>>>>>>>> ProcessorContext
>>>>>>>>>>>>>>>>>>>>>>> into the ChangeLogging stores. But
>>>>> those
>>>>>>> are all
>>>>>>>>>>>>>>>>>>>>>>> implementation details. I think the
>>>>> key
>>>>>>> question
>>>>>>>>>> is whether
>>>>>>>>>>>>>>>>>>>>>>> anyone else has a store
>>>>> implementation that
>>>>>>>>>> needs to call
>>>>>>>>>>>>>>>>>>>>>>> "forward()". It's not what you
>>>>> mentioned,
>>>>>>> but
>>>>>>>>>> since you
>>>>>>>>>>>>>>>>>>>>>>> spoke up, I'll just ask: if you have
>>>>> a use
>>>>>>> case
>>>>>>>>>> for calling
>>>>>>>>>>>>>>>>>>>>>>> "forward()" in a store, please share
>>>>> it.
>>>>>>>>>>>>>>>>>>>>>>> Regarding the other record-specific
>>>>> context
>>>>>>>>>> methods, I think
>>>>>>>>>>>>>>>>>>>>>>> you have a good point, but I also
>>>>> can't
>>>>>>> quite
>>>>>>>>>> wrap my head
>>>>>>>>>>>>>>>>>>>>>>> around how we can actually guarantee
>>>>> it to
>>>>>>> work
>>>>>>>>>> in general.
>>>>>>>>>>>>>>>>>>>>>>> For example, the case you cited,
>>>>> where the
>>>>>>>>>> implementation of
>>>>>>>>>>>>>>>>>>>>>>> `KeyValueStore#put(key, value)` uses
>>>>> the
>>>>>>> context
>>>>>>>>>> to augment
>>>>>>>>>>>>>>>>>>>>>>> the record with timestamp
>>>>> information. This
>>>>>>>>>> relies on the
>>>>>>>>>>>>>>>>>>>>>>> assumption that you would only call
>>>>>>> "put()" from
>>>>>>>>>> inside a
>>>>>>>>>>>>>>>>>>>>>>> `Processor#process(key, value)` call
>>>>> in
>>>>>>> which
>>>>>>>>>> the record
>>>>>>>>>>>>>>>>>>>>>>> being processed is the same record
>>>>> that
>>>>>>> you're
>>>>>>>>>> trying to put
>>>>>>>>>>>>>>>>>>>>>>> into the store.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> If you were to call "put" from a
>>>>>>> punctuator, or
>>>>>>>>>> do a
>>>>>>>>>>>>>>>>>>>>>>> `range()` query and then update one
>>> of
>>>>>>> those
>>>>>>>>>> records with
>>>>>>>>>>>>>>>>>>>>>>> `put()`, you'd have a very subtle
>>> bug
>>>>> on
>>>>>>> your
>>>>>>>>>> hands. Right
>>>>>>>>>>>>>>>>>>>>>>> now, the Streams component that
>>>>> actually
>>>>>>> calls
>>>>>>>>>> the Processor
>>>>>>>>>>>>>>>>>>>>>>> takes care to set the right record
>>>>> context
>>>>>>>>>> before invoking
>>>>>>>>>>>>>>>>>>>>>>> the method, and in the case of
>>>>> caching,
>>>>>>> etc., it
>>>>>>>>>> also takes
>>>>>>>>>>>>>>>>>>>>>>> care to swap out the old context and
>>>>> keep
>>>>>>> it
>>>>>>>>>> somewhere safe.
>>>>>>>>>>>>>>>>>>>>>>> But when it comes to public API
>>>>> Processors
>>>>>>>>>> calling methods
>>>>>>>>>>>>>>>>>>>>>>> on StateStores, there's no
>>>>> opportunity for
>>>>>>> any
>>>>>>>>>> component to
>>>>>>>>>>>>>>>>>>>>>>> make sure the context is always
>>>>> correct.
>>>>>>>>>>>>>>>>>>>>>>> In the face of that situation, it
>>>>> seemed
>>>>>>> better
>>>>>>>>>> to just move
>>>>>>>>>>>>>>>>>>>>>>> in the direction of a "normal" data
>>>>> store.
>>>>>>> I.e.,
>>>>>>>>>> when you
>>>>>>>>>>>>>>>>>>>>>>> use a HashMap or RocksDB or other
>>>>> "state
>>>>>>>>>> stores", you don't
>>>>>>>>>>>>>>>>>>>>>>> expect them to automatically know
>>>>> extra
>>>>>>> stuff
>>>>>>>>>> about the
>>>>>>>>>>>>>>>>>>>>>>> record you're storing. If you need
>>>>> them to
>>>>>>> know
>>>>>>>>>> something,
>>>>>>>>>>>>>>>>>>>>>>> you just put it in the value.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> All of that said, I'm just reasoning
>>>>> from
>>>>>>> first
>>>>>>>>>> principles
>>>>>>>>>>>>>>>>>>>>>>> here. To really know if this is a
>>>>> mistake
>>>>>>> or
>>>>>>>>>> not, I need to
>>>>>>>>>>>>>>>>>>>>>>> be in your place. So please push
>>> back
>>>>> if
>>>>>>> you
>>>>>>>>>> think what I
>>>>>>>>>>>>>>>>>>>>>>> said is nonsense. My personal plan
>>>>> was to
>>>>>>> keep
>>>>>>>>>> an eye out
>>>>>>>>>>>>>>>>>>>>>>> during the period where the old API
>>>>> was
>>>>>>> still
>>>>>>>>>> present, but
>>>>>>>>>>>>>>>>>>>>>>> deprecated, to see if people were
>>>>>>> struggling to
>>>>>>>>>> use the new
>>>>>>>>>>>>>>>>>>>>>>> API. If so, then we'd have a chance
>>> to
>>>>>>> address
>>>>>>>>>> it before
>>>>>>>>>>>>>>>>>>>>>>> dropping the old API. But it's even
>>>>> better
>>>>>>> if
>>>>>>>>>> you can help
>>>>>>>>>>>>>>>>>>>>>>> think it through now.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> It did also cross my mind to _not_
>>>>> add the
>>>>>>>>>>>>>>>>>>>>>>> StateStoreContext, but just to
>>>>> continue to
>>>>>>> punt
>>>>>>>>>> on the
>>>>>>>>>>>>>>>>>>>>>>> question by just dropping in the new
>>>>>>>>>> ProcessorContext to the
>>>>>>>>>>>>>>>>>>>>>>> new init method. If
>>> StateStoreContext
>>>>>>> seems too
>>>>>>>>>> bold, we can
>>>>>>>>>>>>>>>>>>>>>>> go that direction. But if we
>>> actually
>>>>> add
>>>>>>> some
>>>>>>>>>> methods to
>>>>>>>>>>>>>>>>>>>>>>> StateStoreContext, I'd like to be
>>>>> able to
>>>>>>> ensure
>>>>>>>>>> they would
>>>>>>>>>>>>>>>>>>>>>>> be well defined. I think the current
>>>>>>> situation
>>>>>>>>>> was more of
>>>>>>>>>>>>>>>>>>>>>>> an oversight than a choice.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thanks again for your reply,
>>>>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Wed, 2020-09-09 at 21:23 -0500,
>>>>> Paul
>>>>>>> Whalen
>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>> John,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> It's exciting to see this KIP head
>>>>> in
>>>>>>> this
>>>>>>>>>> direction!  In
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> last
>>>>>>>>>>>>>>>>>>>>> year
>>>>>>>>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>>>>>>> so I've tried to sketch out some
>>>>>>> usability
>>>>>>>>>> improvements for
>>>>>>>>>>>>>>>>>>> custom
>>>>>>>>>>>>>>>>>>>>> state
>>>>>>>>>>>>>>>>>>>>>>>> stores, and I also ended up
>>>>> splitting
>>>>>>> out the
>>>>>>>>>>>>>>>>> StateStoreContext
>>>>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> ProcessorContext in an attempt to
>>>>>>> facilitate
>>>>>>>>>> what I was
>>>>>>>>>>>>>>>>> doing.  I
>>>>>>>>>>>>>>>>>>>>> sort of
>>>>>>>>>>>>>>>>>>>>>>>> abandoned it when I realized how
>>>>> large
>>>>>>> the
>>>>>>>>>> ideal change
>>>>>>>>>>>>>>>> might
>>>>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>>>> to be,
>>>>>>>>>>>>>>>>>>>>>>>> but it's great to see that there
>>> is
>>>>> other
>>>>>>>>>> interest in
>>>>>>>>>>>>>>>> moving
>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>>> direction (from the folks that
>>>>> matter :)
>>>>>>> ).
>>>>>>>>>>>>>>>>>>>>>>>> Having taken a stab at it myself,
>>> I
>>>>> have
>>>>>>> a
>>>>>>>>>> comment/question
>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>> bullet
>>>>>>>>>>>>>>>>>>>>>>>> about StateStoreContext:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> It does *not*  include anything
>>>>>>> processor- or
>>>>>>>>>> record-
>>>>>>>>>>>>>>>>> specific,
>>>>>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>>>>>>>>>> `forward()` or any information
>>>>> about
>>>>>>> the
>>>>>>>>>> "current"
>>>>>>>>>>>>>>>> record,
>>>>>>>>>>>>>>>>>>> which is
>>>>>>>>>>>>>>>>>>>>>>> only a
>>>>>>>>>>>>>>>>>>>>>>>>> well-defined in the context of
>>> the
>>>>>>>>>> Processor. Processors
>>>>>>>>>>>>>>>>>>> process
>>>>>>>>>>>>>>>>>>>>> one
>>>>>>>>>>>>>>>>>>>>>>> record
>>>>>>>>>>>>>>>>>>>>>>>>> at a time, but state stores may
>>> be
>>>>>>> used to
>>>>>>>>>> store and
>>>>>>>>>>>>>>>> fetch
>>>>>>>>>>>>>>>>> many
>>>>>>>>>>>>>>>>>>>>>>> records, so
>>>>>>>>>>>>>>>>>>>>>>>>> there is no "current record".
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I totally agree that
>>>>> record-specific or
>>>>>>>>>> processor-specific
>>>>>>>>>>>>>>>>>>> context
>>>>>>>>>>>>>>>>>>>>> in a
>>>>>>>>>>>>>>>>>>>>>>>> state store is often not
>>>>> well-defined
>>>>>>> and it
>>>>>>>>>> would be good
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>> separate
>>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>> out, but sometimes it (at least
>>>>>>>>>> record-specific context) is
>>>>>>>>>>>>>>>>>>> actually
>>>>>>>>>>>>>>>>>>>>>>>> useful, for example, passing the
>>>>> record's
>>>>>>>>>> timestamp through
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> underlying storage (or changelog
>>>>> topic):
>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121
>>>>>>>>>>>>>>>>>>>>>>>> You could have the writer client
>>> of
>>>>> the
>>>>>>> state
>>>>>>>>>> store pass
>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>> through,
>>>>>>>>>>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>>>>>>>> it would be nice to be able to
>>> write
>>>>>>> state
>>>>>>>>>> stores where the
>>>>>>>>>>>>>>>>>>> client
>>>>>>>>>>>>>>>>>>>>> did
>>>>>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>>>>> have this responsibility.  I'm not
>>>>> sure
>>>>>>> if the
>>>>>>>>>> solution is
>>>>>>>>>>>>>>>>> to add
>>>>>>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>>>>>> things back to StateStoreContext,
>>> or
>>>>>>> make yet
>>>>>>>>>> another
>>>>>>>>>>>>>>>> context
>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>> represents record-specific context
>>>>> while
>>>>>>>>>> inside a state
>>>>>>>>>>>>>>>>> store.
>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Sep 9, 2020 at 5:43 PM
>>> John
>>>>>>> Roesler <
>>>>>>>>>>>>>>>>> j...@vvcephei.org>
>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> I've been slowly pushing KIP-478
>>>>>>> forward
>>>>>>>>>> over the last
>>>>>>>>>>>>>>>>> year,
>>>>>>>>>>>>>>>>>>>>>>>>> and I'm happy to say that we're
>>>>> making
>>>>>>> good
>>>>>>>>>> progress now.
>>>>>>>>>>>>>>>>>>>>>>>>> However, several issues with the
>>>>>>> original
>>>>>>>>>> design have
>>>>>>>>>>>>>>>> come
>>>>>>>>>>>>>>>>>>>>>>>>> to light.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> The major changes:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> We discovered that the original
>>>>> plan
>>>>>>> of just
>>>>>>>>>> adding
>>>>>>>>>>>>>>>> generic
>>>>>>>>>>>>>>>>>>>>>>>>> parameters to ProcessorContext
>>>>> was too
>>>>>>>>>> disruptive, so we
>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>>>>>> now adding a new
>>>>> api.ProcessorContext.
>>>>>>>>>>>>>>>>>>>>>>>>> That choice forces us to add a
>>> new
>>>>>>>>>> StateStore.init method
>>>>>>>>>>>>>>>>>>>>>>>>> for the new context, but
>>>>>>> ProcessorContext
>>>>>>>>>> really isn't
>>>>>>>>>>>>>>>>> ideal
>>>>>>>>>>>>>>>>>>>>>>>>> for state stores to begin with,
>>>>> so I'm
>>>>>>>>>> proposing a new
>>>>>>>>>>>>>>>>>>>>>>>>> StateStoreContext for this
>>>>> purpose. In
>>>>>>> a
>>>>>>>>>> nutshell, there
>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>>>>>> quite a few methods in
>>>>>>> ProcessorContext that
>>>>>>>>>> actually
>>>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>>>>>>>> never be called from inside a
>>>>>>> StateStore.
>>>>>>>>>>>>>>>>>>>>>>>>> Also, since there is a new
>>>>>>> ProcessorContext
>>>>>>>>>> interface, we
>>>>>>>>>>>>>>>>>>>>>>>>> need a new MockProcessorContext
>>>>>>>>>> implementation in the
>>>>>>>>>>>>>>>> test-
>>>>>>>>>>>>>>>>>>>>>>>>> utils module.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> The changeset for the KIP
>>>>> document is
>>>>>>> here:
>>> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10
>>>>>>>>>>>>>>>>>>>>>>>>> And the KIP itself is here:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API
>>>>>>>>>>>>>>>>>>>>>>>>> If you have any concerns, please
>>>>> let
>>>>>>> me know!
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
> 

Reply via email to