Hi Sophie,

Thanks for the review!

1-3.
I had a feeling this was the case. I'm thinking of adding a PAUSED state
with the following valid transitions:

   - RUNNING -> PAUSED
   - PAUSED -> RUNNING
   - PAUSED -> SUSPENDED

The advantage of a dedicated state is that it should make testing easier
and reduce the risk of introducing bugs into the existing Task states.
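To make the proposed transitions concrete, here is a simplified, standalone
model. It is not the real Task.State enum in Kafka Streams. It only covers
the three states discussed here, it assumes RUNNING -> SUSPENDED is already a
valid transition, and it leaves SUSPENDED's outgoing transitions out of
scope. The transition(...) helper is hypothetical and only shows how an
invalid transition would be rejected, which is the kind of check that a
dedicated state makes easy to unit test.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class PausedStateSketch {

    // Simplified, hypothetical model of the Task states discussed in this thread;
    // NOT the real Kafka Streams Task.State enum.
    enum State { RUNNING, PAUSED, SUSPENDED }

    private static final Map<State, Set<State>> VALID = new EnumMap<>(State.class);

    static {
        // RUNNING -> SUSPENDED is assumed to already exist; RUNNING -> PAUSED is proposed.
        VALID.put(State.RUNNING, EnumSet.of(State.SUSPENDED, State.PAUSED));
        // Proposed transitions out of the new PAUSED state.
        VALID.put(State.PAUSED, EnumSet.of(State.RUNNING, State.SUSPENDED));
        // Outgoing transitions from SUSPENDED are out of scope for this sketch.
        VALID.put(State.SUSPENDED, EnumSet.noneOf(State.class));
    }

    static boolean isValidTransition(State from, State to) {
        return VALID.get(from).contains(to);
    }

    // Returns the new state, or throws if the transition is not allowed.
    static State transition(State from, State to) {
        if (!isValidTransition(from, to)) {
            throw new IllegalStateException("Invalid transition " + from + " -> " + to);
        }
        return to;
    }

    public static void main(String[] args) {
        State s = State.RUNNING;
        s = transition(s, State.PAUSED);   // pause on DeserializationException
        s = transition(s, State.RUNNING);  // resume once the problem is fixed
        System.out.println(s);                                                  // RUNNING
        System.out.println(isValidTransition(State.SUSPENDED, State.PAUSED));  // false
    }
}
```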

While I appreciate that the engine is being revised, I'd still like to
pursue this actively rather than wait, as it addresses some problems my
team is having right now. If the KIP is accepted, I suspect this feature
would still be desirable with the new Streams engine: any new Task state
would probably need to be mirrored there, but the high-level design is
unlikely to change.

4a.
This is an excellent point I hadn't considered. Correct me if I'm wrong,
but the only joins that this would impact are Stream-Stream and
Stream-Table joins? Table-Table joins should be safe, because the join is
commutative, so a delayed record on one side should just cause its output
record to be delayed, but not lost.
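To sanity-check that reasoning, here's a deliberately simplified model of
both join types. It's plain Java with no Kafka dependencies, and the Rec
class, the window handling and the eviction rule are my own simplifications,
not how Kafka Streams actually implements joins (there's no grace period,
for example). In the table-table case, delaying the left update only delays
the joined result. In the stream-stream case, if the left side is paused
while the right side keeps advancing stream time, the left record arrives
outside the window and its match is lost.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JoinDelaySketch {

    // One input record: which side of the join it came from, its key, value and timestamp.
    static final class Rec {
        final char side; final String key; final String value; final long ts;
        Rec(char side, String key, String value, long ts) {
            this.side = side; this.key = key; this.value = value; this.ts = ts;
        }
    }

    // Table-table model: each side keeps only the latest value per key, and the
    // join result for a key is recomputed whenever either side updates it.
    static Map<String, String> tableTable(List<Rec> input) {
        Map<String, String> left = new HashMap<>();
        Map<String, String> right = new HashMap<>();
        Map<String, String> joined = new HashMap<>();
        for (Rec r : input) {
            (r.side == 'L' ? left : right).put(r.key, r.value);
            if (left.containsKey(r.key) && right.containsKey(r.key)) {
                joined.put(r.key, left.get(r.key) + "+" + right.get(r.key));
            }
        }
        return joined;
    }

    // Stream-stream model: a record joins buffered records from the other side
    // within `window`; anything older than (stream time - window) is evicted
    // from the buffer, or dropped as late if it is the incoming record.
    static List<String> streamStream(List<Rec> input, long window) {
        List<Rec> buffered = new ArrayList<>();
        List<String> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (Rec r : input) {
            streamTime = Math.max(streamTime, r.ts);
            long cutoff = streamTime - window;
            buffered.removeIf(b -> b.ts < cutoff);
            if (r.ts < cutoff) {
                continue; // late record: dropped, so any match it would have produced is lost
            }
            for (Rec b : buffered) {
                if (b.side != r.side && b.key.equals(r.key) && Math.abs(b.ts - r.ts) <= window) {
                    out.add(r.side == 'L' ? r.value + "+" + b.value : b.value + "+" + r.value);
                }
            }
            buffered.add(r);
        }
        return out;
    }

    public static void main(String[] args) {
        Rec l = new Rec('L', "k", "a", 10);
        Rec r = new Rec('R', "k", "b", 12);
        Rec rLater = new Rec('R', "k2", "c", 100);

        // Table-table: delaying the left update only delays the result.
        System.out.println(tableTable(List.of(l, r)));          // {k=a+b}
        System.out.println(tableTable(List.of(r, rLater, l)));  // {k=a+b}

        // Stream-stream: left paused while right advances stream time => match lost.
        System.out.println(streamStream(List.of(l, r), 5));          // [a+b]
        System.out.println(streamStream(List.of(r, rLater, l), 5));  // []
    }
}
```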

4b.
If we can enumerate the node types that are impacted (i.e., Stream-Stream
and Stream-Table joins), then perhaps we could restrict this so that
dependent Tasks are only paused when a Stream-Stream or Stream-Table join is
involved? The drawback is that custom stateful Processors might also be
impacted, and there'd be no way to know whether it's safe not to pause them.
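A rough sketch of that restriction, expressed in terms of sub-topologies
rather than individual Tasks (mapping the result back to concrete Tasks is
left out). NodeKind, needsPause and the downstream map are all hypothetical:
Kafka Streams doesn't expose node types like this, so in practice we'd have
to derive them from the topology. Custom stateful processors are treated
conservatively and always paused, since we can't know whether they're safe.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class DependentPauseSketch {

    // Hypothetical classification of processor nodes, assumed for illustration only.
    enum NodeKind { SOURCE, STATELESS, STREAM_STREAM_JOIN, STREAM_TABLE_JOIN,
                    TABLE_TABLE_JOIN, CUSTOM_STATEFUL, SINK }

    // Pause if the sub-topology has a join that relies on both inputs progressing
    // together, or a custom stateful processor we cannot reason about.
    static boolean needsPause(Set<NodeKind> kinds) {
        return kinds.contains(NodeKind.STREAM_STREAM_JOIN)
            || kinds.contains(NodeKind.STREAM_TABLE_JOIN)
            || kinds.contains(NodeKind.CUSTOM_STATEFUL);
    }

    // Breadth-first walk over the (hypothetical) graph of sub-topologies connected
    // by repartition topics, collecting every downstream one that needs pausing.
    static Set<Integer> subtopologiesToPause(int failed,
                                             Map<Integer, List<Integer>> downstream,
                                             Map<Integer, Set<NodeKind>> kinds) {
        Set<Integer> toPause = new TreeSet<>();
        Set<Integer> visited = new HashSet<>();
        Deque<Integer> queue = new ArrayDeque<>(List.of(failed));
        while (!queue.isEmpty()) {
            int current = queue.poll();
            if (!visited.add(current)) {
                continue; // already expanded
            }
            for (int next : downstream.getOrDefault(current, List.of())) {
                if (needsPause(kinds.getOrDefault(next, Set.of()))) {
                    toPause.add(next);
                }
                queue.add(next);
            }
        }
        return toPause;
    }

    public static void main(String[] args) {
        // Sub-topology 0 feeds 1 (stream-stream join) and 2 (table-table join); 1 feeds 3.
        Map<Integer, List<Integer>> downstream = Map.of(0, List.of(1, 2), 1, List.of(3));
        Map<Integer, Set<NodeKind>> kinds = Map.of(
            0, Set.of(NodeKind.SOURCE, NodeKind.STATELESS),
            1, Set.of(NodeKind.STREAM_STREAM_JOIN),
            2, Set.of(NodeKind.TABLE_TABLE_JOIN),
            3, Set.of(NodeKind.STATELESS, NodeKind.SINK));
        System.out.println(subtopologiesToPause(0, downstream, kinds)); // [1]
    }
}
```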

4c.
Regardless, I like this idea, but I have very little experience making
changes to the rebalance/network protocol. It looks like this could be
added via StreamsPartitionAssignor#subscriptionUserData? I might need some
help designing this part of the KIP.
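If it does go via the subscription user data, the payload itself could be
small. ConsumerPartitionAssignor#subscriptionUserData returns a ByteBuffer,
but as far as I can tell Streams already uses it for its own versioned
subscription metadata, so a real change would presumably have to extend that
format (with a version bump) rather than replace it. The layout below and
the TaskId stand-in are purely hypothetical; they only show encoding and
decoding a list of paused tasks.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class PausedTasksUserDataSketch {

    // Minimal stand-in for a task id (sub-topology + partition); not Kafka's TaskId class.
    static final class TaskId {
        final int subtopology; final int partition;
        TaskId(int subtopology, int partition) {
            this.subtopology = subtopology; this.partition = partition;
        }
        @Override public String toString() { return subtopology + "_" + partition; }
    }

    // Hypothetical wire format: [count:int] then count x [subtopology:int, partition:int].
    static ByteBuffer encode(List<TaskId> paused) {
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * (1 + 2 * paused.size()));
        buf.putInt(paused.size());
        for (TaskId id : paused) {
            buf.putInt(id.subtopology).putInt(id.partition);
        }
        buf.flip(); // switch to read mode so the receiver can consume it
        return buf;
    }

    // Reads back the paused tasks in the order they were written.
    static List<TaskId> decode(ByteBuffer buf) {
        int count = buf.getInt();
        List<TaskId> paused = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            paused.add(new TaskId(buf.getInt(), buf.getInt()));
        }
        return paused;
    }

    public static void main(String[] args) {
        ByteBuffer wire = encode(List.of(new TaskId(0, 3), new TaskId(1, 3)));
        System.out.println(decode(wire)); // [0_3, 1_3]
    }
}
```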

Regards,
Nick

On Tue, 24 Oct 2023 at 07:30, Sophie Blee-Goldman <sop...@responsive.dev>
wrote:

> Hey Nick,
>
> A few high-level thoughts:
>
> 1. We definitely don't want to piggyback on the SUSPENDED task state, as
> this is currently more like an intermediate state that a task passes
> through as it's being closed/migrated elsewhere; it doesn't really mean
> that a task is "suspended" and there's no logic to suspend processing on
> it. What you want is probably closer in spirit to the concept of a paused
> "named topology", where we basically freeze processing on a specific task
> (or set of tasks).
> 2. More importantly however, the SUSPENDED state was only ever needed to
> support efficient eager rebalancing, and we plan to remove the eager
> rebalancing protocol from Streams entirely in the near future. And
> unfortunately, the named topologies feature was never fully implemented and
> will probably be ripped out sometime soon as well.
> 3. In short, to go this route, you'd probably need to implement a PAUSED
> state from scratch. This wouldn't be impossible, but we are planning to
> basically revamp the entire thread model and decouple the consumer
> (potentially including the deserialization step) from the processing
> threads. Much as I love the idea of this feature, it might not make a lot
> of sense to spend time implementing right now when much of that work could
> be scrapped in the mid-term future. We don't have a timeline for this,
> however, so I don't think this should discourage you if the feature seems
> worth it, just want to give you a sense of the upcoming roadmap.
> 4. As for the feature itself, my only concern is that this feels like a
> very advanced feature but it would be easy for new users to accidentally
> abuse it and get their application in trouble. Specifically I'm worried
> about how this could be harmful to applications for which some degree of
> synchronization is required, such as a join. Correct join semantics rely
> heavily on receiving records from both sides of the join and carefully
> selecting the next one to process based on timestamp. Imagine if a
> DeserializationException occurs upstream of a repartition feeding into one
> side of a join (but not the other) and the user opts to PAUSE this task. If
> the join continues as usual, it could lead to missed or incorrect results
> when processing is enforced with no records present on one side of the join
> but usual traffic flowing through the other. Maybe we could somehow signal
> to also PAUSE all downstream/dependent tasks? Should be able to add this
> information to the subscription metadata and send to all clients via a
> rebalance. There might be better options I'm not seeing. Or we could just
> decide to trust the users not to shoot themselves in the foot -- as long as
> we write a clear warning in the javadocs, this might be fine.
>
> Thanks for all the great KIPs!
>
> On Thu, Oct 12, 2023 at 9:51 AM Nick Telford <nick.telf...@gmail.com>
> wrote:
>
> > Hi everyone,
> >
> > This is a Streams KIP to add a new DeserializationHandlerResponse,
> > "SUSPEND", that suspends the failing Task but continues to process other
> > Tasks normally.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-990%3A+Capability+to+SUSPEND+Tasks+on+DeserializationException
> >
> > I'm not yet completely convinced that this is practical, as I suspect it
> > might be abusing the SUSPENDED Task.State for something it was not
> > designed for. The intent is to pause an active Task *without* re-assigning it to
> > another instance, which causes cascading failures when the FAIL
> > DeserializationHandlerResponse is used.
> >
> > Let me know what you think!
> >
> > Regards,
> > Nick
> >
>
