1. Makes sense to me! Can you just rename the proposed
DeserializationHandlerResponse value from SUSPEND to PAUSE so we're
consistent with the wording?
2. This is a really good point -- maybe this is just a case where we have
to trust in the user not to accidentally screw themselves over. As long as
we provide sufficient information for them to decide when it is/isn't safe
to pause a task, I would be OK with just documenting the dangers of
indiscriminate use of this feature, and hoping that everyone reads the
warning.
Given the above, I have one suggestion: what if we only add the PAUSE enum
value in this KIP, and don't include an out-of-the-box (OOTB)
DeserializationExceptionHandler that implements it? I see this as
addressing two concerns:
2a. It would make it clear that this is an advanced feature that deserves
careful consideration, rather than something you enable by plugging in a
config value.
2b. It forces users to implement the handler themselves, which gives them
an opportunity to check which task is hitting the error and then make a
conscious decision about whether it is safe to pause it. In the end, it's
really impossible for us to know what is or isn't safe to pause, so the
more responsibility we can put on the user in this case, the better.
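A minimal sketch of the kind of handler 2b has in mind, assuming the proposed PAUSE response exists. To stay self-contained it does not use the real Kafka Streams types: `Response`, `SelectivePauseHandler`, the string task ID and the user-maintained set of pause-safe subtopologies are all hypothetical. In a real handler the task would presumably be taken from the processor context passed to it.

```java
import java.util.Set;

// Hypothetical stand-in for DeserializationHandlerResponse, including the
// PAUSE value proposed in this thread (it does not exist in Kafka Streams).
enum Response { CONTINUE, FAIL, PAUSE }

// Hypothetical user-written handler: PAUSE only tasks whose subtopology the
// user has reviewed and judged safe to pause; FAIL everything else.
final class SelectivePauseHandler {
    private final Set<Integer> pauseSafeSubtopologies;

    SelectivePauseHandler(Set<Integer> pauseSafeSubtopologies) {
        this.pauseSafeSubtopologies = Set.copyOf(pauseSafeSubtopologies);
    }

    // taskId is assumed to look like "<subtopology>_<partition>", e.g. "0_1".
    // The exception is unused here; a real handler would likely log it.
    Response handle(String taskId, Exception exception) {
        int subtopology = Integer.parseInt(taskId.substring(0, taskId.indexOf('_')));
        return pauseSafeSubtopologies.contains(subtopology) ? Response.PAUSE : Response.FAIL;
    }
}

class SelectivePauseHandlerDemo {
    public static void main(String[] args) {
        SelectivePauseHandler handler = new SelectivePauseHandler(Set.of(0));
        Exception error = new IllegalArgumentException("undeserializable bytes");
        System.out.println(handler.handle("0_3", error)); // prints PAUSE
        System.out.println(handler.handle("2_0", error)); // prints FAIL
    }
}
```

The point of this shape is that the safe/unsafe decision lives in user code, next to knowledge of the topology that only the user has.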
3. It sounds like the general recovery workflow would be either to resolve
the issue somehow (presumably by fixing a bug in the deserializer?) and
restart the application (in which case no further manual intervention is
required), or else to determine that the record is unprocessable and
should be skipped, in which case the user needs to somehow increment the
offset and then resume the task.
It's a bit awkward to ask people to use the command line tools to manually
wind the offset forward. More importantly, there are likely many operators
who don't have the permissions necessary to use the command line tools for
this kind of thing, and they would be pretty much out of luck in that case.
On the flip side, it seems like if the user ever wants to resume the task
without restarting, they will need to skip over the bad record. I think we
can make the feature considerably more ergonomic by modifying the behavior
of the #resume method so that it always skips over the bad record. This
will probably be the easiest option to implement anyway, as it is
effectively the same as the CONTINUE option internally, but gives the user
time to decide whether they really do want to CONTINUE or not.
Not sure if we would want to rename the #resume method in that case to
make this clearer, or if javadocs would be sufficient... maybe something
like #skipRecordAndContinue?
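The suggested resume semantics can be modeled in a few lines. This is a toy sketch, not Streams internals: `PausableTask`, its integer-parsing stand-in deserializer, and `skipRecordAndContinue` (the name floated above) are hypothetical. It shows that a PAUSE followed by skip-and-resume ends up processing exactly what CONTINUE would have, just later.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one task reading a single partition; not Kafka Streams code.
final class PausableTask {
    private final List<String> log;                 // raw records, indexed by offset
    private final List<Integer> output = new ArrayList<>();
    private int position = 0;                       // offset of the next record to read
    private boolean paused = false;

    PausableTask(List<String> log) {
        this.log = log;
    }

    // Stand-in deserializer: throws NumberFormatException for non-integers.
    private static int deserialize(String raw) {
        return Integer.parseInt(raw);
    }

    // Process until the log is exhausted or a record fails to deserialize. On
    // failure the task pauses and position keeps pointing at the bad record.
    void run() {
        while (!paused && position < log.size()) {
            try {
                output.add(deserialize(log.get(position)));
                position++;
            } catch (NumberFormatException e) {
                paused = true;
            }
        }
    }

    // Suggested resume semantics: always skip the record that caused the
    // pause (effectively a deferred CONTINUE), then carry on processing.
    void skipRecordAndContinue() {
        if (!paused) {
            throw new IllegalStateException("task is not paused");
        }
        position++;
        paused = false;
        run();
    }

    boolean isPaused() { return paused; }
    int position() { return position; }
    List<Integer> output() { return output; }
}

class PausableTaskDemo {
    public static void main(String[] args) {
        PausableTask task = new PausableTask(List.of("1", "2", "oops", "4"));
        task.run();
        System.out.println(task.output() + " paused=" + task.isPaused()); // prints [1, 2] paused=true
        task.skipRecordAndContinue();
        System.out.println(task.output() + " paused=" + task.isPaused()); // prints [1, 2, 4] paused=false
    }
}
```

Because resuming always skips, no operator ever has to touch offsets with external tooling to get the task moving again.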
On Tue, Oct 24, 2023 at 6:54 AM Nick Telford <nick.telf...@gmail.com> wrote:
Hi Sophie,
Thanks for the review!
1-3.
I had a feeling this was the case. I'm thinking of adding a PAUSED state
with the following valid transitions:
- RUNNING -> PAUSED
- PAUSED -> RUNNING
- PAUSED -> SUSPENDED
The advantage of a dedicated State is it should make testing easier and
also reduce the potential for introducing bugs into the existing Task
states.
While I appreciate that the engine is being revised, I think I'll still
pursue this actively instead of waiting, as it addresses some problems my
team is having right now. If the KIP is accepted, then I suspect that this
feature would still be desirable with the new streams engine, so any new
Task state would likely want to be mirrored in the new engine, and the
high-level design is unlikely to change.
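The proposed transitions can be written down as a small state table. This sketch is hypothetical and deliberately reduced: `SketchState` and `SketchTask` model only RUNNING, PAUSED and SUSPENDED with the three transitions listed above, and leave out the rest of the real Task.State machine.

```java
// Hypothetical, reduced task-state model containing only the states named in
// this thread and the PAUSED transitions proposed above. The real Task.State
// has more states and transitions, all omitted here.
enum SketchState {
    RUNNING, PAUSED, SUSPENDED;

    boolean isValidTransition(SketchState next) {
        switch (this) {
            case RUNNING:
                return next == PAUSED;
            case PAUSED:
                return next == RUNNING || next == SUSPENDED;
            default:
                return false; // SUSPENDED's own transitions are out of scope here
        }
    }
}

// Hypothetical task holder that rejects transitions the model does not allow.
final class SketchTask {
    private SketchState state = SketchState.RUNNING;

    void transitionTo(SketchState next) {
        if (!state.isValidTransition(next)) {
            throw new IllegalStateException("Invalid transition " + state + " -> " + next);
        }
        state = next;
    }

    SketchState state() { return state; }
}

class SketchTaskDemo {
    public static void main(String[] args) {
        SketchTask task = new SketchTask();
        task.transitionTo(SketchState.PAUSED);    // RUNNING -> PAUSED
        task.transitionTo(SketchState.SUSPENDED); // PAUSED -> SUSPENDED
        System.out.println(task.state());         // prints SUSPENDED
    }
}
```

Keeping PAUSED as its own state means the checks for it sit in one place, which is the testability argument made above.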
4a.
This is an excellent point I hadn't considered. Correct me if I'm wrong,
but the only joins that this would impact are Stream-Stream and
Stream-Table joins? Table-Table joins should be safe, because the join is
commutative, so a delayed record on one side should just cause its output
record to be delayed, but not lost.
4b.
If we can enumerate only the node types that are impacted by this (i.e.
Stream-Stream and Stream-Table joins), then perhaps we could restrict it
such that it only pauses dependent Tasks if there's a Stream-Stream/Table
join involved? The drawback here would be that custom stateful Processors
might also be impacted, but there'd be no way to know if they're safe to
not pause.
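One way to express the rule from 4b is a classifier over the processor nodes downstream of the paused task. Everything here is hypothetical (`NodeKind`, `DependentPauseDecision`, `DependentPausePolicy`), and it encodes the tentative classification from 4a, including the assumption that Table-Table joins are safe. Custom stateful processors yield UNKNOWN rather than a guess, reflecting the drawback noted above.

```java
import java.util.List;

// Hypothetical classification of processor nodes downstream of a paused task.
enum NodeKind { STATELESS, STREAM_STREAM_JOIN, STREAM_TABLE_JOIN, TABLE_TABLE_JOIN, CUSTOM_STATEFUL }

enum DependentPauseDecision { NOT_REQUIRED, REQUIRED, UNKNOWN }

final class DependentPausePolicy {
    private DependentPausePolicy() {}

    static DependentPauseDecision decide(List<NodeKind> downstreamNodes) {
        boolean sawCustomStateful = false;
        for (NodeKind kind : downstreamNodes) {
            switch (kind) {
                case STREAM_STREAM_JOIN:
                case STREAM_TABLE_JOIN:
                    // These joins depend on records arriving on both sides.
                    return DependentPauseDecision.REQUIRED;
                case CUSTOM_STATEFUL:
                    // No way to know whether it is safe not to pause.
                    sawCustomStateful = true;
                    break;
                default:
                    // Stateless nodes and Table-Table joins are treated as safe (per 4a).
                    break;
            }
        }
        return sawCustomStateful ? DependentPauseDecision.UNKNOWN : DependentPauseDecision.NOT_REQUIRED;
    }
}

class DependentPausePolicyDemo {
    public static void main(String[] args) {
        System.out.println(DependentPausePolicy.decide(
                List.of(NodeKind.STATELESS, NodeKind.STREAM_TABLE_JOIN))); // prints REQUIRED
        System.out.println(DependentPausePolicy.decide(
                List.of(NodeKind.CUSTOM_STATEFUL)));                       // prints UNKNOWN
    }
}
```

How UNKNOWN should be treated (pause anyway, or leave it to the user) is exactly the open question above.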
4c.
Regardless, I like this idea, but I have very little knowledge about
making changes to the rebalance/network protocol. It looks like this could
be added via StreamsPartitionAssignor#subscriptionUserData? I might need
some help designing this aspect of this KIP.
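As a rough illustration of 4c, the set of paused tasks could be serialized into a ByteBuffer, which is what ConsumerPartitionAssignor#subscriptionUserData returns. The layout below and the `PausedTasksCodec` name are hypothetical; Streams already has its own versioned subscription encoding, and a real change would have to extend that (likely with a version bump) rather than replace it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical payload for paused task IDs ("<subtopology>_<partition>").
// This is NOT the real Streams subscription wire format.
final class PausedTasksCodec {
    private PausedTasksCodec() {}

    // Layout: [int count], then per task [int length][UTF-8 bytes].
    static ByteBuffer encode(Set<String> pausedTaskIds) {
        int size = Integer.BYTES;
        for (String id : pausedTaskIds) {
            size += Integer.BYTES + id.getBytes(StandardCharsets.UTF_8).length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(pausedTaskIds.size());
        for (String id : pausedTaskIds) {
            byte[] bytes = id.getBytes(StandardCharsets.UTF_8);
            buf.putInt(bytes.length);
            buf.put(bytes);
        }
        buf.flip(); // make the buffer readable from the start
        return buf;
    }

    static Set<String> decode(ByteBuffer data) {
        ByteBuffer buf = data.duplicate(); // leave the caller's position untouched
        int count = buf.getInt();
        Set<String> ids = new LinkedHashSet<>();
        for (int i = 0; i < count; i++) {
            byte[] bytes = new byte[buf.getInt()];
            buf.get(bytes);
            ids.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return ids;
    }
}

class PausedTasksCodecDemo {
    public static void main(String[] args) {
        Set<String> paused = new LinkedHashSet<>(List.of("0_1", "3_12"));
        ByteBuffer userData = PausedTasksCodec.encode(paused);
        System.out.println(userData.remaining());             // prints 19
        System.out.println(PausedTasksCodec.decode(userData)); // prints [0_1, 3_12]
    }
}
```

Every member would then learn which tasks are paused on the next rebalance, which is what pausing dependent tasks on other instances requires.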
Regards,
Nick
On Tue, 24 Oct 2023 at 07:30, Sophie Blee-Goldman <sop...@responsive.dev> wrote:
Hey Nick,
A few high-level thoughts:
1. We definitely don't want to piggyback on the SUSPENDED task state, as
this is currently more like an intermediate state that a task passes
through as it's being closed/migrated elsewhere; it doesn't really mean
that a task is "suspended", and there's no logic to suspend processing on
it. What you want is probably closer in spirit to the concept of a paused
"named topology", where we basically freeze processing on a specific task
(or set of tasks).
2. More importantly, however, the SUSPENDED state was only ever needed to
support efficient eager rebalancing, and we plan to remove the eager
rebalancing protocol from Streams entirely in the near future. And
unfortunately, the named topologies feature was never fully implemented
and will probably be ripped out sometime soon as well.
3. In short, to go this route, you'd probably need to implement a PAUSED
state from scratch. This wouldn't be impossible, but we are planning to
basically revamp the entire thread model and decouple the consumer
(potentially including the deserialization step) from the processing
threads. Much as I love the idea of this feature, it might not make a lot
of sense to spend time implementing it right now when much of that work
could be scrapped in the mid-term future. We don't have a timeline for
this, however, so I don't think this should discourage you if the feature
seems worth it; I just want to give you a sense of the upcoming roadmap.
4. As for the feature itself, my only concern is that this feels like a
very advanced feature, but it would be easy for new users to accidentally
abuse it and get their application into trouble. Specifically, I'm worried
about how this could harm applications that require some degree of
synchronization, such as a join. Correct join semantics rely heavily on
receiving records from both sides of the join and carefully selecting the
next one to process based on timestamp. Imagine a DeserializationException
occurs upstream of a repartition feeding into one side of a join (but not
the other) and the user opts to PAUSE this task. If the join continues as
usual, it could produce missed or incorrect results once processing is
enforced with no records present on one side of the join but normal
traffic flowing through the other. Maybe we could somehow signal to also
PAUSE all downstream/dependent tasks? We should be able to add this
information to the subscription metadata and send it to all clients via a
rebalance. There might be better options I'm not seeing. Or we could just
decide to trust the users not to shoot themselves in the foot; as long as
we write a clear warning in the javadocs, this might be fine.
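To make the synchronization concern concrete, here is a toy model of how a task with two inputs might choose its next record: lowest buffered timestamp first, and, when one side has nothing buffered, either wait or (once processing is enforced) carry on with the other side alone. `TwoSidedChooser` and the `enforceProcessing` flag are hypothetical; in Kafka Streams the waiting behavior is governed by the `max.task.idle.ms` config, but the real logic is more involved.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical, much-simplified model of picking the next record for a task
// with two inputs (e.g. the two sides of a join). Values are record timestamps.
final class TwoSidedChooser {
    enum Side { LEFT, RIGHT, WAIT }

    private TwoSidedChooser() {}

    static Side next(Deque<Long> left, Deque<Long> right, boolean enforceProcessing) {
        if (left.isEmpty() && right.isEmpty()) {
            return Side.WAIT;
        }
        if (left.isEmpty() || right.isEmpty()) {
            // One side has nothing buffered: wait for it unless processing is enforced.
            if (!enforceProcessing) {
                return Side.WAIT;
            }
            return left.isEmpty() ? Side.RIGHT : Side.LEFT;
        }
        // Both sides have data: process the lower timestamp first.
        return left.peekFirst() <= right.peekFirst() ? Side.LEFT : Side.RIGHT;
    }
}

class TwoSidedChooserDemo {
    public static void main(String[] args) {
        Deque<Long> pausedSide = new ArrayDeque<>();   // fed by the paused task: stays empty
        Deque<Long> activeSide = new ArrayDeque<>(List.of(10L, 20L));
        System.out.println(TwoSidedChooser.next(pausedSide, activeSide, false)); // prints WAIT
        System.out.println(TwoSidedChooser.next(pausedSide, activeSide, true));  // prints RIGHT
    }
}
```

If the paused task feeds the empty side, nothing will ever arrive there, so once processing is enforced only the other side advances. For example, if the paused side is the table side of a stream-table join, stream records keep being joined against a table that has stopped receiving updates.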
Thanks for all the great KIPs!
On Thu, Oct 12, 2023 at 9:51 AM Nick Telford <nick.telf...@gmail.com> wrote:
Hi everyone,
This is a Streams KIP to add a new DeserializationHandlerResponse,
"SUSPEND", that suspends the failing Task but continues to process other
Tasks normally.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-990%3A+Capability+to+SUSPEND+Tasks+on+DeserializationException
I'm not yet completely convinced that this is practical, as I suspect it
might be abusing the SUSPENDED Task.State for something it was not
designed for. The intent is to pause an active Task *without* re-assigning
it to another instance, since that re-assignment is what causes cascading
failures when the FAIL DeserializationHandlerResponse is used.
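The behavior proposed here can be sketched as a toy stream thread that owns several tasks. All names are hypothetical (`HandlerResponse`, `SketchStreamThread`), the response is called PAUSE to match the rename requested elsewhere in this thread, and FAIL is modeled simply as an exception that would kill the thread. The key property is that PAUSE leaves the bad record in place and stops only the affected task, while the thread keeps processing its other tasks.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

// Hypothetical stand-in for DeserializationHandlerResponse with the proposed value.
enum HandlerResponse { CONTINUE, FAIL, PAUSE }

// Hypothetical model of one stream thread that owns several tasks.
final class SketchStreamThread {
    private final Map<String, Deque<String>> inputs = new LinkedHashMap<>();
    private final Set<String> pausedTasks = new LinkedHashSet<>();
    private final List<String> processed = new ArrayList<>();
    private final BiFunction<String, Exception, HandlerResponse> handler;

    SketchStreamThread(BiFunction<String, Exception, HandlerResponse> handler) {
        this.handler = handler;
    }

    void addTask(String taskId, List<String> records) {
        inputs.put(taskId, new ArrayDeque<>(records));
    }

    // One pass over all tasks: each task that is not paused processes one record.
    void runOnce() {
        for (Map.Entry<String, Deque<String>> entry : inputs.entrySet()) {
            String taskId = entry.getKey();
            Deque<String> queue = entry.getValue();
            if (pausedTasks.contains(taskId) || queue.isEmpty()) {
                continue;
            }
            try {
                int value = Integer.parseInt(queue.peekFirst()); // stand-in deserializer
                processed.add(taskId + ":" + value);
                queue.pollFirst();
            } catch (NumberFormatException e) {
                switch (handler.apply(taskId, e)) {
                    case CONTINUE:
                        queue.pollFirst();       // drop the bad record
                        break;
                    case PAUSE:
                        pausedTasks.add(taskId); // keep the record, stop only this task
                        break;
                    default:
                        throw new IllegalStateException("FAIL: the whole thread dies", e);
                }
            }
        }
    }

    Set<String> pausedTasks() { return pausedTasks; }
    List<String> processed() { return processed; }
}

class SketchStreamThreadDemo {
    public static void main(String[] args) {
        SketchStreamThread thread = new SketchStreamThread((taskId, e) -> HandlerResponse.PAUSE);
        thread.addTask("0_0", List.of("1", "x", "3"));
        thread.addTask("0_1", List.of("7", "8"));
        for (int i = 0; i < 3; i++) {
            thread.runOnce();
        }
        System.out.println(thread.processed());   // prints [0_0:1, 0_1:7, 0_1:8]
        System.out.println(thread.pausedTasks()); // prints [0_0]
    }
}
```

With FAIL, the same bad record would take down the thread, and its tasks (including the healthy one) would be reassigned, which is the cascading behavior this KIP wants to avoid.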
Let me know what you think!
Regards,
Nick