>
> Well, the KIP mentions the ability to either re-try the record (eg,
> after applying some external fix that would allow Kafka Streams to now
> deserialize the record) or to skip it by advancing the offset.


That's fair -- you're definitely right that what's described in the KIP
document right now would not be practical. I just wanted to clarify that
this doesn't mean the feature as a whole is impractical, but certainly we'd
want to update the proposal to remove the line about resetting offsets via
external tool and come up with a more concrete approach, and perhaps
describe it in more detail.
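
To make the "#resume behaves like CONTINUE" idea a bit more tangible, here
is a rough, purely illustrative sketch in plain Java. None of these types
exist in Kafka Streams: Response, Handler and Task are hypothetical
stand-ins, and the partition is modeled as a list of strings. It assumes
PAUSE commits everything up to, but not including, the bad record, and that
resuming simply discards that record and carries on:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model only: none of these types are Kafka Streams APIs.
class PauseResumeSketch {

    // The responses discussed in this thread; PAUSE is the proposed addition.
    enum Response { CONTINUE, FAIL, PAUSE }

    // Hypothetical stand-in for a deserialization exception handler.
    interface Handler {
        Response handle(String rawRecord);
    }

    // Toy task reading a single partition, modeled as a list of raw strings.
    static class Task {
        private final List<String> partition;
        private final Handler handler;
        private final List<Integer> processed = new ArrayList<>();
        private int position = 0;   // offset of the next record to read
        private int committed = 0;  // everything below this offset is committed
        private boolean paused = false;

        Task(List<String> partition, Handler handler) {
            this.partition = partition;
            this.handler = handler;
        }

        // Processes records until the partition is exhausted or the task pauses.
        void poll() {
            while (!paused && position < partition.size()) {
                String raw = partition.get(position);
                try {
                    processed.add(Integer.parseInt(raw)); // "deserialize" and process
                    position++;
                } catch (NumberFormatException e) {
                    switch (handler.handle(raw)) {
                        case CONTINUE:
                            position++; // drop the bad record and keep going
                            break;
                        case PAUSE:
                            committed = position; // commit up to, not including, the bad record
                            paused = true;
                            break;
                        case FAIL:
                            throw new IllegalStateException("bad record at offset " + position, e);
                    }
                }
            }
            if (!paused) {
                committed = position;
            }
        }

        // Resuming discards the bad record, exactly as CONTINUE would have.
        void resume() {
            if (!paused) {
                return;
            }
            position++;
            paused = false;
            poll();
        }

        boolean isPaused() { return paused; }
        int committed() { return committed; }
        List<Integer> processed() { return processed; }
    }

    public static void main(String[] args) {
        Task task = new Task(List.of("1", "oops", "3"), raw -> Response.PAUSE);
        task.poll();
        System.out.println("paused=" + task.isPaused() + " processed=" + task.processed());
        task.resume();
        System.out.println("paused=" + task.isPaused() + " processed=" + task.processed());
    }
}
```

In this toy model a rebalance would lose the in-memory paused flag, so the
new owner would restart from the committed offset, hit the same record, and
pause again.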

That's probably not worth getting into until/unless we decide whether to
go forward with this feature in the first place. I'll let Nick reflect on
the motivation and your other comments and then decide whether he still
wants to pursue it.

To Nick: if you want to go through with this KIP and can expand on the
motivation so that we understand it better, I'd be happy to help work
out the details. For now I'll just wait for your decision.

On Wed, Mar 13, 2024 at 10:24 AM Matthias J. Sax <mj...@apache.org> wrote:

> Yes, about the "drop records" case. It's a very common scenario to have
> a repartition step before a windowed aggregation or a join with
> grace-period.
>
>
> About "add feature vs guard users": it's always a tricky question and
> tradeoff. For this particular KIP, I personally think we should opt to
> not add the feature but guard the users, as I don't see too much value
> compared to the complexity and "traps" it adds. -- It's of course just
> my personal opinion, and if there is an ask from many users to add this
> feature, I would not push back further. As mentioned in my previous
> reply, I don't fully understand the motivation yet; maybe Nick can
> provide more context on it.
>
>
> > In other words, opting for the PAUSE option would simply stall the
> > task, and upon #resume it would just be discarding that record and then
> > continuing on with processing
>
> Well, the KIP mentions the ability to either re-try the record (eg,
> after applying some external fix that would allow Kafka Streams to now
> deserialize the record) or to skip it by advancing the offset. But
> to do this, we need to extend the `resume()` callback to pass in this
> information, making the whole setup and usage of this feature more
> complex, as one needs to do more upfront instrumentation of their custom
> code. -- It's just a technical thing we need to consider if we want to
> move forward, and the KIP should not say "advancing the consumer
> offsets, either via an external tool" because this cannot work. Just
> pointing out an incorrect technical assumption, not disregarding that it
> can be done.
>
>
> About committing: yes, I agree with everything you say, and again it was
> not meant as a concern, but just as honest questions about some technical
> details. I think it would be good to consider these trade-offs and
> explain in the KIP why we want to do what. That's all.
>
>
>
> -Matthias
>
> On 3/12/24 11:24 PM, Sophie Blee-Goldman wrote:
> >>
> >>   I see way too many food-guns and complications that can be introduced.
> >
> >
> > What is a "food-gun"?? I'm picturing like a spud rifle/potato gun but I
> > don't think that's what you meant hahaha
> >
> > I don't feel super strongly one way or another, but I have a few
> > questions & corrections about some of these complaints/concerns:
> >
> >> If one task pauses but others keep running, we keep advancing
> >> stream-time downstream, and thus when the task would resume later,
> >> there is a very high probability that records are dropped as the
> >> window was already closed.
> >
> > Just to make sure I/everyone understand what you're getting at here, you
> > would be referring to the case of a stateful operation downstream of a
> > key-changing operation which is in turn downstream of the "paused" task
> > -- ie with a repartition separating the paused task and the task with a
> > windowed aggregation? Each task has its own view of stream-time
> > (technically each processor within a task) so the only way that delaying
> > one task and not another would affect which records get dropped is if
> > those two tasks are rekeyed and the repartitioning results in their
> > outputs being mixed -- yes?
> >
> > Anyways I think you make a good case for why pausing a single task -- or
> > even an entire instance if others are allowed to continue running --
> > might make it too easy for users to shoot themselves in the foot without
> > understanding the full ramifications. Of course, there are already a
> > million ways for users to screw up their app if configured or operated
> > incorrectly, and we shouldn't necessarily kill a feature just because
> > some people might use it when they shouldn't. Why can't we just document
> > that this feature should not be used with applications that include
> > time-sensitive operators?
> >
> > I also feel like you dismissed the "skip record case" somewhat too easily:
> >
> >> For the "skip record case", it's also not possible to skip over an
> >> offset from outside while the application is running....
> >
> >
> > True, you can't advance the offset from outside the app, but I don't see
> > why you would want to, much less why you should need to for this to work.
> > Surely the best way to implement this case would just be for the #resume
> > API to behave, and work, exactly the same as the handler's CONTINUE
> > option? In other words, opting for the PAUSE option would simply stall
> > the task, and upon #resume it would just be discarding that record and
> > then continuing on with processing (or even committing the offset
> > immediately after it, perhaps even asynchronously since it presumably
> > doesn't matter if it doesn't succeed and the record is picked up again by
> > accident -- as long as that doesn't happen repeatedly in an infinite
> > loop, which I don't see why it would.)
> >
> > On the subject of committing...
> >
> >> Other questions: if a task would be paused, would we commit the current
> >> offset? What happens if we re-balance? Would we just lose the "pause"
> >> state, and hit the same error again and just pause again?
> >
> >
> > I was imagining that we would either just wait without committing, or
> > perhaps even commit everything up to -- but not including -- the "bad"
> > record when PAUSE is triggered. Again, if we rebalance and "lose the
> > pause" then we'll just attempt to process it again, fail, and end up back
> > in PAUSE. This is no different than how successful processing works, no?
> > Who cares if a rebalance happens to strike and causes it to be PAUSED
> > again?
> >
> > All in all, I feel like these concerns are all essentially "true", but
> > to me they just seem like implementation or design decisions and none of
> > them strike me as posing an unsolvable problem for this feature. But
> > maybe I'm just lacking in imagination...
> >
> > Thoughts?
> >
> >
> > On Fri, Mar 8, 2024 at 5:30 PM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> Hey Nick,
> >>
> >> I am sorry that I have to say that I am not a fan of this KIP. I see way
> >> too many food-guns and complications that can be introduced.
> >>
> >> I am also not sure if I understand the motivation. You say CONTINUE and
> >> FAIL are not good enough, but don't describe in detail why. If we
> >> understand the actual problem better, it might also become clear how
> >> task-pausing would help to address the problem.
> >>
> >>
> >> The main problem I see, as already mentioned by Sophie, is about time
> >> synchronization. However, it's not limited to joins, but affects all
> >> time-based operations, ie, also all windowed aggregations. If one task
> >> pauses but others keep running, we keep advancing stream-time downstream,
> >> and thus when the task would resume later, there is a very high
> >> probability that records are dropped as the window was already closed.
> >>
> >> For the runtime itself, we also cannot really do a cascading downstream
> >> pause, because the runtime does not know anything about the semantics of
> >> operators. We don't know if we execute a DSL operator or a PAPI
> >> operator. (We could maybe track all downstream tasks independent of
> >> semantics, but in the end it might just imply we could also just pause
> >> all tasks...)
> >>
> >> For the "skip record case", it's also not possible to skip over an
> >> offset from outside while the application is running. The offset in
> >> question is cached inside the consumer and the consumer would not go
> >> back to Kafka to re-read the offset (only when a partition is
> >> re-assigned to a new consumer, the consumer would fetch the offset once
> >> to init itself). -- But even if the consumer would go back to read the
> >> offset, as long as the partition is assigned to a member of the group,
> >> it's not even possible to commit a new offset using some external tool.
> >> Only members of the group are allowed to commit offsets, and all tools
> >> that allow manipulating offsets require that the corresponding
> >> application is stopped, and that the consumer group is empty (and the
> >> tool will join the consumer group as only member and commit offsets).
> >>
> >> Of course, we could pause all tasks, but that's kind of similar to
> >> shutting down? I agree though, that `FAIL` is rather harsh, and it could
> >> be a good thing to introduce a graceful `SHUTDOWN` option (similar to
> >> what we have via the uncaught exception handler)?
> >>
> >> If we pause all tasks we would of course need to do this not just for a
> >> single instance, but for all... We do already have
> >> `KafkaStreams#pause()` but it does not include an application-wide pause,
> >> but only an instance pause -- the assumption of this feature was that
> >> an external pause signal would be sent to all instances at the same
> >> time. Building it into KS was not done as it was potentially too
> >> complicated...
> >>
> >> Other questions: if a task would be paused, would we commit the current
> >> offset? What happens if we re-balance? Would we just lose the "pause"
> >> state, and hit the same error again and just pause again?
> >>
> >>
> >> Right now, I would rather propose to discard this KIP (or change the
> >> scope drastically to add a "global pause" and/or "global shutdown"
> >> option). Of course, if you can provide convincing answers, I am happy to
> >> move forward with per-task pausing. But my gut feeling is that even if
> >> we would find technically sound solutions, it would be way too
> >> complicated to use (and maybe also to implement inside KS) for too
> >> little benefit.
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 10/26/23 5:57 AM, Nick Telford wrote:
> >>> 1.
> >>> Woops! I've fixed that now. Thanks for catching that.
> >>>
> >>> 2.
> >>> I agree, I'll remove the LogAndPause handler so it's clear this is an
> >>> advanced feature. I'll also add some documentation to
> >>> DeserializationHandlerResponse#SUSPEND that explains the care users
> >>> should approach it with.
> >>>
> >>> 3a.
> >>> This is interesting. My main concern is that there may be situations
> >>> where skipping a single bad record is not the necessary solution, but
> >>> the Task should still be resumed without restarting the application.
> >>> For example, if there are several bad records in a row that should be
> >>> skipped.
> >>>
> >>> 3b.
> >>> Additionally, a Task may have multiple input topics, so we'd need some
> >>> way to indicate which record to skip.
> >>>
> >>> These can probably be resolved by something like
> >>> skipAndContinue(TaskId task, String topic, int recordsToSkip) or even
> >>> skipAndContinue(TaskId task, Map<String, Integer> recordsToSkipByTopic)?
> >>>
> >>> 4.
> >>> Related to 2: I was thinking that users implementing their own handler
> >>> may want to be able to determine which Processors (i.e. which
> >>> Subtopology/task group) are being affected, so they can programmatically
> >>> make a decision on whether it's safe to PAUSE. ProcessorContext, which
> >>> is already a parameter to DeserializationExceptionHandler provides the
> >>> TaskId of the failed Task, but doesn't provide metadata on the
> >>> Processors that Task executes.
> >>>
> >>> Since TaskIds are non-deterministic (they can change when you modify
> >>> your topology, with no influence over how they're assigned), a user
> >>> cannot use TaskId alone to determine which Processors would be affected.
> >>>
> >>> What do you think would be the best way to provide this information to
> >>> exception handlers? I was originally thinking that users could
> >>> instantiate the handler themselves and provide a TopologyDescription
> >>> (via KafkaStreams#describe) in the constructor, but it looks like
> >>> configs of type Class cannot accept an already instantiated instance,
> >>> and there's no other way to inject information like that.
> >>>
> >>> Perhaps we could add something to ProcessorContext that contains
> >>> details on the sub-topology being executed?
> >>>
> >>> Regards,
> >>> Nick
> >>>
> >>> On Thu, 26 Oct 2023 at 01:24, Sophie Blee-Goldman <sop...@responsive.dev> wrote:
> >>>
> >>>> 1. Makes sense to me! Can you just update the name of the
> >>>> DeserializationHandlerResponse enum from SUSPEND to PAUSE so
> >>>> we're consistent with the wording?
> >>>>
> >>>>> The drawback here would be that custom stateful Processors
> >>>>> might also be impacted, but there'd be no way to know if they're safe
> >>>>> to not pause.
> >>>>>
> >>>> 2. This is a really good point -- maybe this is just a case where we
> >>>> have to trust in the user not to accidentally screw themselves over. As
> >>>> long as we provide sufficient information for them to decide when it
> >>>> is/isn't safe to pause a task, I would be ok with just documenting the
> >>>> dangers of indiscriminate use of this feature, and hope that everyone
> >>>> reads the warning.
> >>>>
> >>>> Given the above, I have one suggestion: what if we only add the PAUSE
> >>>> enum in this KIP, and don't include an OOTB
> >>>> DeserializationExceptionHandler that implements this? I see this as
> >>>> addressing two concerns:
> >>>> 2a. It would make it clear that this is an advanced feature and should
> >>>> be given careful consideration, rather than just plugging in a config
> >>>> value.
> >>>> 2b. It forces the user to implement the handler themselves, which gives
> >>>> them an opportunity to check on which task it is that's hitting the
> >>>> error and then make a conscious decision as to whether it is safe to
> >>>> pause or not. In the end, it's really impossible for us to know what
> >>>> is/is not safe to pause, so the more responsibility we can put on the
> >>>> user in this case, the better.
> >>>>
> >>>> 3. It sounds like the general recovery workflow would be to either
> >>>> resolve the issue somehow (presumably by fixing an issue in the
> >>>> deserializer?) and restart the application -- in which case no further
> >>>> manual intervention is required -- or else to determine the record is
> >>>> unprocessable and should be skipped, in which case the user needs to
> >>>> somehow increment the offset and then resume the task.
> >>>>
> >>>> It's a bit awkward to ask people to use the command line tools to
> >>>> manually wind the offset forward. More importantly, there are likely
> >>>> many operators who don't have the permissions necessary to use the
> >>>> command line tools for this kind of thing, and they would be pretty
> >>>> much out of luck in that case.
> >>>>
> >>>> On the flipside, it seems like if the user ever wants to resume the
> >>>> task without restarting, they will need to skip over the bad record. I
> >>>> think we can make the feature considerably more ergonomic by modifying
> >>>> the behavior of the #resume method so that it always skips over the bad
> >>>> record. This will probably be the easiest to implement anyways, as it
> >>>> is effectively the same as the CONTINUE option internally, but gives
> >>>> the user time to decide if they really do want to CONTINUE or not.
> >>>>
> >>>> Not sure if we would want to rename the #resume method in that case to
> >>>> make this more clear, or if javadocs would be sufficient...maybe
> >>>> something like #skipRecordAndContinue?
> >>>>
> >>>> On Tue, Oct 24, 2023 at 6:54 AM Nick Telford <nick.telf...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi Sophie,
> >>>>>
> >>>>> Thanks for the review!
> >>>>>
> >>>>> 1-3.
> >>>>> I had a feeling this was the case. I'm thinking of adding a PAUSED
> >>>>> state with the following valid transitions:
> >>>>>
> >>>>>      - RUNNING -> PAUSED
> >>>>>      - PAUSED -> RUNNING
> >>>>>      - PAUSED -> SUSPENDED
> >>>>>
> >>>>> The advantage of a dedicated State is it should make testing easier
> >>>>> and also reduce the potential for introducing bugs into the existing
> >>>>> Task states.
> >>>>>
> >>>>> While I appreciate that the engine is being revised, I think I'll
> >>>>> still pursue this actively instead of waiting, as it addresses some
> >>>>> problems my team is having right now. If the KIP is accepted, then I
> >>>>> suspect that this feature would still be desirable with the new
> >>>>> streams engine, so any new Task state would likely want to be mirrored
> >>>>> in the new engine, and the high level design is unlikely to change.
> >>>>>
> >>>>> 4a.
> >>>>> This is an excellent point I hadn't considered. Correct me if I'm
> >>>>> wrong, but the only joins that this would impact are Stream-Stream and
> >>>>> Stream-Table joins? Table-Table joins should be safe, because the join
> >>>>> is commutative, so a delayed record on one side should just cause its
> >>>>> output record to be delayed, but not lost.
> >>>>>
> >>>>> 4b.
> >>>>> If we can enumerate only the node types that are impacted by this
> >>>>> (i.e. Stream-Stream and Stream-Table joins), then perhaps we could
> >>>>> restrict it such that it only pauses dependent Tasks if there's a
> >>>>> Stream-Stream/Table join involved? The drawback here would be that
> >>>>> custom stateful Processors might also be impacted, but there'd be no
> >>>>> way to know if they're safe to not pause.
> >>>>>
> >>>>> 4c.
> >>>>> Regardless, I like this idea, but I have very little knowledge about
> >>>>> making changes to the rebalance/network protocol. It looks like this
> >>>>> could be added via StreamsPartitionAssignor#subscriptionUserData? I
> >>>>> might need some help designing this aspect of this KIP.
> >>>>>
> >>>>> Regards,
> >>>>> Nick
> >>>>>
> >>>>> On Tue, 24 Oct 2023 at 07:30, Sophie Blee-Goldman <sop...@responsive.dev> wrote:
> >>>>>
> >>>>>> Hey Nick,
> >>>>>>
> >>>>>> A few high-level thoughts:
> >>>>>>
> >>>>>> 1. We definitely don't want to piggyback on the SUSPENDED task state,
> >>>>>> as this is currently more like an intermediate state that a task
> >>>>>> passes through as it's being closed/migrated elsewhere, it doesn't
> >>>>>> really mean that a task is "suspended" and there's no logic to
> >>>>>> suspend processing on it. What you want is probably closer in spirit
> >>>>>> to the concept of a paused "named topology", where we basically
> >>>>>> freeze processing on a specific task (or set of tasks).
> >>>>>> 2. More importantly however, the SUSPENDED state was only ever needed
> >>>>>> to support efficient eager rebalancing, and we plan to remove the
> >>>>>> eager rebalancing protocol from Streams entirely in the near future.
> >>>>>> And unfortunately, the named topologies feature was never fully
> >>>>>> implemented and will probably be ripped out sometime soon as well.
> >>>>>> 3. In short, to go this route, you'd probably need to implement a
> >>>>>> PAUSED state from scratch. This wouldn't be impossible, but we are
> >>>>>> planning to basically revamp the entire thread model and decouple the
> >>>>>> consumer (potentially including the deserialization step) from the
> >>>>>> processing threads. Much as I love the idea of this feature, it might
> >>>>>> not make a lot of sense to spend time implementing right now when
> >>>>>> much of that work could be scrapped in the mid-term future. We don't
> >>>>>> have a timeline for this, however, so I don't think this should
> >>>>>> discourage you if the feature seems worth it, just want to give you a
> >>>>>> sense of the upcoming roadmap.
> >>>>>> 4. As for the feature itself, my only concern is that this feels like
> >>>>>> a very advanced feature but it would be easy for new users to
> >>>>>> accidentally abuse it and get their application in trouble.
> >>>>>> Specifically I'm worried about how this could be harmful to
> >>>>>> applications for which some degree of synchronization is required,
> >>>>>> such as a join. Correct join semantics rely heavily on receiving
> >>>>>> records from both sides of the join and carefully selecting the next
> >>>>>> one to process based on timestamp. Imagine if a
> >>>>>> DeserializationException occurs upstream of a repartition feeding
> >>>>>> into one side of a join (but not the other) and the user opts to
> >>>>>> PAUSE this task. If the join continues as usual it could lead to
> >>>>>> missed or incorrect results when processing is enforced with no
> >>>>>> records present on one side of the join but usual traffic flowing
> >>>>>> through the other. Maybe we could somehow signal to also PAUSE all
> >>>>>> downstream/dependent tasks? Should be able to add this information to
> >>>>>> the subscription metadata and send to all clients via a rebalance.
> >>>>>> There might be better options I'm not seeing. Or we could just decide
> >>>>>> to trust the users not to shoot themselves in the foot -- as long as
> >>>>>> we write a clear warning in the javadocs this might be fine
> >>>>>> Thanks for all the great KIPs!
> >>>>>>
> >>>>>> On Thu, Oct 12, 2023 at 9:51 AM Nick Telford <nick.telf...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hi everyone,
> >>>>>>>
> >>>>>>> This is a Streams KIP to add a new DeserializationHandlerResponse,
> >>>>>>> "SUSPEND", that suspends the failing Task but continues to process
> >>>>>>> other Tasks normally.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-990%3A+Capability+to+SUSPEND+Tasks+on+DeserializationException
> >>>>>>>
> >>>>>>> I'm not yet completely convinced that this is practical, as I
> >>>>>>> suspect it might be abusing the SUSPENDED Task.State for something
> >>>>>>> it was not designed for. The intent is to pause an active Task
> >>>>>>> *without* re-assigning it to another instance, which causes
> >>>>>>> cascading failures when the FAIL DeserializationHandlerResponse is
> >>>>>>> used.
> >>>>>>>
> >>>>>>> Let me know what you think!
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Nick
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
