Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException

2024-04-05 Thread Sophie Blee-Goldman
Thanks for the followup Nick!

This is definitely one of those things that feels like it should be easy
and useful for everyone, but "the devil is in the details" as we always
say. It'll be good to have this thread and the discussion so far on record,
should anyone want to attempt it in the future or need something similar.

On Thu, Mar 28, 2024 at 6:03 AM Nick Telford  wrote:

> Hi folks,
>
> Sorry I haven't got back to you until now.
>
> It's become clear that I hadn't anticipated a significant number of
> technical challenges that this KIP presents. I think expecting users to
> understand the ramifications on aggregations, joins and windowing
> ultimately kills it: it only becomes a problem under *specific*
> combinations of operations, and those problems would manifest in ways that
> might be difficult for users to detect, let alone diagnose.
>
> I think it's best to abandon this KIP, at least for now. If anyone else
> sees a use for it and a path forward, feel free to pick it up.
>
> Since I'm abandoning the KIP, I won't update the Motivation section. But I
> will provide a bit of background here on why I originally suggested it, in
> case you're interested:
>
> In my organisation, we don't have schemas for *any* of our data in Kafka.
> Consequently, one of the biggest causes of downtime in our applications is
> "bad" records being written by Producers. We integrate with a lot of
> third-party APIs, and have Producers that just push that data straight to
> Kafka with very little validation. I've lost count of the number of times
> my application has been crashed by a deserialization exception because we
> received a record that looks like '{"error": "Bad gateway"}' or similar,
> instead of the actual payload we expect.
>
> The difficulty is we can't just use CONTINUE to discard these messages,
> because we also sometimes get deserialization exceptions caused by an
> upstream schema change that is incompatible with the expectations of our
> app. In these cases, we don't want to discard records (which are
> technically valid), but instead need to adjust our application to be
> compatible with the new schema, before processing them.
>
> Crucially, we use a monolithic app, with more than 45 sub-topologies, so
> crashing the entire app just because of one bad record causes downtime on
> potentially unrelated sub-topologies.
>
> This was the motivation for this KIP, which would have enabled users to
> make a decision on what to do about a bad message, *without taking down the
> entire application*.
>
> Obviously, the *correct* solution to this problem is to introduce schemas
> on our topics and have our Producers correctly validate records before
> writing them to the cluster. This is ultimately the solution I am going to
> pursue in lieu of this KIP.
>
> I still think this KIP could have been useful for dealing with an
> incompatible upstream schema change: by pausing only the sub-topologies
> that are affected by the schema change, while leaving the others running
> until the user deploys a fix. However, in practice I think few users
> have monolithic apps like ours, and most instead de-couple unrelated topics
> via different apps, which reduces the impact of incompatible upstream
> schema changes.
>
> Thanks for your reviews and feedback, I've learned a lot, as always; this
> time, mostly about how, when authoring a KIP, I should always ask myself:
> "yes, but what about timestamp ordering?" :-D
>
> Nick
>
> On Thu, 14 Mar 2024 at 03:27, Sophie Blee-Goldman 
> wrote:
>
> > >
> > > Well, the KIP mentions the ability to either re-try the record (e.g.,
> > > after applying some external fix that would allow Kafka Streams to
> > > deserialize the record now) or to skip it by advancing the offset.
> >
> >
> > That's fair -- you're definitely right that what's described in the KIP
> > document
> > right now would not be practical. I just wanted to clarify that this
> > doesn't
> > mean the feature as a whole is impractical, but certainly we'd want to
> > update the proposal to remove the line about resetting offsets via an
> > external tool and come up with a more concrete approach, and perhaps
> > describe it in more detail.
> >
> > That's probably not worth getting into until/unless we decide whether to
> > go forward with this feature in the first place. I'll let Nick reflect on
> > the
> > motivation and your other comments and then decide whether he still
> > wants to pursue it.
> >
> > To Nick: if you want to go through with this KIP and can expand on the
> > motivation so that we understand it better, I'd be happy to help work
> > out the details. For now I'll just wait for your decision
> >
> > On Wed, Mar 13, 2024 at 10:24 AM Matthias J. Sax 
> wrote:
> >
> > > Yes, about the "drop records" case. It's a very common scenario to have
> > > a repartition step before a windowed aggregation or a join with
> > > a grace period.
> > >
> > >
> > > About "add feature vs guard users": it's always a tricky 

Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException

2024-03-28 Thread Nick Telford
Hi folks,

Sorry I haven't got back to you until now.

It's become clear that I hadn't anticipated a significant number of
technical challenges that this KIP presents. I think expecting users to
understand the ramifications on aggregations, joins and windowing
ultimately kills it: it only becomes a problem under *specific*
combinations of operations, and those problems would manifest in ways that
might be difficult for users to detect, let alone diagnose.

I think it's best to abandon this KIP, at least for now. If anyone else
sees a use for it and a path forward, feel free to pick it up.

Since I'm abandoning the KIP, I won't update the Motivation section. But I
will provide a bit of background here on why I originally suggested it, in
case you're interested:

In my organisation, we don't have schemas for *any* of our data in Kafka.
Consequently, one of the biggest causes of downtime in our applications is
"bad" records being written by Producers. We integrate with a lot of
third-party APIs, and have Producers that just push that data straight to
Kafka with very little validation. I've lost count of the number of times
my application has been crashed by a deserialization exception because we
received a record that looks like '{"error": "Bad gateway"}' or similar,
instead of the actual payload we expect.

The difficulty is we can't just use CONTINUE to discard these messages,
because we also sometimes get deserialization exceptions caused by an
upstream schema change that is incompatible with the expectations of our
app. In these cases, we don't want to discard records (which are
technically valid), but instead need to adjust our application to be
compatible with the new schema, before processing them.
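
For illustration only -- a minimal sketch of the kind of custom handler this
forces today; the looksLikeKnownGarbage() check is a made-up placeholder,
since the only real signal available is the raw payload bytes:

    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class ThirdPartyAwareExceptionHandler implements DeserializationExceptionHandler {

        @Override
        public void configure(final Map<String, ?> configs) {
            // nothing to configure in this sketch
        }

        @Override
        public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                     final ConsumerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
            final byte[] value = record.value();
            final String raw = value == null ? "" : new String(value, StandardCharsets.UTF_8);
            if (looksLikeKnownGarbage(raw)) {
                // e.g. a third-party error body that slipped into the topic: safe to drop
                return DeserializationHandlerResponse.CONTINUE;
            }
            // Could be a legitimate record with an incompatible schema: we must not drop
            // it, so today the only remaining option is to fail the application.
            return DeserializationHandlerResponse.FAIL;
        }

        // Hypothetical heuristic: we can only guess intent from the raw bytes.
        private boolean looksLikeKnownGarbage(final String raw) {
            return raw.contains("\"error\"");
        }
    }

(It would be wired in via the usual default.deserialization.exception.handler
config; the point is that any such heuristic can only ever guess.)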

Crucially, we use a monolithic app, with more than 45 sub-topologies, so
crashing the entire app just because of one bad record causes downtime on
potentially unrelated sub-topologies.

This was the motivation for this KIP, which would have enabled users to
make a decision on what to do about a bad message, *without taking down the
entire application*.

Obviously, the *correct* solution to this problem is to introduce schemas
on our topics and have our Producers correctly validate records before
writing them to the cluster. This is ultimately the solution I am going to
pursue in lieu of this KIP.

I still think this KIP could have been useful for dealing with an
incompatible upstream schema change: by pausing only the sub-topologies
that are affected by the schema change, while leaving the others running
until the user deploys a fix. However, in practice I think few users
have monolithic apps like ours, and most instead de-couple unrelated topics
via different apps, which reduces the impact of incompatible upstream
schema changes.

Thanks for your reviews and feedback, I've learned a lot, as always; this
time, mostly about how, when authoring a KIP, I should always ask myself:
"yes, but what about timestamp ordering?" :-D

Nick

On Thu, 14 Mar 2024 at 03:27, Sophie Blee-Goldman 
wrote:

> >
> > Well, the KIP mentions the ability to either re-try the record (e.g.,
> > after applying some external fix that would allow Kafka Streams to
> > deserialize the record now) or to skip it by advancing the offset.
>
>
> That's fair -- you're definitely right that what's described in the KIP
> document
> right now would not be practical. I just wanted to clarify that this
> doesn't
> mean the feature as a whole is impractical, but certainly we'd want to
> update the proposal to remove the line about resetting offsets via an external
> tool and come up with a more concrete approach, and perhaps describe
> it in more detail.
>
> That's probably not worth getting into until/unless we decide whether to
> go forward with this feature in the first place. I'll let Nick reflect on
> the
> motivation and your other comments and then decide whether he still
> wants to pursue it.
>
> To Nick: if you want to go through with this KIP and can expand on the
> motivation so that we understand it better, I'd be happy to help work
> out the details. For now I'll just wait for your decision
>
> On Wed, Mar 13, 2024 at 10:24 AM Matthias J. Sax  wrote:
>
> > Yes, about the "drop records" case. It's a very common scenario to have
> > a repartition step before a windowed aggregation or a join with
> > a grace period.
> >
> >
> > About "add feature vs guard users": it's always a tricky question and
> > tradeoff. For this particular KIP, I personally think we should opt to
> > not add the feature but guard the users, as I don't see too much value
> > compared to the complexity and "traps" it adds. -- It's of course just
> > my personal opinion, and if there is an ask from many users to add this
> > feature, I would not push back further. As mentioned in my previous
> > reply, I don't fully understand the motivation yet; maybe Nick can
> > provide more context on it.
> >
> >
> > > In other words, opting for the PAUSE option would 

Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException

2024-03-13 Thread Sophie Blee-Goldman
>
> Well, the KIP mentions the ability to either re-try the record (e.g.,
> after applying some external fix that would allow Kafka Streams to
> deserialize the record now) or to skip it by advancing the offset.


That's fair -- you're definitely right that what's described in the KIP
document
right now would not be practical. I just wanted to clarify that this
doesn't
mean the feature as a whole is impractical, but certainly we'd want to
update the proposal to remove the line about resetting offsets via an external
tool and come up with a more concrete approach, and perhaps describe
it in more detail.

That's probably not worth getting into until/unless we decide whether to
go forward with this feature in the first place. I'll let Nick reflect on
the
motivation and your other comments and then decide whether he still
wants to pursue it.

To Nick: if you want to go through with this KIP and can expand on the
motivation so that we understand it better, I'd be happy to help work
out the details. For now I'll just wait for your decision

On Wed, Mar 13, 2024 at 10:24 AM Matthias J. Sax  wrote:

> Yes, about the "drop records" case. It's a very common scenario to have
> a repartition step before a windowed aggregation or a join with
> a grace period.
>
>
> About "add feature vs guard users": it's always a tricky question and
> tradeoff. For this particular KIP, I personally think we should opt to
> not add the feature but guard the users, as I don't see too much value
> compared to the complexity and "traps" it adds. -- It's of course just
> my personal opinion, and if there is an ask from many users to add this
> feature, I would not push back further. As mentioned in my previous
> reply, I don't fully understand the motivation yet; maybe Nick can
> provide more context on it.
>
>
> > In other words, opting for the PAUSE option would simply stall the
> > task, and upon #resume it would just be discarding that record and then
> > continuing on with processing
>
> Well, the KIP mentions the ability to either re-try the record (e.g.,
> after applying some external fix that would allow Kafka Streams to
> deserialize the record now) or to skip it by advancing the offset. But
> to do this, we need to extend the `resume()` callback to pass in this
> information, making the whole setup and usage of this feature more
> complex, as one needs to do more upfront instrumentation of their custom
> code. -- It's just a technical thing we need to consider if we want to
> move forward, and the KIP should not say "advancing the consumer
> offsets, either via an external tool" because this cannot work. Just
> pointing out an incorrect technical assumption, not disregarding that it
> can be done.
>
>
> About committing: yes, I agree with all you say, and again it was not
> meant as a concern, but just as honest questions about some technical
> details. I think it would be good to consider these trade-offs and
> explain in the KIP why we want to do what we do. That's all.
>
>
>
> -Matthias
>
> On 3/12/24 11:24 PM, Sophie Blee-Goldman wrote:
> >>
> >>   I see way too many food-guns and complications that can be introduced.
> >
> >
> > What is a "food-gun"?? I'm picturing like a spud rifle/potato gun but I
> > don't think that's what you meant hahaha
> >
> > I don't feel super strongly one way or another, but I have a few
> questions
> > & corrections about some of these complaints/concerns:
> >
> > If one task
> >> pauses but others keep running, we keep advancing stream-time downstream,
> >> and thus when the task would resume later, there is a very high
> >> probability that records are dropped as the window has already closed.
> >
> > Just to make sure I/everyone understand what you're getting at here, you
> > would be
> > referring to the case of a stateful operation downstream of a
> key-changing
> > operation
> > which is in turn downstream of the "paused" task -- i.e. with a
> repartition
> > separating
> > the paused task and the task with a windowed aggregation? Each task has
> its
> > own
> > view of stream-time (technically each processor within a task) so the
> only
> > way that
> > delaying one task and not another would affect which records get dropped
> is
> > if those
> > two tasks are rekeyed and the repartitioning results in their outputs
> being
> > mixed -- yes?
> >
> > Anyways I think you make a good case for why pausing a single task -- or
> > even an entire
> > instance if others are allowed to continue running -- might make it too
> > easy for users to
> > shoot themselves in the foot without understanding the full
> ramifications.
> > Of course, there
> > are already a million ways for users to screw up their app if configured
> or
> > operated incorrectly,
> > and we shouldn't necessarily kill a feature just because some people
> might
> > use it when they
> > shouldn't. Why can't we just document that this feature should not be
> used
> > with applications
> > that include time-sensitive operators?
> >
> > I also 

Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException

2024-03-13 Thread Matthias J. Sax
Yes, about the "drop records" case. It's a very common scenario to have 
a repartition step before a windowed aggregation or a join with 
a grace period.



About "add feature vs guard users": it's always a tricky question and 
tradeoff. For this particular KIP, I personally think we should opt to 
not add the feature but guard the users, as I don't see too much value 
compared to the complexity and "traps" it adds. -- It's of course just 
my personal opinion, and if there is an ask from many users to add this
feature, I would not push back further. As mentioned in my previous 
reply, I don't fully understand the motivation yet; maybe Nick can 
provide more context on it.




In other words, opting for the PAUSE option would simply stall the
task, and upon #resume it would just be discarding that record and then
continuing on with processing


Well, the KIP mentions the ability to either re-try the record (e.g.,
after applying some external fix that would allow Kafka Streams to
deserialize the record now) or to skip it by advancing the offset. But
to do this, we need to extend the `resume()` callback to pass in this 
information, making the whole setup and usage of this feature more 
complex, as one needs to do more upfront instrumentation of their custom
code. -- It's just a technical thing we need to consider if we want to 
move forward, and the KIP should not say "advancing the consumer 
offsets, either via an external tool" because this cannot work. Just 
pointing out an incorrect technical assumption, not disregarding that it
can be done.
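
To make the shape of that concrete -- a hypothetical sketch only, not an API
the KIP defines -- the callback would need to carry the decision along,
roughly:

    // Hypothetical sketch: a bare resume(taskId) carries no decision,
    // so the signature would have to grow to carry one.
    import org.apache.kafka.streams.processor.TaskId;

    public interface PausedTaskControl {

        enum ResumeAction {
            RETRY_FAILED_RECORD,   // an external fix was applied; try deserializing again
            SKIP_FAILED_RECORD     // advance past the record, like CONTINUE would have
        }

        // the caller must say what to do with the record that triggered the pause
        void resume(TaskId taskId, ResumeAction action);
    }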



About committing: yes, I agree with all you say, and again it was not
meant as a concern, but just as honest questions about some technical
details. I think it would be good to consider these trade-offs and
explain in the KIP why we want to do what we do. That's all.




-Matthias

On 3/12/24 11:24 PM, Sophie Blee-Goldman wrote:


  I see way too many food-guns and complications that can be introduced.



What is a "food-gun"?? I'm picturing like a spud rifle/potato gun but I
don't think that's what you meant hahaha

I don't feel super strongly one way or another, but I have a few questions
& corrections about some of these complaints/concerns:

If one task
pauses but others keep running, we keep advancing stream-time downstream,
and thus when the task would resume later, there is a very high
probability that records are dropped as the window has already closed.


Just to make sure I/everyone understand what you're getting at here, you
would be
referring to the case of a stateful operation downstream of a key-changing
operation
which is in turn downstream of the "paused" task -- i.e. with a repartition
separating
the paused task and the task with a windowed aggregation? Each task has its
own
view of stream-time (technically each processor within a task) so the only
way that
delaying one task and not another would affect which records get dropped is
if those
two tasks are rekeyed and the repartitioning results in their outputs being
mixed -- yes?

Anyways I think you make a good case for why pausing a single task -- or
even an entire
instance if others are allowed to continue running -- might make it too
easy for users to
shoot themselves in the foot without understanding the full ramifications.
Of course, there
are already a million ways for users to screw up their app if configured or
operated incorrectly,
and we shouldn't necessarily kill a feature just because some people might
use it when they
shouldn't. Why can't we just document that this feature should not be used
with applications
that include time-sensitive operators?

I also feel like you dismissed the "skip record case" somewhat too easily:


For the "skip record case", it's also not possible to skip over an
offset from outside while the application is running



True, you can't advance the offset from outside the app, but I don't see why
you would want to, much less why you should need to for this to work.
Surely the best way to implement this case would just be for the #resume
API to behave, and work, exactly the same as the handler's CONTINUE
option? In other words, opting for the PAUSE option would simply stall the
task, and upon #resume it would just be discarding that record and then
continuing on with processing (or even committing the offset immediately
after
it, perhaps even asynchronously since it presumably doesn't matter if it
doesn't succeed and the record is picked up again by accident -- as long as
  that doesn't happen repeatedly in an infinite loop, which I don't see why
it would.)

On the subject of committing...

Other questions: if a task would be paused, would we commit the current
offset? What happens if we re-balance? Would we just lose the "pause"
state, and hit the same error again and just pause again?



I was imagining that we would either just wait without committing, or
perhaps
even commit everything up to -- but not including -- the "bad" record when
PAUSE is triggered. 

Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException

2024-03-13 Thread Sophie Blee-Goldman
>
>  I see way too many food-guns and complications that can be introduced.


What is a "food-gun"?? I'm picturing like a spud rifle/potato gun but I
don't think that's what you meant hahaha

I don't feel super strongly one way or another, but I have a few questions
& corrections about some of these complaints/concerns:

> If one task
> pauses but others keep running, we keep advancing stream-time downstream,
> and thus when the task would resume later, there is a very high
> probability that records are dropped as the window has already closed.

Just to make sure I/everyone understand what you're getting at here, you
would be
referring to the case of a stateful operation downstream of a key-changing
operation
which is in turn downstream of the "paused" task -- i.e. with a repartition
separating
the paused task and the task with a windowed aggregation? Each task has its
own
view of stream-time (technically each processor within a task) so the only
way that
delaying one task and not another would affect which records get dropped is
if those
two tasks are rekeyed and the repartitioning results in their outputs being
mixed -- yes?

Anyways I think you make a good case for why pausing a single task -- or
even an entire
instance if others are allowed to continue running -- might make it too
easy for users to
shoot themselves in the foot without understanding the full ramifications.
Of course, there
are already a million ways for users to screw up their app if configured or
operated incorrectly,
and we shouldn't necessarily kill a feature just because some people might
use it when they
shouldn't. Why can't we just document that this feature should not be used
with applications
that include time-sensitive operators?

I also feel like you dismissed the "skip record case" somewhat too easily:

> For the "skip record case", it's also not possible to skip over an
> offset from outside while the application is running


True, you can't advance the offset from outside the app, but I don't see why
you would want to, much less why you should need to for this to work.
Surely the best way to implement this case would just be for the #resume
API to behave, and work, exactly the same as the handler's CONTINUE
option? In other words, opting for the PAUSE option would simply stall the
task, and upon #resume it would just be discarding that record and then
continuing on with processing (or even committing the offset immediately
after
it, perhaps even asynchronously since it presumably doesn't matter if it
doesn't succeed and the record is picked up again by accident -- as long as
 that doesn't happen repeatedly in an infinite loop, which I don't see why
it would.)

On the subject of committing...

> Other questions: if a task would be paused, would we commit the current
> offset? What happens if we re-balance? Would we just lose the "pause"
> state, and hit the same error again and just pause again?


I was imagining that we would either just wait without committing, or
perhaps
even commit everything up to -- but not including -- the "bad" record when
PAUSE is triggered. Again, if we rebalance and "lose the pause" then
we'll just attempt to process it again, fail, and end up back in PAUSE. This
is no different than how successful processing works, no? Who cares if a
rebalance happens to strike and causes it to be PAUSED again?
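
(Side note on the offset arithmetic, since it's easy to get backwards: a
committed offset points at the *next* record to consume, so "commit everything
up to but not including the bad record" means committing the bad record's own
offset. A rough sketch using the plain consumer API for clarity -- inside
Streams the runtime would of course do the committing:)

    import java.util.Collections;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    final class OffsetArithmeticSketch {
        // badRecord = the record that failed to deserialize
        static void commitUpToButExcluding(final Consumer<byte[], byte[]> consumer,
                                           final ConsumerRecord<byte[], byte[]> badRecord) {
            final TopicPartition tp =
                new TopicPartition(badRecord.topic(), badRecord.partition());
            // Committing offset == badRecord.offset() marks everything *before* it as done,
            // so after a rebalance we would re-attempt the bad record itself.
            consumer.commitSync(
                Collections.singletonMap(tp, new OffsetAndMetadata(badRecord.offset())));
            // Committing badRecord.offset() + 1 would instead skip it -- the CONTINUE-like path.
        }
    }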

All in all, I feel like these concerns are all essentially "true", but to
me they
just seem like implementation or design decisions, and none of them strike
me as posing an unsolvable problem for this feature. But maybe I'm
just lacking in imagination...

Thoughts?


On Fri, Mar 8, 2024 at 5:30 PM Matthias J. Sax  wrote:

> Hey Nick,
>
> I am sorry that I have to say that I am not a fan of this KIP. I see way
> too many food-guns and complications that can be introduced.
>
> I am also not sure if I understand the motivation. You say CONTINUE and
> FAIL are not good enough, but don't describe in detail why. If we
> understand the actual problem better, it might also get clear how
> task-pausing would help to address the problem.
>
>
> The main problem I see, as already mentioned by Sophie, is time
> synchronization. However, it's not limited to joins, but affects all
> time-based operations, i.e., also all windowed aggregations. If one task
> pauses but others keep running, we keep advancing stream-time downstream,
> and thus when the task would resume later, there is a very high
> probability that records are dropped as the window has already closed.
>
> For the runtime itself, we also cannot really do a cascading downstream
> pause, because the runtime does not know anything about the semantics of
> operators. We don't know if we execute a DSL operator or a PAPI
> operator. (We could maybe track all downstream tasks independent of
> semantics, but in the end it might just imply we could also just pause
> all tasks...)
>
> For the "skip record case", it's also not possible to skip over an
> offset from outside 

Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException

2024-03-08 Thread Matthias J. Sax

Hey Nick,

I am sorry that I have to say that I am not a fan of this KIP. I see way 
too many food-guns and complications that can be introduced.


I am also not sure if I understand the motivation. You say CONTINUE and
FAIL are not good enough, but don't describe in detail why. If we
understand the actual problem better, it might also get clear how 
task-pausing would help to address the problem.



The main problem I see, as already mentioned by Sophie, is time
synchronization. However, it's not limited to joins, but affects all
time-based operations, i.e., also all windowed aggregations. If one task
pauses but others keep running, we keep advancing stream-time downstream,
and thus when the task would resume later, there is a very high
probability that records are dropped as the window has already closed.


For the runtime itself, we also cannot really do a cascading downstream 
pause, because the runtime does not know anything about the semantics of 
operators. We don't know if we execute a DSL operator or a PAPI 
operator. (We could maybe track all downstream tasks independent of
semantics, but in the end it might just imply we could also just pause
all tasks...)


For the "skip record case", it's also not possible to skip over an 
offset from outside while the application is running. The offset in 
question is cached inside the consumer and the consumer would not go 
back to Kafka to re-read the offset (only when a partition is
re-assigned to a new consumer does the consumer fetch the offset once
to init itself). -- But even if the consumer did go back to read the
offset, as long as the partition is assigned to a member of the group,
it's not even possible to commit a new offset using some external tool.
Only members of the group are allowed to commit offsets, and all tools
that allow manipulating offsets require that the corresponding
application is stopped and that the consumer group is empty (the
tool will join the consumer group as its only member and commit offsets).


Of course, we could pause all tasks, but that's kind of similar to shutting
down? I agree, though, that `FAIL` is rather harsh, and it could be a
good thing to introduce a graceful `SHUTDOWN` option (similar to what we 
have via the uncaught exception handler)?


If we pause all tasks we would of course need to do this not just for a 
single instance, but for all... We do already have 
`KafkaStreams#pause()` but it does not provide an application-wide pause,
only an instance pause -- the assumption of this feature was that
an external pause signal would be sent to all instances at the same
time. Building it into KS was not done, as it was potentially too complicated...


Other questions: if a task would be paused, would we commit the current 
offset? What happens if we re-balance? Would we just lose the "pause" 
state, and hit the same error again and just pause again?



Right now, I would rather propose to discard this KIP (or change the 
scope drastically to add a "global pause" and/or "global shutdown" 
option). Of course, if you can provide convincing answers, I am happy to 
move forward with per-task pausing. But my gut feeling is that even if
we found technically sound solutions, it would be way too
complicated to use (and maybe also to implement inside KS) for too
little benefit.




-Matthias



On 10/26/23 5:57 AM, Nick Telford wrote:

1.
Woops! I've fixed that now. Thanks for catching that.

2.
I agree, I'll remove the LogAndPause handler so it's clear this is an
advanced feature. I'll also add some documentation to
DeserializationExceptionResponse#SUSPEND that explains the care with which
users should approach it.

3a.
This is interesting. My main concern is that there may be situations where
skipping a single bad record is not enough, but the Task
should still be resumable without restarting the application. For example, if
there are several bad records in a row that should be skipped.

3b.
Additionally, a Task may have multiple input topics, so we'd need some way
to indicate which record to skip.

These can probably be resolved by something like skipAndContinue(TaskId
task, String topic, int recordsToSkip) or even skipAndContinue(TaskId task,
Map<String, Integer> recordsToSkipByTopic)?

4.
Related to 2: I was thinking that users implementing their own handler may
want to be able to determine which Processors (i.e. which Subtopology/task
group) are being affected, so they can programmatically make a decision on
whether it's safe to PAUSE. ProcessorContext, which is already a parameter
to DeserializationExceptionHandler, provides the TaskId of the failed Task,
but doesn't provide metadata on the Processors that Task executes.

Since TaskIds are non-deterministic (they can change when you modify your
topology, with no influence over how they're assigned), a user cannot use
TaskId alone to determine which Processors would be affected.

What do you think would be the best way to provide this information to

Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException

2023-10-26 Thread Nick Telford
1.
Woops! I've fixed that now. Thanks for catching that.

2.
I agree, I'll remove the LogAndPause handler so it's clear this is an
advanced feature. I'll also add some documentation to
DeserializationExceptionResponse#SUSPEND that explains the care with which
users should approach it.

3a.
This is interesting. My main concern is that there may be situations where
skipping a single bad record is not enough, but the Task
should still be resumable without restarting the application. For example, if
there are several bad records in a row that should be skipped.

3b.
Additionally, a Task may have multiple input topics, so we'd need some way
to indicate which record to skip.

These can probably be resolved by something like skipAndContinue(TaskId
task, String topic, int recordsToSkip) or even skipAndContinue(TaskId task,
Map<String, Integer> recordsToSkipByTopic)?

4.
Related to 2: I was thinking that users implementing their own handler may
want to be able to determine which Processors (i.e. which Subtopology/task
group) are being affected, so they can programmatically make a decision on
whether it's safe to PAUSE. ProcessorContext, which is already a parameter
to DeserializationExceptionHandler, provides the TaskId of the failed Task,
but doesn't provide metadata on the Processors that Task executes.

Since TaskIds are non-deterministic (they can change when you modify your
topology, with no influence over how they're assigned), a user cannot use
TaskId alone to determine which Processors would be affected.
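
For reference, roughly what a handler can see today -- a sketch against the
current ProcessorContext-based handle() signature; the class name and the
logging are illustrative only:

    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.TaskId;

    public class TaskAwareHandler implements DeserializationExceptionHandler {

        @Override
        public void configure(final Map<String, ?> configs) { }

        @Override
        public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                     final ConsumerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
            final TaskId taskId = context.taskId();
            // taskId.subtopology() gives us the sub-topology *index* (e.g. 2 for task 2_1),
            // but nothing here tells us which Processors that sub-topology contains,
            // so the handler cannot reason about whether pausing it would be safe.
            System.err.printf("Deserialization failed in task %s (sub-topology %d)%n",
                              taskId, taskId.subtopology());
            return DeserializationHandlerResponse.FAIL;
        }
    }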

What do you think would be the best way to provide this information to
exception handlers? I was originally thinking that users could instantiate
the handler themselves and provide a TopologyDescription (via
KafkaStreams#describe) in the constructor, but it looks like configs of
type Class cannot accept an already instantiated instance, and there's no
other way to inject information like that.

Perhaps we could add something to ProcessorContext that contains details on
the sub-topology being executed?

Regards,
Nick

On Thu, 26 Oct 2023 at 01:24, Sophie Blee-Goldman 
wrote:

> 1. Makes sense to me! Can you just update the name of the
> DeserializationHandlerResponse enum from SUSPEND to PAUSE so
> we're consistent with the wording?
>
> The drawback here would be that custom stateful Processors
> > might also be impacted, but there'd be no way to know if they're safe to
> > not pause.
> >
> 2. This is a really good point -- maybe this is just a case where we have
> to trust
> in the user not to accidentally screw themselves over. As long as we
> provide
> sufficient information for them to decide when it is/isn't safe to pause a
> task,
> I would be ok with just documenting the dangers of indiscriminate use of
> this
> feature, and hope that everyone reads the warning.
>
> Given the above, I have one suggestion: what if we only add the PAUSE enum
> in this KIP, and don't include an OOTB DeserializationExceptionHandler that
> implements this? I see this as addressing two concerns:
> 2a. It would make it clear that this is an advanced feature and should be
> given
> careful consideration, rather than just plugging in a config value.
> 2b. It forces the user to implement the handler themselves, which gives
> them
> an opportunity to check on which task it is that's hitting the error and
> then
> make a conscious decision as to whether it is safe to pause or not. In the
> end,
> it's really impossible for us to know what is/is not safe to pause, so the
> more
> responsibility we can put on the user in this case, the better.
>
> 3. It sounds like the general recovery workflow would be to either resolve
> the
> issue somehow (presumably by fixing an issue in the deserializer?) and
> restart the application -- in which case no further manual intervention is
> required -- or else to determine the record is unprocessable and should be
> skipped, in which case the user needs to somehow increment the offset
> and then resume the task.
>
> It's a bit awkward to ask people to use the command line tools to manually
> wind the offset forward. More importantly, there are likely many operators
> who
> don't have the permissions necessary to use the command line tools for
> this kind of thing, and they would be pretty much out of luck in that case.
>
> On the flipside, it seems like if the user ever wants to resume the task
> without restarting, they will need to skip over the bad record. I think we
> can
> make the feature considerably more ergonomic by modifying the behavior
> of the #resume method so that it always skips over the bad record. This
> will probably be the easiest to implement anyways, as it is effectively the
> same as the CONTINUE option internally, but gives the user time to
> decide if they really do want to CONTINUE or not
>
> Not sure if we would want to rename the #resume method in that case to
> make this more clear, or if javadocs would be sufficient...maybe
> something like #skipRecordAndContinue?
>
> On 

Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException

2023-10-25 Thread Sophie Blee-Goldman
1. Makes sense to me! Can you just update the name of the
DeserializationHandlerResponse enum from SUSPEND to PAUSE so
we're consistent with the wording?

The drawback here would be that custom stateful Processors
> might also be impacted, but there'd be no way to know if they're safe to
> not pause.
>
2. This is a really good point -- maybe this is just a case where we have
to trust
in the user not to accidentally screw themselves over. As long as we provide
sufficient information for them to decide when it is/isn't safe to pause a
task,
I would be ok with just documenting the dangers of indiscriminate use of
this
feature, and hope that everyone reads the warning.

Given the above, I have one suggestion: what if we only add the PAUSE enum
in this KIP, and don't include an OOTB DeserializationExceptionHandler that
implements this? I see this as addressing two concerns:
2a. It would make it clear that this is an advanced feature and should be
given
careful consideration, rather than just plugging in a config value.
2b. It forces the user to implement the handler themselves, which gives them
an opportunity to check on which task it is that's hitting the error and
then
make a conscious decision as to whether it is safe to pause or not. In the
end,
it's really impossible for us to know what is/is not safe to pause, so the
more
responsibility we can put on the user in this case, the better.

3. It sounds like the general recovery workflow would be to either resolve
the
issue somehow (presumably by fixing an issue in the deserializer?) and
restart the application -- in which case no further manual intervention is
required -- or else to determine the record is unprocessable and should be
skipped, in which case the user needs to somehow increment the offset
and then resume the task.

It's a bit awkward to ask people to use the command line tools to manually
wind the offset forward. More importantly, there are likely many operators
who
don't have the permissions necessary to use the command line tools for
this kind of thing, and they would be pretty much out of luck in that case.

On the flipside, it seems like if the user ever wants to resume the task
without restarting, they will need to skip over the bad record. I think we
can
make the feature considerably more ergonomic by modifying the behavior
of the #resume method so that it always skips over the bad record. This
will probably be the easiest to implement anyways, as it is effectively the
same as the CONTINUE option internally, but gives the user time to
decide if they really do want to CONTINUE or not

Not sure if we would want to rename the #resume method in that case to
make this more clear, or if javadocs would be sufficient...maybe
something like #skipRecordAndContinue?

On Tue, Oct 24, 2023 at 6:54 AM Nick Telford  wrote:

> Hi Sophie,
>
> Thanks for the review!
>
> 1-3.
> I had a feeling this was the case. I'm thinking of adding a PAUSED state
> with the following valid transitions:
>
>- RUNNING -> PAUSED
>- PAUSED -> RUNNING
>- PAUSED -> SUSPENDED
>
> The advantage of a dedicated State is it should make testing easier and
> also reduce the potential for introducing bugs into the existing Task
> states.
>
> While I appreciate that the engine is being revised, I think I'll still
> pursue this actively instead of waiting, as it addresses some problems my
> team is having right now. If the KIP is accepted, then I suspect that this
> feature would still be desirable with the new streams engine, so any new
> Task state would likely want to be mirrored in the new engine, and the high
> level design is unlikely to change.
>
> 4a.
> This is an excellent point I hadn't considered. Correct me if I'm wrong,
> but the only joins that this would impact are Stream-Stream and
> Stream-Table joins? Table-Table joins should be safe, because the join is
> commutative, so a delayed record on one side should just cause its output
> record to be delayed, but not lost.
>
> 4b.
> If we can enumerate only the node types that are impacted by this (i.e.
> Stream-Stream and Stream-Table joins), then perhaps we could restrict it
> such that it only pauses dependent Tasks if there's a Stream-Stream/Table
> join involved? The drawback here would be that custom stateful Processors
> might also be impacted, but there'd be no way to know if they're safe to
> not pause.
>
> 4c.
> Regardless, I like this idea, but I have very little knowledge about making
> changes to the rebalance/network protocol. It looks like this could be
> added via StreamsPartitionAssignor#subscriptionUserData? I might need some
> help designing this aspect of this KIP.
>
> Regards,
> Nick
>
> On Tue, 24 Oct 2023 at 07:30, Sophie Blee-Goldman 
> wrote:
>
> > Hey Nick,
> >
> > A few high-level thoughts:
> >
> > 1. We definitely don't want to piggyback on the SUSPENDED task state, as
> > this is currently more like an intermediate state that a task passes
> > through as it's being closed/migrated 

Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException

2023-10-24 Thread Nick Telford
Hi Sophie,

Thanks for the review!

1-3.
I had a feeling this was the case. I'm thinking of adding a PAUSED state
with the following valid transitions:

   - RUNNING -> PAUSED
   - PAUSED -> RUNNING
   - PAUSED -> SUSPENDED

The advantage of a dedicated State is it should make testing easier and
also reduce the potential for introducing bugs into the existing Task
states.
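
As a self-contained sketch of just those transition rules (illustrative only
-- not the actual Task.State implementation, which has more states and
transitions):

    import java.util.EnumSet;
    import java.util.Set;

    // Models only the transitions proposed above; the real Task.State has more.
    enum ProposedTaskState {
        RUNNING, PAUSED, SUSPENDED;

        Set<ProposedTaskState> validNextStates() {
            switch (this) {
                case RUNNING:
                    return EnumSet.of(PAUSED);             // new: RUNNING -> PAUSED
                case PAUSED:
                    return EnumSet.of(RUNNING, SUSPENDED); // new: PAUSED -> RUNNING/SUSPENDED
                default:
                    return EnumSet.noneOf(ProposedTaskState.class);
            }
        }
    }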

While I appreciate that the engine is being revised, I think I'll still
pursue this actively instead of waiting, as it addresses some problems my
team is having right now. If the KIP is accepted, then I suspect that this
feature would still be desirable with the new streams engine, so any new
Task state would likely want to be mirrored in the new engine, and the high
level design is unlikely to change.

4a.
This is an excellent point I hadn't considered. Correct me if I'm wrong,
but the only joins that this would impact are Stream-Stream and
Stream-Table joins? Table-Table joins should be safe, because the join is
commutative, so a delayed record on one side should just cause its output
record to be delayed, but not lost.

4b.
If we can enumerate only the node types that are impacted by this (i.e.
Stream-Stream and Stream-Table joins), then perhaps we could restrict it
such that it only pauses dependent Tasks if there's a Stream-Stream/Table
join involved? The drawback here would be that custom stateful Processors
might also be impacted, but there'd be no way to know if they're safe to
not pause.

4c.
Regardless, I like this idea, but I have very little knowledge about making
changes to the rebalance/network protocol. It looks like this could be
added via StreamsPartitionAssignor#subscriptionUserData? I might need some
help designing this aspect of this KIP.

Regards,
Nick

On Tue, 24 Oct 2023 at 07:30, Sophie Blee-Goldman 
wrote:

> Hey Nick,
>
> A few high-level thoughts:
>
> 1. We definitely don't want to piggyback on the SUSPENDED task state, as
> this is currently more like an intermediate state that a task passes
> through as it's being closed/migrated elsewhere; it doesn't really mean
> that a task is "suspended" and there's no logic to suspend processing on
> it. What you want is probably closer in spirit to the concept of a paused
> "named topology", where we basically freeze processing on a specific task
> (or set of tasks).
> 2. More importantly however, the SUSPENDED state was only ever needed to
> support efficient eager rebalancing, and we plan to remove the eager
> rebalancing protocol from Streams entirely in the near future. And
> unfortunately, the named topologies feature was never fully implemented and
> will probably be ripped out sometime soon as well.
> 3. In short, to go this route, you'd probably need to implement a PAUSED
> state from scratch. This wouldn't be impossible, but we are planning to
> basically revamp the entire thread model and decouple the consumer
> (potentially including the deserialization step) from the processing
> threads. Much as I love the idea of this feature, it might not make a lot
> of sense to spend time implementing right now when much of that work could
> be scrapped in the mid-term future. We don't have a timeline for this,
> however, so I don't think this should discourage you if the feature seems
> worth it, just want to give you a sense of the upcoming roadmap.
> 4. As for the feature itself, my only concern is that this feels like a
> very advanced feature but it would be easy for new users to accidentally
> abuse it and get their application in trouble. Specifically I'm worried
> about how this could be harmful to applications for which some degree of
> synchronization is required, such as a join. Correct join semantics rely
> heavily on receiving records from both sides of the join and carefully
> selecting the next one to process based on timestamp. Imagine if a
> DeserializationException occurs upstream of a repartition feeding into one
> side of a join (but not the other) and the user opts to PAUSE this task. If
> the join continues as usual it could lead to missed or incorrect results
> when processing is enforced with no records present on one side of the join
> but usual traffic flowing through the other. Maybe we could somehow signal
> to also PAUSE all downstream/dependent tasks? Should be able to add this
> information to the subscription metadata and send to all clients via a
> rebalance. There might be better options I'm not seeing. Or we could just
> decide to trust the users not to shoot themselves in the foot -- as long as
> we write a clear warning in the javadocs this might be fine
>
> Thanks for all the great KIPs!
>
> On Thu, Oct 12, 2023 at 9:51 AM Nick Telford 
> wrote:
>
> > Hi everyone,
> >
> > This is a Streams KIP to add a new DeserializationHandlerResponse,
> > "SUSPEND", that suspends the failing Task but continues to process other
> > Tasks normally.
> >
> >
> >
> 

Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException

2023-10-24 Thread Sophie Blee-Goldman
Hey Nick,

A few high-level thoughts:

1. We definitely don't want to piggyback on the SUSPENDED task state, as
this is currently more like an intermediate state that a task passes
through as it's being closed/migrated elsewhere; it doesn't really mean
that a task is "suspended" and there's no logic to suspend processing on
it. What you want is probably closer in spirit to the concept of a paused
"named topology", where we basically freeze processing on a specific task
(or set of tasks).
2. More importantly however, the SUSPENDED state was only ever needed to
support efficient eager rebalancing, and we plan to remove the eager
rebalancing protocol from Streams entirely in the near future. And
unfortunately, the named topologies feature was never fully implemented and
will probably be ripped out sometime soon as well.
3. In short, to go this route, you'd probably need to implement a PAUSED
state from scratch. This wouldn't be impossible, but we are planning to
basically revamp the entire thread model and decouple the consumer
(potentially including the deserialization step) from the processing
threads. Much as I love the idea of this feature, it might not make a lot
of sense to spend time implementing right now when much of that work could
be scrapped in the mid-term future. We don't have a timeline for this,
however, so I don't think this should discourage you if the feature seems
worth it, just want to give you a sense of the upcoming roadmap.
4. As for the feature itself, my only concern is that this feels like a
very advanced feature but it would be easy for new users to accidentally
abuse it and get their application in trouble. Specifically I'm worried
about how this could be harmful to applications for which some degree of
synchronization is required, such as a join. Correct join semantics rely
heavily on receiving records from both sides of the join and carefully
selecting the next one to process based on timestamp. Imagine if a
DeserializationException occurs upstream of a repartition feeding into one
side of a join (but not the other) and the user opts to PAUSE this task. If
the join continues as usual it could lead to missed or incorrect results
when processing is enforced with no records present on one side of the join
but usual traffic flowing through the other. Maybe we could somehow signal
to also PAUSE all downstream/dependent tasks? Should be able to add this
information to the subscription metadata and send to all clients via a
rebalance. There might be better options I'm not seeing. Or we could just
decide to trust the users not to shoot themselves in the foot -- as long as
we write a clear warning in the javadocs this might be fine

Thanks for all the great KIPs!

On Thu, Oct 12, 2023 at 9:51 AM Nick Telford  wrote:

> Hi everyone,
>
> This is a Streams KIP to add a new DeserializationHandlerResponse,
> "SUSPEND", that suspends the failing Task but continues to process other
> Tasks normally.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-990%3A+Capability+to+SUSPEND+Tasks+on+DeserializationException
>
> I'm not yet completely convinced that this is practical, as I suspect it
> might be abusing the SUSPENDED Task.State for something it was not designed
> for. The intent is to pause an active Task *without* re-assigning it to
> another instance, which causes cascading failures when the FAIL
> DeserializationHandlerResponse is used.
>
> Let me know what you think!
>
> Regards,
> Nick
>


[DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException

2023-10-12 Thread Nick Telford
Hi everyone,

This is a Streams KIP to add a new DeserializationHandlerResponse,
"SUSPEND", that suspends the failing Task but continues to process other
Tasks normally.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-990%3A+Capability+to+SUSPEND+Tasks+on+DeserializationException

I'm not yet completely convinced that this is practical, as I suspect it
might be abusing the SUSPENDED Task.State for something it was not designed
for. The intent is to pause an active Task *without* re-assigning it to
another instance, which causes cascading failures when the FAIL
DeserializationHandlerResponse is used.
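
For context, the only responses that exist today are CONTINUE and FAIL, wired
up roughly like this (standard Kafka Streams configuration; the application id
and broker address below are placeholders):

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

    public class HandlerConfigExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // The default is LogAndFailExceptionHandler (FAIL); the only OOTB alternative
            // is LogAndContinueExceptionHandler (CONTINUE), which silently skips the record.
            props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                      LogAndContinueExceptionHandler.class);
            // ... build the topology and start KafkaStreams with these props as usual
        }
    }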

Let me know what you think!

Regards,
Nick