Thanks for the followup Nick!

This is definitely one of those things that feels like it should be easy
and useful for everyone, but "the devil is in the details" as we always
say. It'll be good to have this thread and the discussion so far should
anyone want to attempt it in the future or need something similar.

On Thu, Mar 28, 2024 at 6:03 AM Nick Telford <nick.telf...@gmail.com> wrote:

> Hi folks,
>
> Sorry I haven't got back to you until now.
>
> It's become clear that I hadn't anticipated a significant number of
> technical challenges that this KIP presents. I think expecting users to
> understand the ramifications on aggregations, joins and windowing
> ultimately kills it: it only becomes a problem under *specific*
> combinations of operations, and those problems would manifest in ways that
> might be difficult for users to detect, let alone diagnose.
>
> I think it's best to abandon this KIP, at least for now. If anyone else
> sees a use for it and a path forward, feel free to pick it up.
>
> Since I'm abandoning the KIP, I won't update the Motivation section. But I
> will provide a bit of background here on why I originally suggested it, in
> case you're interested:
>
> In my organisation, we don't have schemas for *any* of our data in Kafka.
> Consequently, one of the biggest causes of downtime in our applications is
> "bad" records being written by Producers. We integrate with a lot of
> third-party APIs, and have Producers that just push that data straight to
> Kafka with very little validation. I've lost count of the number of times
> my application has been crashed by a deserialization exception because we
> received a record that looks like '{"error": "Bad gateway"}' or similar,
> instead of the actual payload we expect.
>
> The difficulty is we can't just use CONTINUE to discard these messages,
> because we also sometimes get deserialization exceptions caused by an
> upstream schema change that is incompatible with the expectations of our
> app. In these cases, we don't want to discard records (which are
> technically valid), but instead need to adjust our application to be
> compatible with the new schema, before processing them.
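To make the distinction above concrete, here is a minimal sketch of the triage a custom handler would have to perform. Everything in it is an assumption for illustration: `BadRecordTriage`, its `Decision` enum and the crude textual check are hypothetical stand-ins, not Kafka Streams classes. In a real application this logic would sit inside a DeserializationExceptionHandler implementation.

```java
import java.nio.charset.StandardCharsets;

final class BadRecordTriage {
    /** Hypothetical stand-in for the handler's decision; not the Kafka Streams enum. */
    enum Decision { CONTINUE, FAIL }

    /**
     * Skips payloads that look like an upstream error body such as {"error": "..."}
     * and fails on everything else, so a possible schema change is not silently dropped.
     */
    static Decision decide(byte[] payload) {
        if (payload == null) {
            return Decision.FAIL;
        }
        String text = new String(payload, StandardCharsets.UTF_8).trim();
        // Crude textual check, assumed for illustration; a real app would parse the JSON.
        boolean looksLikeErrorBody = text.startsWith("{") && text.contains("\"error\"");
        return looksLikeErrorBody ? Decision.CONTINUE : Decision.FAIL;
    }

    public static void main(String[] args) {
        byte[] bad = "{\"error\": \"Bad gateway\"}".getBytes(StandardCharsets.UTF_8);
        byte[] changed = "{\"id\": 1, \"new_field\": true}".getBytes(StandardCharsets.UTF_8);
        System.out.println(decide(bad));
        System.out.println(decide(changed));
    }
}
```

Note that a check like this is fragile: a valid record that happens to carry an "error" field would be discarded, which is part of why schemas are the more robust fix.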
>
> Crucially, we use a monolithic app, with more than 45 sub-topologies, so
> crashing the entire app just because of one bad record causes downtime on
> potentially unrelated sub-topologies.
>
> This was the motivation for this KIP, which would have enabled users to
> make a decision on what to do about a bad message, *without taking down the
> entire application*.
>
> Obviously, the *correct* solution to this problem is to introduce schemas
> on our topics and have our Producers correctly validate records before
> writing them to the cluster. This is ultimately the solution I am going to
> pursue in lieu of this KIP.
>
> I still think this KIP could have been useful for dealing with an
> incompatible upstream schema change; by pausing only the sub-topologies
> that are affected by the schema change, while leaving others to continue to
> run while the user deploys a fix. However, in practice I think few users
> have monolithic apps like ours, and most instead de-couple unrelated topics
> via different apps, which reduces the impact of incompatible upstream
> schema changes.
>
> Thanks for your reviews and feedback, I've learned a lot, as always; this
> time, mostly about how, when authoring a KIP, I should always ask myself:
> "yes, but what about timestamp ordering?" :-D
>
> Nick
>
> On Thu, 14 Mar 2024 at 03:27, Sophie Blee-Goldman <sop...@responsive.dev>
> wrote:
>
> > >
> > > Well, the KIP mentions the ability to either re-try the record (eg,
> > > > after applying some external fix that would allow Kafka Streams to
> > > > deserialize the record now) or to skip it by advancing the offset.
> >
> >
> > That's fair -- you're definitely right that what's described in the KIP
> > document right now would not be practical. I just wanted to clarify that
> > this doesn't mean the feature as a whole is impractical, but certainly
> > we'd want to update the proposal to remove the line about resetting
> > offsets via external tool and come up with a more concrete approach, and
> > perhaps describe it in more detail.
> >
> > That's probably not worth getting into until/unless we decide whether to
> > go forward with this feature in the first place. I'll let Nick reflect on
> > the motivation and your other comments and then decide whether he still
> > wants to pursue it.
> >
> > To Nick: if you want to go through with this KIP and can expand on the
> > motivation so that we understand it better, I'd be happy to help work
> > out the details. For now I'll just wait for your decision.
> >
> > On Wed, Mar 13, 2024 at 10:24 AM Matthias J. Sax <mj...@apache.org>
> > wrote:
> >
> > > Yes, about the "drop records" case. It's a very common scenario to have
> > > a repartition step before a windowed aggregation or a join with
> > > grace-period.
> > >
> > >
> > > > About "add feature vs guard users": it's always a tricky question and
> > > > tradeoff. For this particular KIP, I personally think we should opt to
> > > > not add the feature but guard the users, as I don't see too much value
> > > > compared to the complexity and "traps" it adds. -- It's of course just
> > > > my personal opinion, and if there is an ask from many users to add
> > > > this feature, I would not push back further. As mentioned in my
> > > > previous reply, I don't fully understand the motivation yet; maybe Nick
> > > > can provide more context on it.
> > >
> > >
> > > > > In other words, opting for the PAUSE option would simply stall the
> > > > > task, and upon #resume it would just be discarding that record and
> > > > > then continuing on with processing
> > >
> > > > Well, the KIP mentions the ability to either re-try the record (eg,
> > > > after applying some external fix that would allow Kafka Streams to
> > > > deserialize the record now) or to skip it by advancing the offset. But
> > > > to do this, we need to extend the `resume()` callback to pass in this
> > > > information, making the whole setup and usage of this feature more
> > > > complex, as one needs to do more upfront instrumentation of their
> > > > custom code. -- It's just a technical thing we need to consider if we
> > > > want to move forward, and the KIP should not say "advancing the
> > > > consumer offsets, either via an external tool" because this cannot
> > > > work. Just pointing out an incorrect technical assumption, not
> > > > disregarding that it can be done.
> > >
> > >
> > > > About committing: yes, I agree with all that you say, and again it was
> > > > not meant as a concern, but just as honest questions about some
> > > > technical details. I think it would be good to consider these
> > > > trade-offs and explain in the KIP why we want to do what. That's all.
> > >
> > >
> > >
> > > -Matthias
> > >
> > > On 3/12/24 11:24 PM, Sophie Blee-Goldman wrote:
> > > >>
> > > > >>   I see way too many food-guns and complications that can be
> > > > >>   introduced.
> > > >
> > > >
> > > > > What is a "food-gun"?? I'm picturing like a spud rifle/potato gun
> > > > > but I don't think that's what you meant hahaha
> > > >
> > > > > I don't feel super strongly one way or another, but I have a few
> > > > > questions & corrections about some of these complaints/concerns:
> > > >
> > > > >> If one task pauses but others keep running, we keep advancing
> > > > >> stream-time downstream, and thus when the task would resume later,
> > > > >> there is a very high probability that records are dropped as the
> > > > >> window has already closed.
> > > >
> > > > > Just to make sure I/everyone understand what you're getting at
> > > > > here, you would be referring to the case of a stateful operation
> > > > > downstream of a key-changing operation which is in turn downstream
> > > > > of the "paused" task -- ie with a repartition separating the paused
> > > > > task and the task with a windowed aggregation? Each task has its own
> > > > > view of stream-time (technically each processor within a task) so
> > > > > the only way that delaying one task and not another would affect
> > > > > which records get dropped is if those two tasks are rekeyed and the
> > > > > repartitioning results in their outputs being mixed -- yes?
> > > >
> > > > > Anyways I think you make a good case for why pausing a single task
> > > > > -- or even an entire instance if others are allowed to continue
> > > > > running -- might make it too easy for users to shoot themselves in
> > > > > the foot without understanding the full ramifications. Of course,
> > > > > there are already a million ways for users to screw up their app if
> > > > > configured or operated incorrectly, and we shouldn't necessarily
> > > > > kill a feature just because some people might use it when they
> > > > > shouldn't. Why can't we just document that this feature should not
> > > > > be used with applications that include time-sensitive operators?
> > > >
> > > > > I also feel like you dismissed the "skip record case" somewhat too
> > > > > easily:
> > > >
> > > >> For the "skip record case", it's also not possible to skip over an
> > > >> offset from outside while the application is running....
> > > >
> > > >
> > > > > True, you can't advance the offset from outside the app, but I
> > > > > don't see why you would want to, much less why you should need to
> > > > > for this to work. Surely the best way to implement this case would
> > > > > just be for the #resume API to behave, and work, exactly the same as
> > > > > the handler's CONTINUE option? In other words, opting for the PAUSE
> > > > > option would simply stall the task, and upon #resume it would just
> > > > > be discarding that record and then continuing on with processing (or
> > > > > even committing the offset immediately after it, perhaps even
> > > > > asynchronously since it presumably doesn't matter if it doesn't
> > > > > succeed and the record is picked up again by accident -- as long as
> > > > > that doesn't happen repeatedly in an infinite loop, which I don't
> > > > > see why it would.)
> > > >
> > > > On the subject of committing...
> > > >
> > > > >> Other questions: if a task would be paused, would we commit the
> > > > >> current offset? What happens if we re-balance? Would we just lose
> > > > >> the "pause" state, and hit the same error again and just pause
> > > > >> again?
> > > >
> > > >
> > > > > I was imagining that we would either just wait without committing,
> > > > > or perhaps even commit everything up to -- but not including -- the
> > > > > "bad" record when PAUSE is triggered. Again, if we rebalance and
> > > > > "lose the pause" then we'll just attempt to process it again, fail,
> > > > > and end up back in PAUSE. This is no different than how successful
> > > > > processing works, no? Who cares if a rebalance happens to strike and
> > > > > causes it to be PAUSED again?
> > > >
> > > > > All in all, I feel like these concerns are all essentially "true",
> > > > > but to me they just seem like implementation or design decisions and
> > > > > none of them strike me as posing an unsolvable problem for this
> > > > > feature. But maybe I'm just lacking in imagination...
> > > >
> > > > Thoughts?
> > > >
> > > >
> > > > > On Fri, Mar 8, 2024 at 5:30 PM Matthias J. Sax <mj...@apache.org>
> > > > > wrote:
> > > >
> > > >> Hey Nick,
> > > >>
> > > > >> I am sorry that I have to say that I am not a fan of this KIP. I
> > > > >> see way too many food-guns and complications that can be
> > > > >> introduced.
> > > >>
> > > > >> I am also not sure if I understand the motivation. You say, CONTINUE
> > > > >> and FAIL is not good enough, but don't describe in detail why? If we
> > > > >> understand the actual problem better, it might also get clear how
> > > > >> task-pausing would help to address the problem.
> > > >>
> > > >>
> > > > >> The main problem I see, as already mentioned by Sophie, is about
> > > > >> time synchronization. However, it's not limited to joins, but
> > > > >> affects all time-based operations, ie, also all windowed
> > > > >> aggregations. If one task pauses but others keep running, we keep
> > > > >> advancing stream-time downstream, and thus when the task would
> > > > >> resume later, there is a very high probability that records are
> > > > >> dropped as the window has already closed.
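The dropped-record scenario described above can be illustrated with a small simulation. This is a toy model under stated assumptions, not Kafka Streams code: `StreamTimeDemo` and `WindowedOperator` are hypothetical names, windows are tumbling, timestamps are non-negative, and a record is treated as late once stream-time has reached window end plus grace.

```java
import java.util.ArrayList;
import java.util.List;

final class StreamTimeDemo {
    /** Toy windowed operator, a simplified model and not Kafka Streams code. */
    static final class WindowedOperator {
        private final long windowSizeMs;
        private final long graceMs;
        private long streamTimeMs = Long.MIN_VALUE;
        final List<Long> accepted = new ArrayList<>();
        final List<Long> dropped = new ArrayList<>();

        WindowedOperator(long windowSizeMs, long graceMs) {
            this.windowSizeMs = windowSizeMs;
            this.graceMs = graceMs;
        }

        /** Processes one record timestamp (assumed non-negative). */
        void process(long timestampMs) {
            // Stream-time is the highest timestamp observed so far; it never goes back.
            streamTimeMs = Math.max(streamTimeMs, timestampMs);
            long windowStartMs = timestampMs - (timestampMs % windowSizeMs);
            long windowCloseMs = windowStartMs + windowSizeMs + graceMs;
            if (streamTimeMs >= windowCloseMs) {
                dropped.add(timestampMs);   // window already closed: record is lost
            } else {
                accepted.add(timestampMs);
            }
        }
    }

    /** Feeds the timestamps to a fresh operator in the given arrival order. */
    static WindowedOperator run(long windowSizeMs, long graceMs, long... arrivals) {
        WindowedOperator op = new WindowedOperator(windowSizeMs, graceMs);
        for (long ts : arrivals) {
            op.process(ts);
        }
        return op;
    }

    public static void main(String[] args) {
        // No pause: records from both upstream tasks interleave normally.
        System.out.println(run(10_000, 5_000, 1_000, 2_000, 12_000, 40_000).dropped);
        // One upstream task paused: its record (ts=2000) arrives after the others.
        System.out.println(run(10_000, 5_000, 1_000, 12_000, 40_000, 2_000).dropped);
    }
}
```

In the second run the record with timestamp 2000 is the one that ends up in `dropped`, because the other task has already pushed stream-time past the close of its window.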
> > > >>
> > > > >> For the runtime itself, we also cannot really do a cascading
> > > > >> downstream pause, because the runtime does not know anything about
> > > > >> the semantics of operators. We don't know if we execute a DSL
> > > > >> operator or a PAPI operator. (We could maybe track all downstream
> > > > >> tasks independent of semantics, but in the end it might just imply
> > > > >> we could also just pause all tasks...)
> > > >>
> > > > >> For the "skip record case", it's also not possible to skip over an
> > > > >> offset from outside while the application is running. The offset in
> > > > >> question is cached inside the consumer and the consumer would not go
> > > > >> back to Kafka to re-read the offset (only when a partition is
> > > > >> re-assigned to a new consumer, the consumer would fetch the offset
> > > > >> once to init itself). -- But even if the consumer would go back to
> > > > >> read the offset, as long as the partition is assigned to a member of
> > > > >> the group, it's not even possible to commit a new offset using some
> > > > >> external tool. Only members of the group are allowed to commit
> > > > >> offsets, and all tools that allow to manipulate offsets require
> > > > >> that the corresponding application is stopped, and that the consumer
> > > > >> group is empty (and the tool will join the consumer group as only
> > > > >> member and commit offsets).
> > > >>
> > > > >> Of course, we could pause all tasks, but that's kind of similar to
> > > > >> shut down? I agree though, that `FAIL` is rather harsh, and it could
> > > > >> be a good thing to introduce a graceful `SHUTDOWN` option (similar
> > > > >> to what we have via the uncaught exception handler)?
> > > >>
> > > > >> If we pause all tasks we would of course need to do this not just
> > > > >> for a single instance, but for all... We do already have
> > > > >> `KafkaStreams#pause()` but it does not include an application-wide
> > > > >> pause, but only an instance pause -- the assumption of this feature
> > > > >> was that an external pause signal would be sent to all instances at
> > > > >> the same time. Building it into KS was not done as potentially too
> > > > >> complicated...
> > > >>
> > > > >> Other questions: if a task would be paused, would we commit the
> > > > >> current offset? What happens if we re-balance? Would we just lose
> > > > >> the "pause" state, and hit the same error again and just pause
> > > > >> again?
> > > >>
> > > >>
> > > > >> Right now, I would rather propose to discard this KIP (or change the
> > > > >> scope drastically to add a "global pause" and/or "global shutdown"
> > > > >> option). Of course, if you can provide convincing answers, I am
> > > > >> happy to move forward with per-task pausing. But my gut feeling is
> > > > >> that even if we would find technically sound solutions, it would be
> > > > >> way too complicated to use (and maybe also to implement inside KS)
> > > > >> for too little benefit.
> > > >>
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >>
> > > >>
> > > >> On 10/26/23 5:57 AM, Nick Telford wrote:
> > > >>> 1.
> > > >>> Woops! I've fixed that now. Thanks for catching that.
> > > >>>
> > > >>> 2.
> > > > >>> I agree, I'll remove the LogAndPause handler so it's clear this is
> > > > >>> an advanced feature. I'll also add some documentation to
> > > > >>> DeserializationHandlerResponse#SUSPEND that explains the care
> > > > >>> users should approach it with.
> > > >>>
> > > >>> 3a.
> > > > >>> This is interesting. My main concern is that there may be
> > > > >>> situations where skipping a single bad record is not the necessary
> > > > >>> solution, but the Task should still be resumed without restarting
> > > > >>> the application. For example, if there are several bad records in a
> > > > >>> row that should be skipped.
> > > >>>
> > > >>> 3b.
> > > > >>> Additionally, a Task may have multiple input topics, so we'd need
> > > > >>> some way to indicate which record to skip.
> > > >>>
> > > > >>> These can probably be resolved by something like
> > > > >>> skipAndContinue(TaskId task, String topic, int recordsToSkip) or
> > > > >>> even skipAndContinue(TaskId task, Map<String, Integer>
> > > > >>> recordsToSkipByTopic)?
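As a rough illustration of the map-based variant floated above, here is an in-memory sketch. It is hypothetical throughout: `SkipAndContinueSketch` models a single paused task (so the TaskId parameter is left out), and it treats "skip N records" as "advance the next offset by N", which is a simplification since real partition offsets are not guaranteed to be contiguous.

```java
import java.util.HashMap;
import java.util.Map;

final class SkipAndContinueSketch {
    // Next offset to read per input topic of one task (hypothetical in-memory model).
    private final Map<String, Long> nextOffsetByTopic = new HashMap<>();
    private boolean paused;

    SkipAndContinueSketch(Map<String, Long> initialOffsets) {
        nextOffsetByTopic.putAll(initialOffsets);
    }

    void pause() {
        paused = true;
    }

    boolean isPaused() {
        return paused;
    }

    long nextOffset(String topic) {
        Long offset = nextOffsetByTopic.get(topic);
        if (offset == null) {
            throw new IllegalArgumentException("unknown topic: " + topic);
        }
        return offset;
    }

    /** Skips the requested number of records per topic, then resumes the task. */
    void skipAndContinue(Map<String, Integer> recordsToSkipByTopic) {
        if (!paused) {
            throw new IllegalStateException("task is not paused");
        }
        // Validate everything first so a bad request leaves the offsets untouched.
        for (Map.Entry<String, Integer> e : recordsToSkipByTopic.entrySet()) {
            if (!nextOffsetByTopic.containsKey(e.getKey())) {
                throw new IllegalArgumentException("unknown topic: " + e.getKey());
            }
            if (e.getValue() < 0) {
                throw new IllegalArgumentException("cannot skip a negative number of records");
            }
        }
        for (Map.Entry<String, Integer> e : recordsToSkipByTopic.entrySet()) {
            nextOffsetByTopic.merge(e.getKey(), (long) e.getValue(), Long::sum);
        }
        paused = false;
    }

    public static void main(String[] args) {
        SkipAndContinueSketch task =
            new SkipAndContinueSketch(Map.of("orders", 42L, "payments", 7L));
        task.pause();
        task.skipAndContinue(Map.of("orders", 3));
        System.out.println(task.nextOffset("orders"));
    }
}
```

Keying the request by topic addresses the multiple-input-topics concern in 3b, while the count addresses the "several bad records in a row" case in 3a.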
> > > >>>
> > > >>> 4.
> > > > >>> Related to 2: I was thinking that users implementing their own
> > > > >>> handler may want to be able to determine which Processors (i.e.
> > > > >>> which Subtopology/task group) are being affected, so they can
> > > > >>> programmatically make a decision on whether it's safe to PAUSE.
> > > > >>> ProcessorContext, which is already a parameter to
> > > > >>> DeserializationExceptionHandler provides the TaskId of the failed
> > > > >>> Task, but doesn't provide metadata on the Processors that Task
> > > > >>> executes.
> > > >>>
> > > > >>> Since TaskIds are non-deterministic (they can change when you
> > > > >>> modify your topology, with no influence over how they're
> > > > >>> assigned), a user cannot use TaskId alone to determine which
> > > > >>> Processors would be affected.
> > > >>>
> > > > >>> What do you think would be the best way to provide this information
> > > > >>> to exception handlers? I was originally thinking that users could
> > > > >>> instantiate the handler themselves and provide a
> > > > >>> TopologyDescription (via Topology#describe) in the constructor,
> > > > >>> but it looks like configs of type Class cannot accept an already
> > > > >>> instantiated instance, and there's no other way to inject
> > > > >>> information like that.
> > > >>>
> > > > >>> Perhaps we could add something to ProcessorContext that contains
> > > > >>> details on the sub-topology being executed?
> > > >>>
> > > >>> Regards,
> > > >>> Nick
> > > >>>
> > > > >>> On Thu, 26 Oct 2023 at 01:24, Sophie Blee-Goldman
> > > > >>> <sop...@responsive.dev> wrote:
> > > >>>
> > > >>>> 1. Makes sense to me! Can you just update the name of the
> > > >>>> DeserializationHandlerResponse enum from SUSPEND to PAUSE so
> > > >>>> we're consistent with the wording?
> > > >>>>
> > > > >>>>> The drawback here would be that custom stateful Processors
> > > > >>>>> might also be impacted, but there'd be no way to know if they're
> > > > >>>>> safe to not pause.
> > > >>>>>
> > > > >>>> 2. This is a really good point -- maybe this is just a case where
> > > > >>>> we have to trust in the user not to accidentally screw themselves
> > > > >>>> over. As long as we provide sufficient information for them to
> > > > >>>> decide when it is/isn't safe to pause a task, I would be ok with
> > > > >>>> just documenting the dangers of indiscriminate use of this
> > > > >>>> feature, and hope that everyone reads the warning.
> > > >>>>
> > > > >>>> Given the above, I have one suggestion: what if we only add the
> > > > >>>> PAUSE enum in this KIP, and don't include an OOTB
> > > > >>>> DeserializationExceptionHandler that implements this? I see this
> > > > >>>> as addressing two concerns:
> > > > >>>> 2a. It would make it clear that this is an advanced feature and
> > > > >>>> should be given careful consideration, rather than just plugging
> > > > >>>> in a config value.
> > > > >>>> 2b. It forces the user to implement the handler themselves, which
> > > > >>>> gives them an opportunity to check on which task it is that's
> > > > >>>> hitting the error and then make a conscious decision as to whether
> > > > >>>> it is safe to pause or not. In the end, it's really impossible for
> > > > >>>> us to know what is/is not safe to pause, so the more
> > > > >>>> responsibility we can put on the user in this case, the better.
> > > >>>>
> > > > >>>> 3. It sounds like the general recovery workflow would be to either
> > > > >>>> resolve the issue somehow (presumably by fixing an issue in the
> > > > >>>> deserializer?) and restart the application -- in which case no
> > > > >>>> further manual intervention is required -- or else to determine
> > > > >>>> the record is unprocessable and should be skipped, in which case
> > > > >>>> the user needs to somehow increment the offset and then resume the
> > > > >>>> task.
> > > >>>>
> > > > >>>> It's a bit awkward to ask people to use the command line tools to
> > > > >>>> manually wind the offset forward. More importantly, there are
> > > > >>>> likely many operators who don't have the permissions necessary to
> > > > >>>> use the command line tools for this kind of thing, and they would
> > > > >>>> be pretty much out of luck in that case.
> > > >>>>
> > > > >>>> On the flipside, it seems like if the user ever wants to resume
> > > > >>>> the task without restarting, they will need to skip over the bad
> > > > >>>> record. I think we can make the feature considerably more
> > > > >>>> ergonomic by modifying the behavior of the #resume method so that
> > > > >>>> it always skips over the bad record. This will probably be the
> > > > >>>> easiest to implement anyways, as it is effectively the same as the
> > > > >>>> CONTINUE option internally, but gives the user time to decide if
> > > > >>>> they really do want to CONTINUE or not.
> > > >>>>
> > > > >>>> Not sure if we would want to rename the #resume method in that
> > > > >>>> case to make this more clear, or if javadocs would be
> > > > >>>> sufficient...maybe something like #skipRecordAndContinue?
> > > >>>>
> > > > >>>> On Tue, Oct 24, 2023 at 6:54 AM Nick Telford
> > > > >>>> <nick.telf...@gmail.com> wrote:
> > > >>>>
> > > >>>>> Hi Sophie,
> > > >>>>>
> > > >>>>> Thanks for the review!
> > > >>>>>
> > > >>>>> 1-3.
> > > > >>>>> I had a feeling this was the case. I'm thinking of adding a
> > > > >>>>> PAUSED state with the following valid transitions:
> > > >>>>>
> > > >>>>>      - RUNNING -> PAUSED
> > > >>>>>      - PAUSED -> RUNNING
> > > >>>>>      - PAUSED -> SUSPENDED
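The three proposed transitions can be written down as a small state machine. This is a sketch only: `TaskStateSketch` is a hypothetical name, it models just the transitions listed above, and the existing transitions between the other task states are deliberately left out.

```java
import java.util.EnumSet;
import java.util.Set;

final class TaskStateSketch {
    /** Only the states touched by the proposed transitions; not the full task lifecycle. */
    enum State {
        RUNNING, PAUSED, SUSPENDED;

        /** States reachable from this one, restricted to the transitions proposed above. */
        Set<State> validNext() {
            switch (this) {
                case RUNNING:
                    return EnumSet.of(PAUSED);
                case PAUSED:
                    return EnumSet.of(RUNNING, SUSPENDED);
                default:
                    return EnumSet.noneOf(State.class);
            }
        }
    }

    /** Returns the new state, or throws if the transition is not in the proposed list. */
    static State transition(State from, State to) {
        if (!from.validNext().contains(to)) {
            throw new IllegalStateException("invalid transition: " + from + " -> " + to);
        }
        return to;
    }

    public static void main(String[] args) {
        State s = transition(State.RUNNING, State.PAUSED);
        s = transition(s, State.RUNNING);
        System.out.println(s);
    }
}
```

Keeping the allowed transitions in one place like this is what makes a dedicated state straightforward to test in isolation.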
> > > >>>>>
> > > > >>>>> The advantage of a dedicated State is it should make testing
> > > > >>>>> easier and also reduce the potential for introducing bugs into
> > > > >>>>> the existing Task states.
> > > >>>>>
> > > > >>>>> While I appreciate that the engine is being revised, I think I'll
> > > > >>>>> still pursue this actively instead of waiting, as it addresses
> > > > >>>>> some problems my team is having right now. If the KIP is
> > > > >>>>> accepted, then I suspect that this feature would still be
> > > > >>>>> desirable with the new streams engine, so any new Task state
> > > > >>>>> would likely want to be mirrored in the new engine, and the high
> > > > >>>>> level design is unlikely to change.
> > > >>>>>
> > > >>>>> 4a.
> > > > >>>>> This is an excellent point I hadn't considered. Correct me if I'm
> > > > >>>>> wrong, but the only joins that this would impact are
> > > > >>>>> Stream-Stream and Stream-Table joins? Table-Table joins should be
> > > > >>>>> safe, because the join is commutative, so a delayed record on one
> > > > >>>>> side should just cause its output record to be delayed, but not
> > > > >>>>> lost.
> > > >>>>>
> > > >>>>> 4b.
> > > > >>>>> If we can enumerate only the node types that are impacted by this
> > > > >>>>> (i.e. Stream-Stream and Stream-Table joins), then perhaps we
> > > > >>>>> could restrict it such that it only pauses dependent Tasks if
> > > > >>>>> there's a Stream-Stream/Table join involved? The drawback here
> > > > >>>>> would be that custom stateful Processors might also be impacted,
> > > > >>>>> but there'd be no way to know if they're safe to not pause.
> > > >>>>>
> > > >>>>> 4c.
> > > > >>>>> Regardless, I like this idea, but I have very little knowledge
> > > > >>>>> about making changes to the rebalance/network protocol. It looks
> > > > >>>>> like this could be added via
> > > > >>>>> StreamsPartitionAssignor#subscriptionUserData? I might need some
> > > > >>>>> help designing this aspect of this KIP.
> > > >>>>>
> > > >>>>> Regards,
> > > >>>>> Nick
> > > >>>>>
> > > > >>>>> On Tue, 24 Oct 2023 at 07:30, Sophie Blee-Goldman
> > > > >>>>> <sop...@responsive.dev> wrote:
> > > >>>>>
> > > >>>>>> Hey Nick,
> > > >>>>>>
> > > >>>>>> A few high-level thoughts:
> > > >>>>>>
> > > > >>>>>> 1. We definitely don't want to piggyback on the SUSPENDED task
> > > > >>>>>> state, as this is currently more like an intermediate state that
> > > > >>>>>> a task passes through as it's being closed/migrated elsewhere,
> > > > >>>>>> it doesn't really mean that a task is "suspended" and there's
> > > > >>>>>> no logic to suspend processing on it. What you want is probably
> > > > >>>>>> closer in spirit to the concept of a paused "named topology",
> > > > >>>>>> where we basically freeze processing on a specific task (or set
> > > > >>>>>> of tasks).
> > > > >>>>>> 2. More importantly however, the SUSPENDED state was only ever
> > > > >>>>>> needed to support efficient eager rebalancing, and we plan to
> > > > >>>>>> remove the eager rebalancing protocol from Streams entirely in
> > > > >>>>>> the near future. And unfortunately, the named topologies
> > > > >>>>>> feature was never fully implemented and will probably be ripped
> > > > >>>>>> out sometime soon as well.
> > > > >>>>>> 3. In short, to go this route, you'd probably need to implement
> > > > >>>>>> a PAUSED state from scratch. This wouldn't be impossible, but we
> > > > >>>>>> are planning to basically revamp the entire thread model and
> > > > >>>>>> decouple the consumer (potentially including the
> > > > >>>>>> deserialization step) from the processing threads. Much as I
> > > > >>>>>> love the idea of this feature, it might not make a lot of sense
> > > > >>>>>> to spend time implementing right now when much of that work
> > > > >>>>>> could be scrapped in the mid-term future. We don't have a
> > > > >>>>>> timeline for this, however, so I don't think this should
> > > > >>>>>> discourage you if the feature seems worth it, just want to give
> > > > >>>>>> you a sense of the upcoming roadmap.
> > > > >>>>>> 4. As for the feature itself, my only concern is that this feels
> > > > >>>>>> like a very advanced feature but it would be easy for new users
> > > > >>>>>> to accidentally abuse it and get their application in trouble.
> > > > >>>>>> Specifically I'm worried about how this could be harmful to
> > > > >>>>>> applications for which some degree of synchronization is
> > > > >>>>>> required, such as a join. Correct join semantics rely heavily on
> > > > >>>>>> receiving records from both sides of the join and carefully
> > > > >>>>>> selecting the next one to process based on timestamp. Imagine if
> > > > >>>>>> a DeserializationException occurs upstream of a repartition
> > > > >>>>>> feeding into one side of a join (but not the other) and the
> > > > >>>>>> user opts to PAUSE this task. If the join continues as usual it
> > > > >>>>>> could lead to missed or incorrect results when processing is
> > > > >>>>>> enforced with no records present on one side of the join but
> > > > >>>>>> usual traffic flowing through the other. Maybe we could somehow
> > > > >>>>>> signal to also PAUSE all downstream/dependent tasks? Should be
> > > > >>>>>> able to add this information to the subscription metadata and
> > > > >>>>>> send to all clients via a rebalance. There might be better
> > > > >>>>>> options I'm not seeing. Or we could just decide to trust the
> > > > >>>>>> users not to shoot themselves in the foot -- as long as we
> > > > >>>>>> write a clear warning in the javadocs this might be fine
> > > >>>>>>
> > > >>>>>> Thanks for all the great KIPs!
> > > >>>>>>
> > > > >>>>>> On Thu, Oct 12, 2023 at 9:51 AM Nick Telford
> > > > >>>>>> <nick.telf...@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>>> Hi everyone,
> > > >>>>>>>
> > > > >>>>>>> This is a Streams KIP to add a new
> > > > >>>>>>> DeserializationHandlerResponse, "SUSPEND", that suspends the
> > > > >>>>>>> failing Task but continues to process other Tasks normally.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-990%3A+Capability+to+SUSPEND+Tasks+on+DeserializationException
> > > >>>>>>>
> > > > >>>>>>> I'm not yet completely convinced that this is practical, as I
> > > > >>>>>>> suspect it might be abusing the SUSPENDED Task.State for
> > > > >>>>>>> something it was not designed for. The intent is to pause an
> > > > >>>>>>> active Task *without* re-assigning it to another instance,
> > > > >>>>>>> which causes cascading failures when the FAIL
> > > > >>>>>>> DeserializationHandlerResponse is used.
> > > >>>>>>>
> > > >>>>>>> Let me know what you think!
> > > >>>>>>>
> > > >>>>>>> Regards,
> > > >>>>>>> Nick
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
> >
>
