1.
Whoops! I've fixed that now. Thanks for catching it.

2.
I agree, I'll remove the LogAndPause handler so it's clear this is an
advanced feature. I'll also add some documentation to
DeserializationHandlerResponse#PAUSE explaining that users should
approach it with care.

3a.
This is interesting. My main concern is that there may be situations where
skipping a single bad record is not sufficient, but the Task should still
be resumed without restarting the application. For example, there may be
several bad records in a row that all need to be skipped.

3b.
Additionally, a Task may have multiple input topics, so we'd need some way
to indicate which record to skip.

These can probably be resolved by something like skipAndContinue(TaskId
task, String topic, int recordsToSkip) or even skipAndContinue(TaskId task,
Map<String, Integer> recordsToSkipByTopic)?
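
To make 3a/3b a bit more concrete, here's a rough sketch of the semantics I
have in mind for the Map-based variant. None of these types exist in Kafka
Streams; PausedTask and shouldProcess are names I've made up purely for
illustration, and the real implementation would live inside the Task.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only: none of these types exist in Kafka Streams.
final class SkipAndContinueSketch {

    /** Tracks, per input topic, how many records a paused task drops after resuming. */
    static final class PausedTask {
        private final Map<String, Integer> remainingSkips = new HashMap<>();
        private boolean paused = true;

        /** Models skipAndContinue(task, recordsToSkipByTopic) for one task. */
        void skipAndContinue(Map<String, Integer> recordsToSkipByTopic) {
            // Validate everything first so a bad entry leaves the state untouched.
            for (Map.Entry<String, Integer> e : recordsToSkipByTopic.entrySet()) {
                if (e.getValue() < 0) {
                    throw new IllegalArgumentException("negative skip count for " + e.getKey());
                }
            }
            for (Map.Entry<String, Integer> e : recordsToSkipByTopic.entrySet()) {
                remainingSkips.merge(e.getKey(), e.getValue(), Integer::sum);
            }
            paused = false;
        }

        /** Returns true if the next record from this topic should be processed. */
        boolean shouldProcess(String topic) {
            if (paused) {
                return false;
            }
            int remaining = remainingSkips.getOrDefault(topic, 0);
            if (remaining > 0) {
                remainingSkips.put(topic, remaining - 1);
                return false; // still inside the run of bad records
            }
            return true;
        }
    }

    public static void main(String[] args) {
        PausedTask task = new PausedTask();
        task.skipAndContinue(Map.of("orders", 2));
        System.out.println(task.shouldProcess("orders"));   // false (1st bad record)
        System.out.println(task.shouldProcess("orders"));   // false (2nd bad record)
        System.out.println(task.shouldProcess("orders"));   // true
        System.out.println(task.shouldProcess("payments")); // true (other topic untouched)
    }
}
```

Keying the counts by topic covers the multi-input case in 3b, and a count
greater than one covers the run of bad records in 3a.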

4.
Related to 2: I was thinking that users implementing their own handler may
want to be able to determine which Processors (i.e. which Subtopology/task
group) are being affected, so they can programmatically make a decision on
whether it's safe to PAUSE. ProcessorContext, which is already a parameter
to DeserializationExceptionHandler, provides the TaskId of the failed Task,
but doesn't provide metadata on the Processors that Task executes.

Since TaskIds are not stable (they can change when you modify your
topology, and users have no influence over how they're assigned), a user
cannot use TaskId alone to determine which Processors would be affected.

What do you think would be the best way to provide this information to
exception handlers? I was originally thinking that users could instantiate
the handler themselves and provide a TopologyDescription (via
Topology#describe) in the constructor, but it looks like configs of
type Class cannot accept an already instantiated instance, and there's no
other way to inject information like that.

Perhaps we could add something to ProcessorContext that contains details on
the sub-topology being executed?
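
As a sketch of how a handler might use that, assuming the context exposed
the names of the Processors in the failing sub-topology: SubtopologyInfo,
Response and decide below are all stand-ins I've invented for illustration,
not existing Kafka Streams APIs. It also assumes the user has given their
Processors stable names, so the decision doesn't depend on the TaskId.

```java
import java.util.Set;

// Hypothetical sketch: SubtopologyInfo and Response are stand-ins, not Kafka Streams APIs.
final class PauseDecisionSketch {

    enum Response { CONTINUE, FAIL, PAUSE }

    /** Stand-in for metadata the context might expose about the failing sub-topology. */
    static final class SubtopologyInfo {
        final Set<String> processorNames;

        SubtopologyInfo(Set<String> processorNames) {
            this.processorNames = processorNames;
        }
    }

    /** Pauses only when no Processor in the sub-topology is on the user's deny list. */
    static Response decide(SubtopologyInfo info, Set<String> unsafeToPause) {
        for (String name : info.processorNames) {
            if (unsafeToPause.contains(name)) {
                return Response.FAIL; // e.g. a join that must not see one-sided input
            }
        }
        return Response.PAUSE;
    }

    public static void main(String[] args) {
        Set<String> unsafe = Set.of("orders-payments-join");
        SubtopologyInfo joinSide = new SubtopologyInfo(Set.of("parse-orders", "orders-payments-join"));
        SubtopologyInfo standalone = new SubtopologyInfo(Set.of("parse-audit-log"));
        System.out.println(decide(joinSide, unsafe));   // FAIL
        System.out.println(decide(standalone, unsafe)); // PAUSE
    }
}
```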

Regards,
Nick

On Thu, 26 Oct 2023 at 01:24, Sophie Blee-Goldman <sop...@responsive.dev>
wrote:

> 1. Makes sense to me! Can you just update the name of the
> DeserializationHandlerResponse enum from SUSPEND to PAUSE so
> we're consistent with the wording?
>
> > The drawback here would be that custom stateful Processors
> > might also be impacted, but there'd be no way to know if they're safe to
> > not pause.
>
> 2. This is a really good point -- maybe this is just a case where we have
> to trust in the user not to accidentally screw themselves over. As long as
> we provide sufficient information for them to decide when it is/isn't safe
> to pause a task, I would be ok with just documenting the dangers of
> indiscriminate use of this feature, and hope that everyone reads the
> warning.
>
> Given the above, I have one suggestion: what if we only add the PAUSE enum
> in this KIP, and don't include an OOTB DeserializationExceptionHandler that
> implements this? I see this as addressing two concerns:
> 2a. It would make it clear that this is an advanced feature and should be
> given careful consideration, rather than just plugging in a config value.
> 2b. It forces the user to implement the handler themselves, which gives
> them an opportunity to check on which task it is that's hitting the error
> and then make a conscious decision as to whether it is safe to pause or
> not. In the end, it's really impossible for us to know what is/is not safe
> to pause, so the more responsibility we can put on the user in this case,
> the better.
>
> 3. It sounds like the general recovery workflow would be to either resolve
> the issue somehow (presumably by fixing an issue in the deserializer?) and
> restart the application -- in which case no further manual intervention is
> required -- or else to determine the record is unprocessable and should be
> skipped, in which case the user needs to somehow increment the offset
> and then resume the task.
>
> It's a bit awkward to ask people to use the command line tools to manually
> wind the offset forward. More importantly, there are likely many operators
> who don't have the permissions necessary to use the command line tools for
> this kind of thing, and they would be pretty much out of luck in that case.
>
> On the flipside, it seems like if the user ever wants to resume the task
> without restarting, they will need to skip over the bad record. I think we
> can make the feature considerably more ergonomic by modifying the behavior
> of the #resume method so that it always skips over the bad record. This
> will probably be the easiest to implement anyways, as it is effectively the
> same as the CONTINUE option internally, but gives the user time to
> decide if they really do want to CONTINUE or not.
>
> Not sure if we would want to rename the #resume method in that case to
> make this more clear, or if javadocs would be sufficient...maybe
> something like #skipRecordAndContinue?
>
> On Tue, Oct 24, 2023 at 6:54 AM Nick Telford <nick.telf...@gmail.com>
> wrote:
>
> > Hi Sophie,
> >
> > Thanks for the review!
> >
> > 1-3.
> > I had a feeling this was the case. I'm thinking of adding a PAUSED state
> > with the following valid transitions:
> >
> >    - RUNNING -> PAUSED
> >    - PAUSED -> RUNNING
> >    - PAUSED -> SUSPENDED
> >
> > The advantage of a dedicated State is it should make testing easier and
> > also reduce the potential for introducing bugs into the existing Task
> > states.
> >
> > While I appreciate that the engine is being revised, I think I'll still
> > pursue this actively instead of waiting, as it addresses some problems my
> > team is having right now. If the KIP is accepted, then I suspect that this
> > feature would still be desirable with the new streams engine, so any new
> > Task state would likely want to be mirrored in the new engine, and the
> > high-level design is unlikely to change.
> >
> > 4a.
> > This is an excellent point I hadn't considered. Correct me if I'm wrong,
> > but the only joins that this would impact are Stream-Stream and
> > Stream-Table joins? Table-Table joins should be safe, because the join is
> > commutative, so a delayed record on one side should just cause its output
> > record to be delayed, but not lost.
> >
> > 4b.
> > If we can enumerate only the node types that are impacted by this (i.e.
> > Stream-Stream and Stream-Table joins), then perhaps we could restrict it
> > such that it only pauses dependent Tasks if there's a Stream-Stream/Table
> > join involved? The drawback here would be that custom stateful Processors
> > might also be impacted, but there'd be no way to know if they're safe to
> > not pause.
> >
> > 4c.
> > Regardless, I like this idea, but I have very little knowledge about
> > making changes to the rebalance/network protocol. It looks like this could
> > be added via StreamsPartitionAssignor#subscriptionUserData? I might need
> > some help designing this aspect of this KIP.
> >
> > Regards,
> > Nick
> >
> > On Tue, 24 Oct 2023 at 07:30, Sophie Blee-Goldman <sop...@responsive.dev>
> > wrote:
> >
> > > Hey Nick,
> > >
> > > A few high-level thoughts:
> > >
> > > 1. We definitely don't want to piggyback on the SUSPENDED task state, as
> > > this is currently more like an intermediate state that a task passes
> > > through as it's being closed/migrated elsewhere; it doesn't really mean
> > > that a task is "suspended" and there's no logic to suspend processing on
> > > it. What you want is probably closer in spirit to the concept of a paused
> > > "named topology", where we basically freeze processing on a specific task
> > > (or set of tasks).
> > > 2. More importantly however, the SUSPENDED state was only ever needed to
> > > support efficient eager rebalancing, and we plan to remove the eager
> > > rebalancing protocol from Streams entirely in the near future. And
> > > unfortunately, the named topologies feature was never fully implemented
> > > and will probably be ripped out sometime soon as well.
> > > 3. In short, to go this route, you'd probably need to implement a PAUSED
> > > state from scratch. This wouldn't be impossible, but we are planning to
> > > basically revamp the entire thread model and decouple the consumer
> > > (potentially including the deserialization step) from the processing
> > > threads. Much as I love the idea of this feature, it might not make a lot
> > > of sense to spend time implementing right now when much of that work
> > > could be scrapped in the mid-term future. We don't have a timeline for
> > > this, however, so I don't think this should discourage you if the feature
> > > seems worth it, just want to give you a sense of the upcoming roadmap.
> > > 4. As for the feature itself, my only concern is that this feels like a
> > > very advanced feature but it would be easy for new users to accidentally
> > > abuse it and get their application in trouble. Specifically I'm worried
> > > about how this could be harmful to applications for which some degree of
> > > synchronization is required, such as a join. Correct join semantics rely
> > > heavily on receiving records from both sides of the join and carefully
> > > selecting the next one to process based on timestamp. Imagine if a
> > > DeserializationException occurs upstream of a repartition feeding into
> > > one side of a join (but not the other) and the user opts to PAUSE this
> > > task. If the join continues as usual it could lead to missed or incorrect
> > > results when processing is enforced with no records present on one side
> > > of the join but usual traffic flowing through the other. Maybe we could
> > > somehow signal to also PAUSE all downstream/dependent tasks? Should be
> > > able to add this information to the subscription metadata and send to all
> > > clients via a rebalance. There might be better options I'm not seeing. Or
> > > we could just decide to trust the users not to shoot themselves in the
> > > foot -- as long as we write a clear warning in the javadocs this might be
> > > fine.
> > >
> > > Thanks for all the great KIPs!
> > >
> > > On Thu, Oct 12, 2023 at 9:51 AM Nick Telford <nick.telf...@gmail.com>
> > > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > > This is a Streams KIP to add a new DeserializationHandlerResponse,
> > > > > "SUSPEND", that suspends the failing Task but continues to process
> > > > > other Tasks normally.
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-990%3A+Capability+to+SUSPEND+Tasks+on+DeserializationException
> > > >
> > > > > I'm not yet completely convinced that this is practical, as I suspect
> > > > > it might be abusing the SUSPENDED Task.State for something it was not
> > > > > designed for. The intent is to pause an active Task *without*
> > > > > re-assigning it to another instance, which causes cascading failures
> > > > > when the FAIL DeserializationHandlerResponse is used.
> > > >
> > > > Let me know what you think!
> > > >
> > > > Regards,
> > > > Nick
> > > >
> > >
> >
>