Thanks John! I feel a bit ashamed of just thinking aloud here without trying
out prototypes myself :P

I think the FixedKeyProcessor/Record looks very good -- like you said,
since we are making a new set of APIs, why not reconsider them more
boldly -- but at the same time I'd also like to make sure we agree on how
much "safety" we can achieve at runtime: even with the proposed APIs, we
cannot prevent users from doing something like:

---------------
void process(FixedKeyRecord<K, V> inputRecord) {
    // This is not preventable with runtime key validation either, since that
    // only checks that the key object itself has not been replaced.
    inputRecord.key().modifyField(...);
    context.forward(inputRecord);
}

---------------
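To make this concrete, here is a minimal, self-contained Java sketch (plain Java, not the Kafka Streams API; `MutableKey` and `ReferenceCheckDemo` are hypothetical names). It assumes the runtime validation is a reference check on the key, and shows that such a check still passes after the key object was mutated in place:

```java
import java.util.Objects;

// Hypothetical mutable key class standing in for a user-defined key type.
final class MutableKey {
    String field;

    MutableKey(String field) {
        this.field = field;
    }
}

final class ReferenceCheckDemo {
    // The kind of check a runtime validation could do: it only detects that
    // the key object was replaced, not that its contents were changed.
    static boolean keyUnchanged(Object inputKey, Object forwardedKey) {
        return inputKey == forwardedKey;
    }

    // Returns true if the reference check passes even though the key's
    // contents were modified in place.
    static boolean checkMissesMutation() {
        MutableKey key = new MutableKey("a");
        String before = key.field;
        key.field = "b"; // in-place modification: same object, different content
        return keyUnchanged(key, key) && !Objects.equals(before, key.field);
    }
}
```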

I.e. with either type safety or runtime validation, we cannot be 100% sure
that users will not do anything wrong. This makes me wonder how much
we'd like to pay to "remind" (rather than "enforce", since we cannot
really do that) users of the semantics of "processValue". Personally I felt that
adding a new set of APIs for that purpose only is a bit overkill, and
hence was leaning towards just the runtime validation. But I admit this is
purely subjective, so I'm willing to yield to the group if others feel it's
worth doing.


Guozhang



On Mon, Mar 7, 2022 at 10:32 AM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Thanks, John!
> This looks very promising.
>
> I will familiarize myself with this approach and update the KIP accordingly.
> From what I can see so far, this should cover most of the open issues in
> this proposal.
>
> PS.
>
> > Just as a reminder, the current approach with transformers
> > is NOT enforced at compile time. Transformers have access to
> > a "forwarding disabled" processor context, which still has
> > the forward methods that throw a runtime exception when
> > invoked.
>
> Agreed. I was referring to the value transformers, where `readOnlyKey` is
> passed but not forwarded internally. Though regarding the "forwarding disabled"
> approach, you're totally right that it is a runtime validation.
> Regardless, the approach proposed here will be a much better one.
>
>
> On Sun, 6 Mar 2022 at 18:59, John Roesler <vvcep...@apache.org> wrote:
>
> > Hello all,
> >
> > It seems like we're making good progress on this discussion.
> > If I'm keeping track correctly, if we can resolve this
> > question about how to handle processValues(), then we should
> > be able to finalize the vote, right?
> >
> > I share Matthias's preference for having a type-safe API.
> >
> > Just as a reminder, the current approach with transformers
> > is NOT enforced at compile time. Transformers have access to
> > a "forwarding disabled" processor context, which still has
> > the forward methods that throw a runtime exception when
> > invoked.
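For readers less familiar with that mechanism, here is a minimal plain-Java sketch of the idea, assuming a simplified context interface (`SimpleContext` and `ForwardingDisabledContext` are hypothetical stand-ins, not the real `ForwardingDisabledProcessorContext`). The `forward` method exists on the type, so misuse compiles and only fails when invoked:

```java
// Simplified stand-in for a processor context (hypothetical; not the real
// Kafka Streams ProcessorContext interface).
interface SimpleContext<K, V> {
    void forward(K key, V value);
}

// Mirrors the idea of a "forwarding disabled" context: forward() is still part
// of the type, so calling it compiles, but it fails at runtime.
final class ForwardingDisabledContext<K, V> implements SimpleContext<K, V> {
    @Override
    public void forward(K key, V value) {
        throw new UnsupportedOperationException("forward() is not allowed in this context");
    }
}
```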
> >
> > However, the spirit of the "new processor api" line of work
> > is to clean up a lot of the cruft around the original
> > processor API, so this is a good opportunity to introduce a
> > type-safe version if we can.
> >
> > Based on my experience adding the new processor API, I felt
> > like it should be possible to do what he suggests, but it
> > would be more involved than what he said. The biggest thing
> > I learned from that effort, though, is that you really have
> > to just try it to see what all the complications are.
> >
> > With that in mind, I went ahead and implemented the
> > suggestion: https://github.com/apache/kafka/pull/11854
> >
> > This is a functional prototype. It only adds processValues,
> > which takes a supplier of a new type, FixedKeyProcessor.
> > That processor only processes FixedKeyRecords, which have a
> > key that cannot be changed. FixedKeyProcessors have a
> > special context, a FixedKeyProcessorContext, which can only
> > forward FixedKeyRecords.
> >
> > FixedKeyRecords have "fixed keys" because their key can only
> > be set in the constructor, and that constructor is
> > package-private.
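A rough, self-contained model of that shape, assuming simplified types. This is not the code from the PR: the context is passed to `process()` directly for brevity, and the package-private constructor only protects the key when the class lives in the library's own package.

```java
// Simplified model (not the PR's code). In a real library this class sits in
// the library's package, so user code cannot reach the constructor.
final class FixedKeyRecord<K, V> {
    private final K key;
    private final V value;

    // Package-private: only the framework can create a record with a given key.
    FixedKeyRecord(K key, V value) {
        this.key = key;
        this.value = value;
    }

    K key() { return key; }

    V value() { return value; }

    // The only "wither": it may change the value (and its type) but always
    // carries the original key over.
    <NewV> FixedKeyRecord<K, NewV> withValue(NewV newValue) {
        return new FixedKeyRecord<>(key, newValue);
    }
}

// A context that can only forward FixedKeyRecords, so there is no
// compile-time path to forwarding a record with a different key.
interface FixedKeyProcessorContext<K, VOut> {
    void forward(FixedKeyRecord<K, VOut> record);
}

// Simplified: the context is passed per call instead of via init().
interface FixedKeyProcessor<K, VIn, VOut> {
    void process(FixedKeyRecord<K, VIn> record, FixedKeyProcessorContext<K, VOut> context);
}
```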
> >
> > As you can see, this new record/processor/context ecosystem
> > is an independent peer of the general one. This is necessary
> > to ensure the desired compiler check. For example, if
> > FixedKeyRecord were merely an interface implemented by
> > Record, then users could create a new Record with a new key
> > and forward it as a FixedKeyRecord, violating the
> > constraint.
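The counter-example can be sketched in plain Java with hypothetical names (`KeyedValue`, `OpenRecord`, `FixedKeyOnlyContext`): once the "fixed key" type is only an interface that a general record also implements, a record built with a new key type-checks as a fixed-key record.

```java
// Hypothetical counter-example: the "fixed key" type is just an interface...
interface KeyedValue<K, V> {
    K key();

    V value();
}

// ...which a general-purpose record also implements, including records that
// were rebuilt with a brand-new key.
final class OpenRecord<K, V> implements KeyedValue<K, V> {
    private final K key;
    private final V value;

    OpenRecord(K key, V value) {
        this.key = key;
        this.value = value;
    }

    public K key() { return key; }

    public V value() { return value; }

    OpenRecord<K, V> withKey(K newKey) {
        return new OpenRecord<>(newKey, value);
    }
}

// A context that claims to accept only fixed-key records, but cannot tell
// them apart from rekeyed general records.
interface FixedKeyOnlyContext<K, V> {
    void forward(KeyedValue<K, V> record);
}
```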
> >
> > As I said, with this proposal, the devil is in the details,
> > so if anyone thinks the API can be simplified, I suggest you
> > check out the branch and try out your proposal. I'd be very
> > happy to have a simpler solution, but I'm also pretty sure
> > this complexity is necessary.
> >
> > Taking a step back, I do think this approach results in a
> > better API, even though the change is a little complicated.
> >
> > Thanks,
> > -John
> >
> > On Sun, 2022-03-06 at 10:51 +0000, Jorge Esteban Quilcate Otoya wrote:
> > > Matthias, thanks for your feedback.
> > >
> > > I can see the following alternatives to deal with `processValues()`:
> > >
> > > 1. Runtime key validation (current proposal)
> > > 2. Using the Void type. Guozhang already pointed out some important
> > > considerations about allocating `Record` twice.
> > > 3. Adding a new ValueRecord, proposed by Matthias. This one would carry
> > > some of the problems of the second alternative, as ValueRecord would have
> > > to be created from a Record. Also, whether via a public constructor or
> > > via creation from a Record, the key _can_ be changed without being
> > > captured by the Topology.
> > > 4. Reducing the KIP scope to `process` only, and removing/postponing
> > > `processValues` for a later DSL redesign.
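For comparison, alternative 1 could look roughly like the following plain-Java sketch. The names are hypothetical, and it assumes the check is a reference comparison against the key the framework handed to the processor, which is the cheap check discussed in this thread:

```java
import java.util.function.BiConsumer;

// Sketch of alternative 1 (hypothetical names): remember the input key and
// reject forwarding if the record carries a different key object.
final class KeyCheckingForwarder<K, V> {
    private final BiConsumer<K, V> downstream;
    private K inputKey;

    KeyCheckingForwarder(BiConsumer<K, V> downstream) {
        this.downstream = downstream;
    }

    // Called by the framework before handing the record to the user's processor.
    void setInputKey(K key) {
        this.inputKey = key;
    }

    void forward(K key, V value) {
        // Reference check: cheap, but blind to in-place mutation of the key.
        if (key != inputKey) {
            throw new IllegalArgumentException("processValues() must not change the key");
        }
        downstream.accept(key, value);
    }
}
```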
> > >
> > > A couple of additional comments:
> > >
> > > About the Record API:
> > >
> > > IIUC, the issue with allocating new objects comes from the current
> > > design of the Record API.
> > > If a user calls record.withKey(...).withValue(...), that already leads
> > > to a couple of instantiations.
> > > My impression is that if the cost/value of immutability has already been
> > > weighed, then maybe the considerations for alternative 2 can be
> > > disregarded?
> > > Either way, if the cost of recreating objects is something we want to
> > > minimize, then maybe adding a Builder to the record would help to reduce
> > > the allocations.
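A small sketch of the allocation point, assuming a minimal immutable record (`ImmutableRecord`, its `toBuilder()` and its `Builder` are hypothetical, not Kafka's `Record` API). Each chained wither allocates a new record, while a builder accumulates changes and allocates a single record at the end:

```java
// Minimal immutable record (hypothetical, not Kafka's Record API).
final class ImmutableRecord<K, V> {
    static int instances = 0; // counts record allocations, for illustration only

    final K key;
    final V value;
    final long timestamp;

    ImmutableRecord(K key, V value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        instances++;
    }

    // Each wither allocates a new record.
    ImmutableRecord<K, V> withKey(K newKey) {
        return new ImmutableRecord<>(newKey, value, timestamp);
    }

    ImmutableRecord<K, V> withValue(V newValue) {
        return new ImmutableRecord<>(key, newValue, timestamp);
    }

    Builder<K, V> toBuilder() {
        return new Builder<>(key, value, timestamp);
    }

    // Mutable builder: accumulates changes, allocates one record on build().
    static final class Builder<K, V> {
        private K key;
        private V value;
        private final long timestamp;

        Builder(K key, V value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }

        Builder<K, V> key(K newKey) { this.key = newKey; return this; }

        Builder<K, V> value(V newValue) { this.value = newValue; return this; }

        ImmutableRecord<K, V> build() {
            return new ImmutableRecord<>(key, value, timestamp);
        }
    }
}
```

Note that the builder is itself an object, so for a two-field change the total object count is the same; any benefit shows up when several fields change at once or the builder is reused.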
> > >
> > > About the key validation:
> > >
> > > So far, the only way I can see to _really_ validate at compile time that
> > > a key doesn't change is by not exposing it at all, as we do today with
> > > Transform.
> > > Otherwise, we have to deal with it at runtime, as we have been doing with
> > > Transform without the ability to forward.
> > > The Processor API already, by definition, means a lower-level abstraction;
> > > therefore users should be aware of the potential runtime exceptions if
> > > the key changes.
> > > This is why I'm leaning towards alternative 1.
> > >
> > > Looking forward to your feedback.
> > > As a reminder, the vote thread is still open. Feel free to add your vote
> > > or amend it if needed.
> > >
> > > Cheers,
> > >
> > >
> > > On Fri, 4 Mar 2022 at 06:51, Matthias J. Sax <mj...@apache.org> wrote:
> > >
> > > > John, thanks for verifying source compatibility. My impression was that
> > > > it should be source compatible, I was just not 100% sure.
> > > >
> > > > The question about `processValues()` is really a hard one. Guozhang's
> > > > point is a very good one. Maybe we need to be pragmatic and accept the
> > > > runtime check (even if I deeply hate this solution compared to a compile
> > > > time check).
> > > >
> > > > Other possibilities to address this issue might just become too ugly? It
> > > > seems it would require adding a new `ValueProcessorContext` that offers
> > > > a `#forward(ValueRecord)` method (with `ValueRecord` being a `Record`
> > > > with an immutable key). Not sure if we would be willing to go down this
> > > > route? Personally, I would be ok with it, as I strongly prefer compile
> > > > time checks and I am happy to extend the API surface area to achieve it
> > > > -- however, I won't be surprised if others don't like this idea...
> > > >
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 2/27/22 6:20 AM, Jorge Esteban Quilcate Otoya wrote:
> > > > > Thanks, Guozhang.
> > > > >
> > > > > > Compared with reference checks and runtime exceptions for those who
> > > > > > mistakenly change the key, I think that enforcing everyone to `setValue`
> > > > > > may incur more costs..
> > > > >
> > > > > This is a fair point. I agree that this may incur more costs than key
> > > > > checking.
> > > > >
> > > > > Will hold for more feedback, but if we agree I will update the KIP during
> > > > > the week.
> > > > >
> > > > > Cheers,
> > > > > Jorge.
> > > > >
> > > > >
> > > > > On Sun, 27 Feb 2022 at 00:50, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > > > Hello folks,
> > > > > >
> > > > > > Regarding the outstanding question, I'm actually leaning a bit towards the
> > > > > > second option, since `withKey()` itself always creates a new Record
> > > > > > object. This has a few implications:
> > > > > >
> > > > > > * We would have to discard the previous Record object, to be GC'ed, in
> > > > > > favor of the new object --- note that in practice, processing a value does
> > > > > > not mean you'd have to replace the whole value with `withValue`; maybe you
> > > > > > just need to manipulate some fields of the value object if it is JSON /
> > > > > > etc.
> > > > > > * It may become an obstacle for further runtime optimizations, e.g.
> > > > > > skipping serdes and interpreting processing as direct byte manipulations.
> > > > > >
> > > > > > Compared with reference checks and runtime exceptions for those who
> > > > > > mistakenly change the key, I think that enforcing everyone to `setValue`
> > > > > > may incur more costs..
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Fri, Feb 25, 2022 at 12:54 PM Jorge Esteban Quilcate Otoya <
> > > > > > quilcate.jo...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > Appreciate very much all the great feedback received so far.
> > > > > > >
> > > > > > > > After applying that interface change, I don't see any syntax
> > > > > > > errors in our tests (which use those methods), and the
> > > > > > > StreamBuilderTest still passes for me.
> > > > > > >
> > > > > > > This is awesome John, thank you for your efforts here.
> > > > > > >
> > > > > > > > Jorge, do you mind clarifying these points in the Compatibility
> > > > > > > > section of your KIP?
> > > > > > >
> > > > > > > +1. I have clarified the impact of changing the return type in the KIP.
> > > > > > >
> > > > > > > > I think the other outstanding question for you is whether
> > > > > > > > the output key type for processValues should be K or Void.
> > > > > > > >
> > > > > > > > One thing I realized belatedly was that if we do set it to
> > > > > > > > Void, then users will actually have to override the key when
> > > > > > > > forwarding, like `record.withKey(null)`, whereas if we keep
> > > > > > > > it as K, all users have to do is not touch the key at all.
> > > > > > >
> > > > > > > This is a tricky one.
> > > > > > > On one hand, with Void as the output key type, we force users to
> > > > > > > cast to Void and change the key to null,
> > > > > > > though this can be documented in the API, so users are aware of
> > > > > > > the peculiarity of forwarding within `processValues`.
> > > > > > > On the other hand, keeping the key type as the output type doesn't
> > > > > > > _require_ any change of keys,
> > > > > > > but this could lead to key-checking runtime exceptions.
> > > > > > >
> > > > > > > I'm slightly inclined towards the first option: changing the type to
> > > > > > > `Void`.
> > > > > > > This will impose a bit of pain on users in exchange for some
> > > > > > > type-safety and avoiding runtime exceptions.
> > > > > > > We can justify this requirement as a way to prove that the key
> > > > > > > hasn't changed.
> > > > > > >
> > > > > > > Btw, thanks for this idea Matthias!
> > > > > > >
> > > > > > >
> > > > > > > On Fri, 25 Feb 2022 at 17:10, John Roesler <vvcep...@apache.org> wrote:
> > > > > > >
> > > > > > > > Oh, one more thing Jorge,
> > > > > > > >
> > > > > > > > I think the other outstanding question for you is whether
> > > > > > > > the output key type for processValues should be K or Void. I
> > > > > > > > get the impression that all of us don't feel too strongly
> > > > > > > > about it, so I think the ball is in your court to consider
> > > > > > > > everyone's points and make a call (with justification).
> > > > > > > >
> > > > > > > > One thing I realized belatedly was that if we do set it to
> > > > > > > > Void, then users will actually have to override the key when
> > > > > > > > forwarding, like `record.withKey(null)`, whereas if we keep
> > > > > > > > it as K, all users have to do is not touch the key at all.
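The difference can be sketched with a minimal record type (`Rec` and `OutputKeyTypeDemo` are hypothetical, plain Java). With K as the output key type the value-updated record can be forwarded as-is, while with Void the processor has to drop the key explicitly, since `null` is the only value of type `Void`:

```java
// Minimal immutable record whose key and value types can change (hypothetical).
final class Rec<K, V> {
    final K key;
    final V value;

    Rec(K key, V value) {
        this.key = key;
        this.value = value;
    }

    <NewK> Rec<NewK, V> withKey(NewK newKey) {
        return new Rec<>(newKey, value);
    }

    <NewV> Rec<K, NewV> withValue(NewV newValue) {
        return new Rec<>(key, newValue);
    }
}

final class OutputKeyTypeDemo {
    // Output key type K: just update the value and leave the key alone.
    static <K> Rec<K, Integer> keepKey(Rec<K, String> in) {
        return in.withValue(in.value.length());
    }

    // Output key type Void: the processor must also replace the key with null
    // (an extra allocation) before the record has the right type to forward.
    static <K> Rec<Void, Integer> voidKey(Rec<K, String> in) {
        return in.withValue(in.value.length()).withKey(null);
    }
}
```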
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > -John
> > > > > > > >
> > > > > > > > On Fri, 2022-02-25 at 11:07 -0600, John Roesler wrote:
> > > > > > > > > Hello all,
> > > > > > > > >
> > > > > > > > > I'll chime in again in the interest of trying to do a better
> > > > > > > > > job of keeping KIPs moving forward...
> > > > > > > > >
> > > > > > > > > Matthias raised some very good questions about whether the
> > > > > > > > > change is really source compatible. I just checked out the
> > > > > > > > > code and made the interface change that Jorge specified in
> > > > > > > > > the KIP:
> > > > > > > > >
> > > > > > > > > > Modified methods:
> > > > > > > > > >
> > > > > > > > > > KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, String... stateStoreNames)
> > > > > > > > > > KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, Named named, String... stateStoreNames)
> > > > > > > > >
> > > > > > > > > After applying that interface change, I don't see any syntax
> > > > > > > > > errors in our tests (which use those methods), and the
> > > > > > > > > StreamBuilderTest still passes for me.
> > > > > > > > >
> > > > > > > > > The reason is that the existing API already takes a
> > > > > > > > > ProcessorSupplier<K, V, Void, Void> and is currently a
> > > > > > > > > `void` return.
> > > > > > > > >
> > > > > > > > > After this interface change, all existing usages will just
> > > > > > > > > bind Void to KOut and Void to VOut. In other words, KOut,
> > > > > > > > > which is short for `KOut extends Object`, accepts Void as a
> > > > > > > > > binding, so all existing processor suppliers are still valid
> > > > > > > > > arguments.
> > > > > > > > >
> > > > > > > > > Because the current methods are void returns, no existing
> > > > > > > > > code could be assigning the result to any variable, so
> > > > > > > > > moving from a void return to a typed return is always
> > > > > > > > > compatible.
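The compatibility argument can be checked with a stripped-down model (hypothetical types `ProcessorSupplierLike`, `OldStream` and `NewStream`, not the real `KStream` or `ProcessorSupplier` interfaces). An existing supplier with `Void` output types is accepted by both the old and the new signature, and callers that ignored the old `void` result are unaffected:

```java
// Stripped-down model of the signature change (hypothetical types, not the
// real KStream or ProcessorSupplier interfaces).
interface ProcessorSupplierLike<KIn, VIn, KOut, VOut> {
    Object get();
}

final class OldStream<K, V> {
    // Before: output types fixed to Void and nothing is returned.
    void process(ProcessorSupplierLike<K, V, Void, Void> supplier) {
    }
}

final class NewStream<K, V> {
    // After: output types are type parameters and a new stream is returned.
    <KOut, VOut> NewStream<KOut, VOut> process(ProcessorSupplierLike<K, V, KOut, VOut> supplier) {
        return new NewStream<>();
    }
}
```

This only covers source compatibility. The return type is part of the JVM method descriptor, so code compiled against the old `void` signature still needs to be recompiled, which matches the binary-compatibility caveat raised elsewhere in this thread.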
> > > > > > > > >
> > > > > > > > > Jorge, do you mind clarifying these points in the
> > > > > > > > > Compatibility section of your KIP?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > -John
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, 2022-02-23 at 15:07 -0800, Matthias J. Sax wrote:
> > > > > > > > > > For this KIP, I also see the value. I was just trying to take a
> > > > > > > > > > step back and ask if it's a good short-term solution. If we
> > > > > > > > > > believe it is, I am fine with it.
> > > > > > > > > >
> > > > > > > > > > (I am more worried about the headers KIP...)
> > > > > > > > > >
> > > > > > > > > > Btw: I am still wondering if we can change the existing `process()` as
> > > > > > > > > > proposed in the KIP? Is the proposed change source compatible? (It's
> > > > > > > > > > for sure not binary compatible, but this seems fine -- I don't think
> > > > > > > > > > we guarantee binary compatibility.)
> > > > > > > > > >
> > > > > > > > > > Btw: it would be good to clarify what changes for process() -- should it
> > > > > > > > > > be a return type change from `void` to `KStream<KOut, VOut>` as well as
> > > > > > > > > > a change of the `ProcessorSupplier` generic types (output types change
> > > > > > > > > > from `Void` to `KOut` and `VOut`)?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > -Matthias
> > > > > > > > > >
> > > > > > > > > > On 2/23/22 11:32 AM, Guozhang Wang wrote:
> > > > > > > > > > > Hi folks,
> > > > > > > > > > >
> > > > > > > > > > > I agree with John that this KIP by itself could be a good improvement,
> > > > > > > > > > > and I feel it aligns well with the eventual DSL 2.0 proposal, so we do
> > > > > > > > > > > not need to hold it until later.
> > > > > > > > > > >
> > > > > > > > > > > Regarding the last point (i.e. whether we should do enforcement with a
> > > > > > > > > > > new interface), here's my 2c: in the past we introduced public
> > > > > > > > > > > `ValueTransformer/etc` for two purposes: 1) to enforce that the key is
> > > > > > > > > > > not modifiable, 2) to indicate inside the library's topology builder
> > > > > > > > > > > itself that since the key is not modified, the direct downstream does
> > > > > > > > > > > not need to inject a repartition stage. I think we are more or less on
> > > > > > > > > > > the same page that for purpose 1), doing a runtime check could be
> > > > > > > > > > > sufficient; as for purpose 2), for this KIP itself I think it is
> > > > > > > > > > > similar to what we have (i.e. just based on the function name
> > > > > > > > > > > "processValue" itself) and hence is not sacrificed either. I do not
> > > > > > > > > > > know if `KStream#processValue(ProcessorSupplier<K, V, Void, VOut>
> > > > > > > > > > > processorSupplier)` will work, or work better; maybe Jorge could do
> > > > > > > > > > > some digging and get back to us.
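Purpose 2) can be illustrated with a toy topology builder (all names hypothetical; this is not Kafka Streams' internal implementation). Operations whose API may change the key mark the stream, and a later `groupByKey()` inserts a repartition step only when such an operation occurred upstream:

```java
import java.util.ArrayList;
import java.util.List;

// Toy topology builder (hypothetical). Each stream remembers whether any
// upstream operation may have changed the key.
final class ToyStream {
    private final List<String> steps;
    private final boolean keyMaybeChanged;

    private ToyStream(List<String> steps, boolean keyMaybeChanged) {
        this.steps = steps;
        this.keyMaybeChanged = keyMaybeChanged;
    }

    static ToyStream source() {
        return new ToyStream(List.of("source"), false);
    }

    private ToyStream then(String step, boolean mayChangeKey) {
        List<String> next = new ArrayList<>(steps);
        next.add(step);
        return new ToyStream(next, keyMaybeChanged || mayChangeKey);
    }

    ToyStream map() { return then("map", true); }                       // may change the key
    ToyStream mapValues() { return then("mapValues", false); }          // key fixed by the API
    ToyStream process() { return then("process", true); }               // arbitrary output key
    ToyStream processValues() { return then("processValues", false); } // key fixed by the API

    // Inserts a repartition step only if the key may have changed upstream.
    List<String> groupByKey() {
        List<String> plan = new ArrayList<>(steps);
        if (keyMaybeChanged) {
            plan.add("repartition");
        }
        plan.add("groupByKey");
        return plan;
    }
}
```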
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Feb 18, 2022 at 8:24 AM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hello all,
> > > > > > > > > > > >
> > > > > > > > > > > > While I sympathize with Matthias’s desire to wipe the slate clean and
> > > > > > > > > > > > redesign the DSL with full knowledge of everything we’ve learned in
> > > > > > > > > > > > the past few years, that would also be a pretty intense project on
> > > > > > > > > > > > its own. It seems better to leave that project for someone who is
> > > > > > > > > > > > motivated to take it on.
> > > > > > > > > > > >
> > > > > > > > > > > > Reading between the lines, it seems like Jorge’s motivation is more
> > > > > > > > > > > > along the lines of removing a few specific pain points. I appreciate
> > > > > > > > > > > > Matthias extending the offer, but if Jorge doesn’t want to redesign
> > > > > > > > > > > > the DSL right now, we’re better off just accepting the work he’s
> > > > > > > > > > > > willing to do.
> > > > > > > > > > > >
> > > > > > > > > > > > Specifically, this KIP is quite a nice improvement. Looking at the
> > > > > > > > > > > > KStream interface, roughly half of it is devoted to various flavors
> > > > > > > > > > > > of “transform”, which makes it really hard for users to figure out
> > > > > > > > > > > > which one they are supposed to use for what purpose. This KIP lets us
> > > > > > > > > > > > drop all that complexity in favor of just two methods, thanks to the
> > > > > > > > > > > > fact that we now have the ability for processors to specify their
> > > > > > > > > > > > forwarding type.
> > > > > > > > > > > >
> > > > > > > > > > > > By the way, I really like Matthias’s suggestion to set the KOut
> > > > > > > > > > > > generic bound to Void for processValues. Then, instead of doing an
> > > > > > > > > > > > equality check on the key during forward, you’d just set the key back
> > > > > > > > > > > > to the one saved before processing (with setRecordKey). This is both
> > > > > > > > > > > > more efficient (because we don’t have the equality check) and more
> > > > > > > > > > > > foolproof for users (because it’s enforced by the compiler instead of
> > > > > > > > > > > > the runtime).
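A minimal plain-Java sketch of that idea, with hypothetical names (`KeyRestoringContext`, `rememberInputKey`; the email mentions `setRecordKey`, which this sketch does not model). The processor can only forward a `Void` key, and the context re-attaches the key it saved before processing, so no equality check is needed:

```java
import java.util.function.BiConsumer;

// Hypothetical sketch: the framework saves the input key, the user's processor
// can only forward a Void key (i.e. null), and the saved key is re-attached on
// the way out. No equality check is performed.
final class KeyRestoringContext<K, V> {
    private final BiConsumer<K, V> downstream;
    private K savedKey;

    KeyRestoringContext(BiConsumer<K, V> downstream) {
        this.downstream = downstream;
    }

    // Called by the framework before invoking the user's processor.
    void rememberInputKey(K key) {
        this.savedKey = key;
    }

    // The Void parameter can only ever be null; it is ignored and the saved
    // key is forwarded instead.
    void forward(Void key, V value) {
        downstream.accept(savedKey, value);
    }
}
```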
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks, all!
> > > > > > > > > > > > -John
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Feb 18, 2022, at 00:43, Jorge Esteban Quilcate Otoya wrote:
> > > > > > > > > > > > > On Fri, 18 Feb 2022 at 02:16, Matthias J. Sax <mj...@apache.org> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > > It probably deserves its own thread to start discussing ideas.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Yes. My question was: if we think it's time to do a DSL 2.0, should
> > > > > > > > > > > > > > we drop this KIP and just fix it via DSL 2.0 instead?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > Good question. Would love to hear what others think about this.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I've stated my position about this here:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > For this KIP specifically, I think about it as a continuation from
> > > > > > > > > > > > > > KIP-478. Therefore, it could make sense to have it as part of the
> > > > > > > > > > > > > > current version of the DSL.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I'd even add that if this KIP is adopted, I would not be that
> > > > > > > > > > > > > disappointed if KIP-634 is dropped in favor of a DSL v2.0, as the
> > > > > > > > > > > > > access to headers provided by KIP-478 via the Record API is much
> > > > > > > > > > > > > better than the previous `.context().headers()`.
> > > > > > > > > > > > >
> > > > > > > > > > > > > But happy to reconsider if there is an agreement to focus efforts
> > > > > > > > > > > > > towards a DSL 2.0.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > > > You're right. I'm not proposing the method signature.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > What signature do you propose? I don't see an update on the KIP.
> > > > > > > > > > > > >
> > > > > > > > > > > > > My bad. I have clarified this in the KIP's public interfaces now:
> > > > > > > > > > > > >
> > > > > > > > > > > > > ```
> > > > > > > > > > > > >
> > > > > > > > > > > > > New methods:
> > > > > > > > > > > > >
> > > > > > > > > > > > >      - KStream<K,VOut> KStream#processValues(ProcessorSupplier<K, V, K, VOut> processorSupplier, String... stateStoreNames)
> > > > > > > > > > > > >      - KStream<K,VOut> KStream#processValues(ProcessorSupplier<K, V, K, VOut> processorSupplier, Named named, String... stateStoreNames)
> > > > > > > > > > > > >
> > > > > > > > > > > > > Modified methods:
> > > > > > > > > > > > >
> > > > > > > > > > > > >      - KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, String... stateStoreNames)
> > > > > > > > > > > > >      - KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, Named named, String... stateStoreNames)
> > > > > > > > > > > > >
> > > > > > > > > > > > > ```
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Not sure I understand what this would look like. Do you mean checking it
> > > > > > > > > > > > > > > on the Record itself or somewhere else?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > @Guozhang: I am not worried about the runtime overhead. I am worried
> > > > > > > > > > > > > > about user experience. It's not clear from the method signature that
> > > > > > > > > > > > > > you are not allowed to change the key, which seems to be bad API design.
> > > > > > > > > > > > > > Even if I understand the desire to keep the API surface area small -- I
> > > > > > > > > > > > > > would rather have a compile-time enforcement than a runtime check.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > For example, we have `map()` and `mapValues()`, and `mapValues()` returns
> > > > > > > > > > > > > > a value `V` (which enforces that the key is not changed) instead of a
> > > > > > > > > > > > > > `KeyValue<KIn,VOut>` on which we would use a runtime check to verify that
> > > > > > > > > > > > > > the key is not changed.
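The `map()`/`mapValues()` contrast can be sketched with toy signatures (`Pair` and `ToyOps` are hypothetical, plain Java, not the Kafka Streams API). The value mapper never receives the key, so the compiler rules out producing a new one, while `map()` returns a whole pair:

```java
import java.util.function.Function;

// Toy key/value pair (hypothetical).
final class Pair<K, V> {
    final K key;
    final V value;

    Pair(K key, V value) {
        this.key = key;
        this.value = value;
    }
}

final class ToyOps {
    // map(): the user function returns a whole pair, so the key may change.
    static <K, V, KOut, VOut> Pair<KOut, VOut> map(Pair<K, V> in, Function<Pair<K, V>, Pair<KOut, VOut>> f) {
        return f.apply(in);
    }

    // mapValues(): the user function sees and returns only a value; the key is
    // re-attached here, so it is unchanged by construction.
    static <K, V, VOut> Pair<K, VOut> mapValues(Pair<K, V> in, Function<V, VOut> f) {
        return new Pair<>(in.key, f.apply(in.value));
    }
}
```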
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Naively, could we enforce something similar by setting the output key
> > > > > > > > > > > > > > type to `Void`?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >      KStream#processValue(ProcessorSupplier<K, V, Void, VOut> processorSupplier)
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Not sure if this would work or not?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Or it might be worth adding a new interface, `ValueProcessorSupplier`,
> > > > > > > > > > > > > > that ensures that the key is not modified?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > This is an important discussion, even more so with a DSL v2.0.
> > > > > > > > > > > > >
> > > > > > > > > > > > > At the moment, the DSL just flags whether repartitioning is required
> > > > > > > > > > > > > based on the DSL operation. As mentioned, `mapValues()` enforces that
> > > > > > > > > > > > > only the value has changed through the DSL, though the only _guarantee_
> > > > > > > > > > > > > we have is that Kafka Streams "owns" the implementation, so we can flag
> > > > > > > > > > > > > this properly.
> > > > > > > > > > > > >
> > > > > > > > > > > > > With a hypothetical v2.0 based on the Record API, this will be harder to
> > > > > > > > > > > > > enforce with the current APIs. E.g. with `mapValues(Record<K, V> record)`,
> > > > > > > > > > > > > nothing would stop users from using `record.withKey("needs_partitioning")`.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The approach defined in this KIP is similar to what we have at the moment
> > > > > > > > > > > > > in `ValueTransformer*`, where it validates at runtime that users are not
> > > > > > > > > > > > > calling `forward` with `ForwardingDisabledProcessorContext`.
> > > > > > > > > > > > > `ValueProcessorSupplier` is not meant to be a public API, only to be used
> > > > > > > > > > > > > internally in the `processValues` implementation.
> > > > > > > > > > > > >
> > > > > > > > > > > > > At first glance, `KStream#processValue(ProcessorSupplier<K, V, Void, VOut>
> > > > > > > > > > > > > processorSupplier)` won't work, as it will require the `Processor`
> > > > > > > > > > > > > implementation to actually change the key. Will take a deeper look to
> > > > > > > > > > > > > validate whether this could solve this issue.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > -Matthias
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On 2/17/22 10:56 AM, Guozhang Wang wrote:
> > > > > > > > > > > > > > > Regarding the last question Matthias had, I wonder if it's similar to my
> > > > > > > > > > > > > > > first email's point 2) above? I think the rationale is that, since
> > > > > > > > > > > > > > > reference checks are very cheap, it is worthwhile to pay for these
> > > > > > > > > > > > > > > extra runtime checks and in return have a single consolidated
> > > > > > > > > > > > > > > ProcessorSupplier programming interface (i.e. we would eventually
> > > > > > > > > > > > > > > deprecate ValueTransformerWithKeySupplier).
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Wed, Feb 16, 2022 at 10:57 AM Jorge Esteban Quilcate Otoya <
> > > > > > > > > > > > > > > quilcate.jo...@gmail.com> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thank you Matthias, this is great feedback.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Adding my comments below.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Wed, 16 Feb 2022 at 00:42, Matthias J. Sax <mj...@apache.org> wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks for the KIP.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > In line with my reply to KIP-634, I am wondering if we are heading
> > > > > > > > > > > > > > > > > in the right direction, or if we should consider redesigning the DSL
> > > > > > > > > > > > > > > > > from scratch?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I'm very excited about the idea of a DSL v2.0. It probably deserves
> > > > > > > > > > > > > > > > its own thread to start discussing ideas.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > For this KIP specifically, I think about it as a continuation from
> > > > > > > > > > > > > > > > KIP-478. Therefore, it could make sense to have it as part of the
> > > > > > > > > > > > > > > > current version of the DSL.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Even if we don't do a DSL 2.0 right now, I have some concerns about this
> > > > > > > > > > > > > > > > > > KIP:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > (1) I am not sure if the proposed change is backward compatible? We
> > > > > > > > > > > > > > > > > > currently have:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >       void KStream#process(ProcessorSupplier, String...)
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > The newly proposed method:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >       KStream KStream#process(ProcessorSupplier)
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > seems to be an incompatible change?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > The KIP states:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Modified method KStream#process should be compatible with previous
> > > > > > > > > > > > > > > > > > > version, that at the moment is fixed to a Void return type.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Why is it backward compatible? Having both old and new #process() seems
> > > > > > > > > > > > > > > > > > not to be compatible to me? Or are you proposing to _change_ the method
> > > > > > > > > > > > > > > > > > signature (if yes, the `String...` parameter to add a state store seems
> > > > > > > > > > > > > > > > > > to be missing)? For this case, it seems that existing programs would at
> > > > > > > > > > > > > > > > > > least need to be recompiled -- it would only be a source compatible
> > > > > > > > > > > > > > > > > > change, but not a binary compatible change?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > You're right. I'm not proposing the method signature.
> > > > > > > > > > > > > > > > > Totally agree about the compatibility issue. I was only considering source
> > > > > > > > > > > > > > > > > compatibility and wasn't aware that changing from void to a specific type
> > > > > > > > > > > > > > > > > would break binary compatibility.
> > > > > > > > > > > > > > > > > I will update the KIP to reflect this:
> > > > > > > > > > > > > > > > I will update the KIP to reflect this:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Modifications to method KStream#process are source compatible with
> > > > > > > > > > > > > > > > > > previous version, though not binary compatible. Therefore, users will be
> > > > > > > > > > > > > > > > > > required to recompile their applications with the latest version.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
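Matthias's source-vs-binary compatibility point can be made concrete. Already-compiled bytecode refers to a method by its name plus a descriptor, and that descriptor includes the return type. Changing `void process(...)` into a method that returns a stream therefore removes the exact method that old call sites link against. Those callers would fail with `NoSuchMethodError` until recompiled, while source that ignores the return value still compiles.

Below is a minimal sketch using only reflection. `OldStreamApi`, `NewStreamApi` and `FakeStream` are hypothetical stand-ins, not the real `KStream` types, and `Runnable` merely plays the role of the `ProcessorSupplier` parameter.

```java
import java.lang.reflect.Method;

// Hypothetical stand-ins: OldStreamApi, NewStreamApi and FakeStream are NOT Kafka types.
// Runnable plays the role of the ProcessorSupplier parameter.
public class DescriptorSketch {

    interface FakeStream {}

    interface OldStreamApi {
        void process(Runnable supplier, String... storeNames);
    }

    interface NewStreamApi {
        FakeStream process(Runnable supplier, String... storeNames);
    }

    // JVM type descriptor for a class (see JVMS section 4.3).
    static String typeDescriptor(Class<?> c) {
        if (c.isArray()) return c.getName().replace('.', '/'); // e.g. "[Ljava/lang/String;"
        if (c == void.class) return "V";
        if (c == boolean.class) return "Z";
        if (c == byte.class) return "B";
        if (c == char.class) return "C";
        if (c == short.class) return "S";
        if (c == int.class) return "I";
        if (c == long.class) return "J";
        if (c == float.class) return "F";
        if (c == double.class) return "D";
        return "L" + c.getName().replace('.', '/') + ";";
    }

    // "(parameters)returnType": what a compiled call site records and the JVM resolves.
    static String methodDescriptor(Method m) {
        StringBuilder sb = new StringBuilder("(");
        for (Class<?> p : m.getParameterTypes()) {
            sb.append(typeDescriptor(p));
        }
        return sb.append(')').append(typeDescriptor(m.getReturnType())).toString();
    }

    static String processDescriptor(Class<?> api) {
        try {
            // A varargs parameter is a plain array at the JVM level.
            return methodDescriptor(api.getMethod("process", Runnable.class, String[].class));
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("old: " + processDescriptor(OldStreamApi.class));
        System.out.println("new: " + processDescriptor(NewStreamApi.class));
    }
}
```

Compiled in the default package, this should print `(Ljava/lang/Runnable;[Ljava/lang/String;)V` for the old shape and `(Ljava/lang/Runnable;[Ljava/lang/String;)LDescriptorSketch$FakeStream;` for the new one. The parameters are identical and only the return type differs, which is enough for the JVM to treat them as two distinct methods.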
> > > > > > > > > > > > > > > > > > I am also wondering if/how this change relates to KIP-401:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > From a high level it might not conflict, but I wanted to double check?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Wasn't aware of this KIP, thanks for sharing! I don't think there is a
> > > > > > > > > > > > > > > > > conflict between the KIPs, as far as I understand.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > For `KStream#processValues()`, my main concern is the added runtime
> > > > > > > > > > > > > > > > > > check if the key was modified or not -- it seems to provide bad user
> > > > > > > > > > > > > > > > > > experience -- enforcing that the key is not modified on an API level,
> > > > > > > > > > > > > > > > > > would seem to be much better.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Last, what is the purpose of `setRecordKey()` and `clearRecordKey()`? I
> > > > > > > > > > > > > > > > > > am not sure if I understand their purpose?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Both methods set/clear the context (current key) to be used when checking
> > > > > > > > > > > > > > > > > keys in the forward(record) implementation.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > enforcing that the key is not modified on an API level, would seem to be
> > > > > > > > > > > > > > > > > > much better.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Not sure if I understand what this would look like. Do you mean checking it
> > > > > > > > > > > > > > > > > on the Record itself or somewhere else?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > -Matthias
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On 2/15/22 11:53 AM, John Roesler wrote:
> > > > > > > > > > > > > > > > > > > My apologies, this feedback was intended for KIP-634.
> > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > On Tue, Feb 15, 2022, at 13:15, John Roesler wrote:
> > > > > > > > > > > > > > > > > > > Thanks for the update, Jorge,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > I've just looked over the KIP again. Just one more small concern:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > 5) We can't just change the type of Record#headers() to a new fully
> > > > > > > > > > > > > > > > > > > > qualified type. That would be a source-incompatible breaking change for
> > > > > > > > > > > > > > > > > > > > users.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Our options are:
> > > > > > > > > > > > > > > > > > > > * Deprecate the existing method and create a new one with the new type
> > > > > > > > > > > > > > > > > > > > * If the existing Headers is "not great but ok", then maybe we leave it
> > > > > > > > > > > > > > > > > > > >   alone.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > On Mon, 2022-02-14 at 13:58 -0600, Paul Whalen wrote:
> > > > > > > > > > > > > > > > > > > > > No specific comments, but I just wanted to mention I like the direction of
> > > > > > > > > > > > > > > > > > > > > the KIP.  My team is a big user of "transform" methods because of the
> > > > > > > > > > > > > > > > > > > > > ability to chain them, and I have always found the terminology challenging
> > > > > > > > > > > > > > > > > > > > > to explain alongside "process".  It felt like one concept with two names.
> > > > > > > > > > > > > > > > > > > > > So moving towards a single API that is powerful enough to handle both use
> > > > > > > > > > > > > > > > > > > > > cases seems absolutely correct to me.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Paul
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > On Mon, Feb 14, 2022 at 1:12 PM Jorge Esteban Quilcate Otoya <
> > > > > > > > > > > > > > > > > > > > > quilcate.jo...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > Got it. Thanks John, this makes sense.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > I've updated the KIP to include the deprecation of:
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > >        - KStream#transform
> > > > > > > > > > > > > > > > > > > > > >        - KStream#transformValues
> > > > > > > > > > > > > > > > > > > > > >        - KStream#flatTransform
> > > > > > > > > > > > > > > > > > > > > >        - KStream#flatTransformValues
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > On Fri, 11 Feb 2022 at 15:16, John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > Thanks, Jorge!
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > I think it’ll be better to keep this KIP focused on KStream methods only.
> > > > > > > > > > > > > > > > > > > > > > > I suspect that the KTable methods may be more complicated than just that
> > > > > > > > > > > > > > > > > > > > > > > proposed replacement, but it’ll also be easier to consider that question in
> > > > > > > > > > > > > > > > > > > > > > > isolation.
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > The nice thing about just deprecating the KStream methods and not the
> > > > > > > > > > > > > > > > > > > > > > > Transform* interfaces is that you can keep your proposal just scoped to
> > > > > > > > > > > > > > > > > > > > > > > KStream and not have any consequences for the rest of the DSL.
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > Thanks again,
> > > > > > > > > > > > > > > > > > > > > > John
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > On Fri, Feb 11, 2022, at 06:43, Jorge Esteban Quilcate Otoya wrote:
> > > > > > > > > > > > > > > > > > > > > > > Thanks, John.
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > 4) I agree that we shouldn't deprecate the Transformer*
> > > > > > > > > > > > > > > > > > > > > > > > > classes, but do you think we should deprecate the
> > > > > > > > > > > > > > > > > > > > > > > > > KStream#transform* methods? I'm curious if there's any
> > > > > > > > > > > > > > > > > > > > > > > > > remaining reason to have those methods, or if your KIP
> > > > > > > > > > > > > > > > > > > > > > > > > completely obviates them.
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > Good catch.
> > > > > > > > > > > > > > > > > > > > > > > > I considered that deprecating `Transformer*` and `transform*` would go hand
> > > > > > > > > > > > > > > > > > > > > > > > in hand -- maybe it happened similarly with old `Processor` and `process`?
> > > > > > > > > > > > > > > > > > > > > > > > Though deprecating only `transform*` operations could be a better signal
> > > > > > > > > > > > > > > > > > > > > > > > for users than not deprecating anything at all, and pave the way to its
> > > > > > > > > > > > > > > > > > > > > > > > deprecation.
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > Should this deprecation also consider including `KTable#transformValues`?
> > > > > > > > > > > > > > > > > > > > > > > > The approach proposed on the KIP:
> > > > > > > > > > > > > > > > > > > > > > > > `ktable.toStream().processValues().toTable()` seems fair to me, though I
> > > > > > > > > > > > > > > > > > > > > > > > will have to test it further.
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > I'm happy to update the KIP if there's some consensus around this.
> > > > > > > > > > > > > > > > > > > > > > > > Will add the deprecation notes in the next few days and wait for any
> > > > > > > > > > > > > > > > > > > > > > > > additional feedback on this topic before wrapping up the KIP.
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > On Fri, 11 Feb 2022 at 04:03, John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > Thanks for the update, Jorge!
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > I just read over the KIP again, and I'm in support. One more
> > > > > > > > > > > > > > > > > > > > > > > > > question came up for me, though:
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > 4) I agree that we shouldn't deprecate the Transformer*
> > > > > > > > > > > > > > > > > > > > > > > > > classes, but do you think we should deprecate the
> > > > > > > > > > > > > > > > > > > > > > > > > KStream#transform* methods? I'm curious if there's any
> > > > > > > > > > > > > > > > > > > > > > > > > remaining reason to have those methods, or if your KIP
> > > > > > > > > > > > > > > > > > > > > > > > > completely obviates them.
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > On Thu, 2022-02-10 at 21:32 +0000, Jorge Esteban Quilcate Otoya wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > Thank you both for your feedback!
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > I have added the following note on punctuation:
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > ```
> > > > > > > > > > > > > > > > > > > > > > > > > > NOTE: The key validation can be defined when processing the message.
> > > > > > > > > > > > > > > > > > > > > > > > > > Though, with punctuations it won't be possible to define the key for
> > > > > > > > > > > > > > > > > > > > > > > > > > validation before forwarding, therefore it won't be possible to forward
> > > > > > > > > > > > > > > > > > > > > > > > > > from punctuation.
> > > > > > > > > > > > > > > > > > > > > > > > > > This is similar behavior to how `ValueTransformer`s behave at the moment.
> > > > > > > > > > > > > > > > > > > > > > > > > ```
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > Also made it explicit that we are going to apply referential equality
> > > > > > > > > > > > > > > > > > > > > > > > > > for key validation.
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > I hope this covers all your feedback; let me know if I'm missing
> > > > > > > > > > > > > > > > > > > > > > > > > > anything.
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > > > > > > > > > > > > Jorge.
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, 9 Feb 2022 at 22:19, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > I'm +1 on John's point 3) for punctuations.
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > And I think if people are on the same page that a reference equality check
> > > > > > > > > > > > > > > > > > > > > > > > > > > per record is not a huge overhead, I think doing that enforcement is better
> > > > > > > > > > > > > > > > > > > > > > > > > > > than documentation and hand-wavy undefined behaviors.
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > Guozhang
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Feb 9, 2022 at 11:27 AM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for the KIP Jorge,
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > I'm in support of your proposal.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > 1)
> > > > > > > > > > > > > > > > > > > > > > > > > > > > I do agree with Guozhang's point (1). I think it's the cleanest
> > > > > > > > > > > > > > > > > > > > > > > > > > > > approach. I think it's cleaner and better to keep the
> > > > > > > > > > > > > > > > > > > > > > > > > > > > enforcement internal to the framework than to introduce a
> > > > > > > > > > > > > > > > > > > > > > > > > > > > public API or context wrapper for processors to use
> > > > > > > > > > > > > > > > > > > > > > > > > > > > explicitly.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > 2) I tend to agree with you on this one; I think the
> > > > > > > > > > > > > > > > > > > > > > > > > > > > equality check ought to be fast enough in practice.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > 3) I think this is implicit, but should be explicit in the
> > > > > > > > > > > > > > > > > > > > > > > > > > > > KIP: For the `processValues` API, because the framework sets
> > > > > > > > > > > > > > > > > > > > > > > > > > > > the key on the context before calling `process` and then
> > > > > > > > > > > > > > > > > > > > > > > > > > > > unsets it afterwards, there will always be no key set during
> > > > > > > > > > > > > > > > > > > > > > > > > > > > task punctuation. Therefore, while processors may still
> > > > > > > > > > > > > > > > > > > > > > > > > > > > register punctuators, they will not be able to forward
> > > > > > > > > > > > > > > > > > > > > > > > > > > > anything from them.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > This is functionally equivalent to the existing
> > > > > > > > > > > > > > > > > > > > > > > > > > > > transformers, by the way, that are also forbidden to forward
> > > > > > > > > > > > > > > > > > > > > > > > > > > > anything during punctuation.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > For what it's worth, I think this is the best tradeoff.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > The only alternative I see is not to place any restriction
> > > > > > > > > > > > > > > > > > > > > > > > > > > > on forwarded keys at all and just document that if users
> > > > > > > > > > > > > > > > > > > > > > > > > > > > don't maintain proper partitioning, they'll get undefined
> > > > > > > > > > > > > > > > > > > > > > > > > > > > behavior. That might be more powerful, but it's also a
> > > > > > > > > > > > > > > > > > > > > > > > > > > > usability problem.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > > > > > > > > > >
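The runtime check discussed in this thread works as follows: the framework sets the current key on an internal context before calling the user's `process`, and clears it afterwards. The overridden `forward` then compares keys by reference. This could look roughly like the plain-Java sketch below, which assumes a single-threaded task.

`SimpleRecord`, `KeyCheckingContext` and the `processValues` helper are hypothetical simplifications, not the Kafka Streams classes. Only the `setRecordKey()`/`clearRecordKey()` names are taken from the KIP discussion. The sketch also illustrates point 3): once `process` has returned, no key is set, so a forward from a punctuator is rejected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical, simplified stand-ins; NOT the Kafka Streams Record/ProcessorContext API.
public class KeyCheckSketch {

    static final class SimpleRecord<K, V> {
        final K key;
        final V value;

        SimpleRecord(K key, V value) {
            this.key = key;
            this.value = value;
        }

        // Same key reference, new value: the only shape processValues should forward.
        <NV> SimpleRecord<K, NV> withValue(NV newValue) {
            return new SimpleRecord<>(key, newValue);
        }
    }

    static final class KeyCheckingContext<K, V> {
        final List<SimpleRecord<K, V>> forwarded = new ArrayList<>();
        private K currentKey;
        private boolean keySet;

        void setRecordKey(K key) { currentKey = key; keySet = true; }

        void clearRecordKey() { currentKey = null; keySet = false; }

        void forward(SimpleRecord<K, V> record) {
            if (!keySet) {
                // No input record in flight, e.g. a call from a punctuator.
                throw new IllegalStateException("forward() is only allowed while processing a record");
            }
            if (record.key != currentKey) { // reference equality, deliberately not equals()
                throw new IllegalArgumentException("processValues must not change the key");
            }
            forwarded.add(record);
        }
    }

    // What the framework would do around each user-supplied process() call.
    static <K, VIn, VOut> void processValues(
            SimpleRecord<K, VIn> input,
            KeyCheckingContext<K, VOut> ctx,
            BiConsumer<SimpleRecord<K, VIn>, KeyCheckingContext<K, VOut>> userProcess) {
        ctx.setRecordKey(input.key);
        try {
            userProcess.accept(input, ctx);
        } finally {
            ctx.clearRecordKey();
        }
    }

    public static void main(String[] args) {
        KeyCheckingContext<String, Integer> ctx = new KeyCheckingContext<>();
        SimpleRecord<String, String> in = new SimpleRecord<>("user-1", "hello");

        // Accepted: only the value changes.
        processValues(in, ctx, (r, c) -> c.forward(r.withValue(r.value.length())));
        System.out.println("forwarded: " + ctx.forwarded.size());

        // Rejected: an equal but distinct key object fails the reference check.
        try {
            processValues(in, ctx, (r, c) -> c.forward(new SimpleRecord<>(new String(r.key), 0)));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // Rejected: forwarding after process() returned, as a punctuator would.
        try {
            ctx.forward(new SimpleRecord<>("user-1", 0));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The check costs one `!=` per forwarded record, in line with the expectation above that the overhead is negligible. Reference equality has two trade-offs, though. It rejects an equal but distinct copy of the key, which is stricter than `equals()`. It also cannot notice a mutable key object that was modified in place before being forwarded.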
> > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, 2022-02-09 at 11:34 +0000, Jorge Esteban Quilcate Otoya wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks Guozhang.
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Does `ValueProcessorContext` have to be a public API? It seems to me
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > that this can be completely abstracted away from user interfaces as an
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > internal class
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > Totally agree. No intention to add these as public APIs. Will update the
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > KIP to reflect this.
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > in the past the rationale for enforcing it at the
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > interface layer rather than do runtime checks is that it is more efficient.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I'm not sure how much overhead it may incur to check if the key did not
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > change: if it is just a reference equality check maybe it's okay. What's
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > your take on this?
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > Agree, reference equality should cover this validation and the overhead
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > impact should not be meaningful.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > Will update the KIP to reflect this as well.
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, 8 Feb 2022 at 19:05, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hello Jorge,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for bringing this KIP! I think this is a nice idea to consider using
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > a single overloaded function name for #process, just a couple quick
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > questions after reading the proposal:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1) Does `ValueProcessorContext` have to be a public API? It seems to me
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > that this can be completely abstracted away from user interfaces as an
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > internal class, and we call the `setKey` before calling user-instantiated
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > `process` function, and then in its overridden `forward` it can just check
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > if the key changes or not.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 2) Related to 1) above, in the past the rationale for enforcing it at the
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > interface layer rather than do runtime checks is that it is more efficient.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I'm not sure how much overhead it may incur to check if the key did not
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > change: if it is just a reference equality check maybe it's okay. What's
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > your take on this?
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > Guozhang
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Feb 8, 2022 at 5:17 AM Jorge Esteban Quilcate Otoya <
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > quilcate.jo...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Dev team,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I'd like to start a new discussion thread on Kafka Streams KIP-820:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > This KIP aims to extend the current `KStream#process` API to return
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > output values that could be chained across the topology, as well as
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > introducing a new `KStream#processValues` to use processors while
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > validating that keys haven't changed and repartition is not required.
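The runtime validation described above, where `processValues` rejects a key change so that no repartition is needed, could look roughly like the sketch below. It assumes a wrapper around the forward call that compares each forwarded key against the current input key by reference. `KeyValidatingForwarderSketch`, `Rec` and `KeyValidatingForwarder` are hypothetical stand-ins, not Kafka Streams classes, and the nested record needs Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical sketch of runtime key validation for a processValues-style operator.
// None of these types are Kafka Streams classes.
public class KeyValidatingForwarderSketch {

    // Stand-in for a record: a key plus a value.
    record Rec<K, V>(K key, V value) {}

    // Wraps the downstream forward call and rejects any record whose key
    // is not the same object as the key of the record currently being processed.
    static final class KeyValidatingForwarder<K, V> {
        private final BiConsumer<K, V> downstream;
        private K currentInputKey;

        KeyValidatingForwarder(BiConsumer<K, V> downstream) {
            this.downstream = downstream;
        }

        // Called by the (hypothetical) runtime before handing a record to the processor.
        void setCurrentInputKey(K key) {
            this.currentInputKey = key;
        }

        void forward(Rec<K, V> rec) {
            if (rec.key() != currentInputKey) {
                throw new IllegalStateException(
                    "processValues must not change the key; that would require a repartition");
            }
            downstream.accept(rec.key(), rec.value());
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        KeyValidatingForwarder<String, Integer> fwd =
            new KeyValidatingForwarder<>((k, v) -> out.add(k + "=" + v));

        String inputKey = new String("user-1");
        fwd.setCurrentInputKey(inputKey);

        // Accepted: the forwarded key is the input key object itself.
        fwd.forward(new Rec<>(inputKey, 10));

        // Rejected: an equal but newly allocated key object fails the reference check.
        try {
            fwd.forward(new Rec<>(new String("user-1"), 20));
        } catch (IllegalStateException e) {
            out.add("rejected");
        }

        System.out.println(out); // [user-1=10, rejected]
    }
}
```

Comparing by reference keeps the per-record cost to one comparison, at the price of also rejecting a processor that forwards an equal but newly allocated key, which is stricter than an `equals()` check.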
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Jorge.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > --
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- Guozhang
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > --
> > > > > > > > > > > > > > > > > > > > > > > > > > -- Guozhang
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> >
> >
>


-- 
-- Guozhang
