Thanks for the KIP. Overall LGTM.

Can we clarify one question: would it be allowed to call `pause()` before calling `start()`? I don't see any reason why we would need to disallow it?

It could be helpful to start a KafkaStreams client in paused state -- otherwise there is a race between calling `start()` and calling `pause()`.

If we allow it, we should clearly document it.
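
To make the race concrete, here is a minimal sketch of a client whose pause flag is independent of its lifecycle. It is not the KafkaStreams implementation; `PausableClient`, `runOnce(int)` and the counters are hypothetical names chosen for illustration. Because the flag can be set before `start()`, the first loop iteration already sees it and processes nothing.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a streams client; NOT the real KafkaStreams class.
class PausableClient {
    private final AtomicBoolean paused = new AtomicBoolean(false);
    private final AtomicBoolean started = new AtomicBoolean(false);
    private int processed = 0;

    // The flag is plain metadata, so it may be set in any lifecycle phase,
    // including before start().
    void pause() { paused.set(true); }
    void resume() { paused.set(false); }
    boolean isPaused() { return paused.get(); }

    void start() { started.set(true); }

    // One pass of the processing loop; returns how many records were processed.
    int runOnce(int available) {
        if (!started.get() || paused.get()) {
            return 0; // not started yet, or paused: skip processing entirely
        }
        processed += available;
        return available;
    }

    int processed() { return processed; }
}
```

With this shape, `pause(); start();` never processes a record until `resume()` is called, whereas `start(); pause();` may process some records first.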


-Matthias

On 5/10/22 12:04 PM, Jim Hughes wrote:
Hi Bill, all,

Thank you.  I've updated the KIP to reflect pausing standby tasks as well.
I think all the outstanding points have been addressed and I'm going to
start the vote thread!

Cheers,

Jim



On Tue, May 10, 2022 at 2:43 PM Bill Bejeck <bbej...@gmail.com> wrote:

Hi Jim,

After reading the comments on the KIP, I agree that it makes sense to pause
all activities and any changes can be made later on.

Thanks,
Bill

On Tue, May 10, 2022 at 4:03 AM Bruno Cadonna <cado...@apache.org> wrote:

Hi Jim,

Thanks for the KIP!

I am fine with the KIP in general.

However, I am with Sophie and John to also pause the standbys for the
reasons they brought up. Is there a specific reason you want to keep
standbys going? It feels like premature optimization to me. We can still
add keeping standby running in a follow up if needed.

Best,
Bruno

On 10.05.22 05:15, Sophie Blee-Goldman wrote:
Thanks Jim, just one note/question on the standby tasks:

At the minute, my moderately held position is that standby tasks ought to
continue reading and remain caught up.  If standby tasks would run out of
space, there are probably bigger problems.


For a single node application, or when the #pause API is invoked on all
instances, then there won't be any further active processing and thus
nothing to keep up with, right? So for that case, it's just a matter of
whether any standbys that are lagging will have the chance to catch up to
the (paused) active task state before they stop as well, in which case
having them continue feels fine to me. However this is a relatively trivial
benefit and I would only consider it as a deciding factor when all things
are equal otherwise.

My concern is the more interesting case: when this feature is used to pause
only one node, or some subset of the overall application. In this case,
yes, the standby tasks will indeed fall out of sync. But the only reason I
can imagine someone using the pause feature in such a way is because there
is something going wrong, or about to go wrong, on that particular node.
For example as mentioned above, if the user wants to cut down on costs
without stopping everything, or if the node is about to run out of disk or
needs to be debugged or so on. And in this case, continuing to process the
standby tasks while other instances continue to run would pretty much
defeat the purpose of pausing it entirely, and might have unpleasant
consequences for the unsuspecting developer.

All that said, I don't want to block this KIP so if you have strong
feelings about the standby behavior I'm happy to back down. I'm only
pushing back now because it felt like there wasn't any particular
motivation for the standbys to continue processing or not, and I figured
I'd try to fill in this gap with my thoughts on the matter :)
Either way we should just make sure that this behavior is documented
clearly, since it may be surprising if we decide to only pause active
processing (another option is to rename the method something like
#pauseProcessing or #pauseActiveProcessing so that it's hard to miss).

Thanks! Sorry for the lengthy response, but hopefully we won't need to
debate this any further. Beyond this I'm satisfied with the latest proposal.

On Mon, May 9, 2022 at 5:16 PM John Roesler <vvcep...@apache.org> wrote:

Thanks for the updates, Jim!

After this discussion and your updates, this KIP looks good to me.

Thanks,
John

On Mon, May 9, 2022, at 17:52, Jim Hughes wrote:
Hi Sophie, all,

I've updated the KIP with feedback from the discussion so far:



https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832

As a terse summary of my current position:
Pausing will only stop processing and punctuation (respecting modular
topologies).
Paused topologies will still a) consume from input topics, b) call the
usual commit pathways (commits will happen basically as they would have),
and c) process standby tasks.

Shout if the KIP or those details still need some TLC.  Responding to
Sophie inline below.


On Mon, May 9, 2022 at 6:06 PM Sophie Blee-Goldman
<sop...@confluent.io.invalid> wrote:

Don't worry, I'm going to be adding the APIs for topology-level pausing as
part of the modular topologies KIP, so we don't need to worry about that
for now. That said, I don't think we should brush it off entirely and
design this feature in a way that's going to be incompatible or hugely
raise the LOE on bringing the (mostly already implemented) modular
topologies feature into the public API, just because it "won the race to
write a KIP" :)


Yes, I'm hoping that this is all compatible with modular topologies.  I
haven't seen anything so far which seems to be a problem; this KIP is just
in a weird state to discuss details of acting on modular topologies. :)


I may be biased (ok, I definitely am), but I'm not in favor of adding this
as a state regardless of the modular topologies.
First of all any change to the KafkaStreams state machine is a breaking
change, no? So we would have to wait until the next major release which
seems like an unnecessary thing to block on. (Whether to add this as a
state to the StreamThread's FSM is an implementation detail).


+1.  I am sold on skipping out on new states.  I had that as a rejected
alternative in the KIP and have added a few more words to that bit.


Also, the semantics of using an `isPaused` method to distinguish a paused
instance (or topology) make more sense to me -- this is a user-specified
status, whereas the KafkaStreams state is intended to relay the status of
the system itself. For example, if we are going to continue to poll during
pause, then shouldn't the client transition to REBALANCING?
I believe it makes sense to still allow distinguishing these states while a
client is paused, whereas making PAUSED its own state means you can't tell
when the client is rebalancing vs running, or whether it is paused or dead:
presumably the NOT_RUNNING/ERROR state would trump the PAUSED state, which
means you would not be able to rely on checking the state to see if you had
called PAUSED on that instance. Obviously you can work around this by just
maintaining a flag in the usercode, but all this feels very unnatural to me
vs just checking the `#isPaused` API.

On that note, I had one question -- at what point would the `#isPaused`
check return true? Would it do so immediately after pausing the instance,
or only once it has finished committing offsets and stopped returning
records?


Immediately, `#isPaused` tells you about metadata.
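
The distinction being drawn here, a user-set flag kept apart from the system state, can be sketched as follows. This is an illustration only: `ClientStatusSketch` is a hypothetical class, and its `State` enum is a simplified subset of state names mentioned in this thread, not the actual KafkaStreams state machine.

```java
// Hypothetical sketch: a user-controlled paused flag kept separate from the
// system-controlled lifecycle state.
class ClientStatusSketch {
    // Simplified subset of states for illustration only.
    enum State { CREATED, REBALANCING, RUNNING, NOT_RUNNING, ERROR }

    private volatile State state = State.CREATED;
    private volatile boolean paused = false;

    // Driven by the runtime (rebalances, failures, shutdown).
    void transitionTo(State next) { state = next; }

    // Driven by the user; takes effect on the flag immediately.
    void pause() { paused = true; }
    void resume() { paused = false; }

    State state() { return state; }
    boolean isPaused() { return paused; }

    // Both pieces of information stay observable at the same time.
    String describe() { return state + (paused ? " (paused)" : ""); }
}
```

Since the flag never replaces the state, an operator can still see a paused client move between RUNNING and REBALANCING, or land in ERROR, which a single PAUSED state would hide.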


Finally, on the note of punctuators I think it would make most sense to
either pause these as well or else add this as an explicit option for the
user. If this feature is used to, for example, help save on processing
costs while an app is not in use, then it would probably be surprising and
perhaps alarming to see certain kinds of processing still continue.


From other parts of the discussion, I'm sold on pausing punctuation.


The question of whether to continue fetching for standby tasks is maybe a
bit more debatable, as it would certainly be nice to find your clients all
caught up when you go to resume the instance again, but I would still
strongly suggest pausing these as well. To use a similar example, imagine
if you paused an app because it was about to run out of disk. If the
standbys kept processing and filled up the remaining space, you'd probably
feel a bit betrayed by this API.

WDYT?


At the minute, my moderately held position is that standby tasks ought to
continue reading and remain caught up.  If standby tasks would run out of
space, there are probably bigger problems.

If later it is desirable to manage punctuation or standby tasks, then it
should be easy for future folks to modify things.

Overall, I'd frame this KIP as "pause processing resulting in outputs".

Cheers,

Jim



On Mon, May 9, 2022 at 10:33 AM Guozhang Wang <wangg...@gmail.com> wrote:

I think for named topology we can leave the scope of this KIP as "all or
nothing", i.e. when you pause an instance you pause all of its topologies.
I raised this question in my previous email just trying to clarify if this
is what you have in mind. We can leave the question of finer controlled
pausing behavior for later when we have named topology being exposed via
another KIP.


Guozhang

On Mon, May 9, 2022 at 7:50 AM John Roesler <vvcep...@apache.org> wrote:

Hi Jim,

Thanks for the replies. This all sounds good to me. Just two further
comments:

3. It seems like you should aim for the simplest semantics. If the intent
is to “pause” the instance, then you’d better pause the whole instance. If
you leave punctuations and standbys running, I expect we’d see bug reports
come in that the instance isn’t really paused.

5. Since you won the race to write a KIP, I don’t think it makes too much
sense to worry too much about modular topologies. When they propose their
KIP, they will have to specify a lot of state management behavior, and
pause/resume will have to be part of it. If they have some concern about
your KIP, they’ll chime in. It doesn’t make sense for you to try and guess
what that proposal will look like.

To be honest, you’re proposing a KafkaStreams runtime-level pause/resume
function, not a topology-level one anyway, so it seems pretty clear that it
would pause the whole runtime (of a single instance) regardless of any
modular topologies. If the intent is to pause individual topologies in the
future, you’d need a different API anyway.

Thanks!
-John

On Mon, May 9, 2022, at 08:10, Jim Hughes wrote:
Hi John,

Long emails are great; responding inline!

On Sat, May 7, 2022 at 4:54 PM John Roesler <vvcep...@apache.org> wrote:

Thanks for the KIP, Jim!

This conversation seems to highlight that the KIP needs to specify
some of its behavior as well as its APIs, where the behavior is
observable and significant to users.

For example:

1. Do you plan to have a guarantee that immediately after calling
KafkaStreams.pause(), users should observe that the instance stops
processing new records? Or should they expect that the threads will
continue to process some records and pause asynchronously (you already
answered this in the thread earlier)?


I'm happy to build up to a guarantee of sorts.  My current idea is that
pause() does not do anything "exceptional" to get control back from a
running topology.  A currently running topology would get to complete its
loop.

Separately, I'm still piecing together how commits work.  By some
mechanism, after a pause, I do agree that the topology needs to commit its
work in some manner.


2. Will the threads continue to poll new records until they naturally fill
up the task buffers, or will they immediately pause their Consumers as
well?


Presently, I'm suggesting that consumers would fill up their buffers.


3. Will threads continue to call (system time) punctuators, or would
punctuations also be paused?


In my first pass at thinking through this, I left the punctuators running.
To be honest, I'm not sure what they do, so my approach is either lucky and
correct or it could be Very Clearly Wrong. ;)


I realize that some of those questions simply may not have occurred to
you, so this is not a criticism for leaving them off; I'm just pointing out
that although we don't tend to mention implementation details in KIPs,
we also can't be too high level, since there are a lot of operational
details that users rely on to achieve various behaviors in Streams.


Ayup, I will add some details as we iron out the guarantees, implementation
details that are at the API level.  This one is tough since internal
features like NamedTopologies are part of the discussion.



A couple more comments:

4. +1 to what Guozhang said. It seems like we should also do a commit
before entering the paused state. That way, any open transactions would
be closed and not have to worry about timing out. Even under ALOS, it
seems best to go ahead and complete the processing of in-flight records
by committing. That way, if anything happens to die while it's paused,
existing work won't have to be repeated. Plus, if there are any processors
with side effects, users won't have to tolerate weird edge cases where a
pause occurs after a processor sees a record, but before the result is sent
to its outputs.

5. I noticed that you proposed not to add a PAUSED state, but I didn't
follow the rationale. Adding a state seems beneficial for a number of
reasons: StreamThreads already use the thread state to determine whether to
process or not, so avoiding a new State would just mean adding a separate
flag to track and then checking your new flag in addition to the State in
the thread. Also, operating Streams applications is a non-trivial task, and
users rely on the State (and transitions) to understand Streams's behavior.
Adding a PAUSED state is an elegant way to communicate to operators what is
happening with the application. Note that the person digging through logs
and metrics, trying to understand why the application isn't doing anything
is probably not going to be the same person who is calling pause() and
resume(). Also, if you add a state, you don't need `isPaused()`.

5b. If you buy the arguments to go ahead and commit as well as the
argument to add a State, then I'd also suggest to follow the existing
patterns for the shutdown states by also adding PAUSING. That way, you'll
also expose a way to understand that Streams received the signal to pause,
and that it's still processing and committing some records in preparation
to enter a PAUSED state. I'm not sure if a RESUMING state would also make
sense.


I hit a tricky bit when thinking through having a PAUSED state...  If one
is using Named Topologies, and some of them are paused, what state is the
Streams instance in?  If we can agree on that, things may become clear....
I can see two quick ideas:

1.  The state is RUNNING and NamedTopologies have some other way to
indicate state.

2.  The state is something messy like PARTIALLY_PAUSED to reflect that the
instance has something interesting going on.

When I poked at things initially, I did try out having different states,
and I readily agree that a PAUSING state may make sense.  (Especially if
there's a need to run commits before transitioning all the way to PAUSED.)



And that's all I have to say about that. I hope you don't find my
long message offputting. I'm fundamentally in favor of your KIP,
and I think with a little more explanation in the KIP, and a few
small tweaks to the proposal, we'll be able to provide good
ergonomics to our users.


Thanks!

Jim


Thanks,
-John

On Sat, May 7, 2022, at 00:06, Guozhang Wang wrote:
I'm in favor of the "just pausing the instance itself" option as well. As
for EOS, the point is that when the processing is paused, we would not
trigger any `producer.send` during the time, and the transaction timeout is
sort of relying on that behavior, so my point was that it's probably better
to also commit the processing before we pause it.
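
A small sketch of the ordering suggested here: commit whatever is in flight, then stop processing, so no transaction is left open for the duration of the pause. `CommitOnPauseSketch` and its fields are hypothetical and only model the bookkeeping; no real producer or transaction API is involved.

```java
// Hypothetical model of "commit before pausing"; no real Kafka producer here.
class CommitOnPauseSketch {
    private boolean paused = false;
    private boolean transactionOpen = false;
    private int commits = 0;

    // Processing a record sends output inside the current transaction.
    void process() {
        if (paused) {
            return; // paused: nothing is sent, so no transaction is opened
        }
        transactionOpen = true;
    }

    // Commit in-flight work first, then stop processing.
    void pause() {
        if (paused) {
            return; // already paused, nothing left to commit
        }
        commit();
        paused = true;
    }

    void resume() { paused = false; }

    private void commit() {
        if (transactionOpen) {
            transactionOpen = false;
            commits++;
        }
    }

    boolean hasOpenTransaction() { return transactionOpen; }
    boolean isPaused() { return paused; }
    int commits() { return commits; }
}
```

The same ordering would apply under at-least-once: committing before the pause means in-flight work does not need to be repeated if the instance dies while paused.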


Guozhang

On Fri, May 6, 2022 at 6:12 PM Jim Hughes <jhug...@confluent.io.invalid>
wrote:

Hi Matthias,

Since the only thing which will be paused is processing the topology, I
think we can let commits happen naturally.

Good point about getting the paused state to new members; it is seeming
like the "building block" approach is a good one to keep things simple at
first.

Cheers,

Jim

On Fri, May 6, 2022 at 8:31 PM Matthias J. Sax <mj...@apache.org> wrote:

I think it's tricky to propagate a pauseAll() via the rebalance protocol.
New members joining the group would need to get paused, too? Could there be
weird race conditions with overlapping pauseAll() and resumeAll() calls on
different instances while there could be errors / network partitions or
similar?

I would argue that similar to IQ, we provide the basic building blocks,
and leave it to users to implement cross-instance management for a
pauseAll() scenario. -- Also, if there is really demand, we can always
add pauseAll()/resumeAll() as follow up work.
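
As an illustration of the "building blocks" idea, user-side coordination could look roughly like the sketch below. Everything in it is hypothetical: `PausableInstance` stands in for whatever handle (for example a remote call) an operator has to each application instance, and `InMemoryInstance` exists only so the sketch runs on its own.

```java
import java.util.List;

// Hypothetical handle to one application instance; in practice this would
// wrap whatever mechanism the operator uses to reach that instance.
interface PausableInstance {
    void pause();
    void resume();
    boolean isPaused();
}

// Trivial in-memory implementation so the sketch is self-contained.
class InMemoryInstance implements PausableInstance {
    private volatile boolean paused = false;
    public void pause() { paused = true; }
    public void resume() { paused = false; }
    public boolean isPaused() { return paused; }
}

class FleetPauseSketch {
    // Pauses every known instance; returns true only if all report paused.
    static boolean pauseAll(List<? extends PausableInstance> instances) {
        for (PausableInstance instance : instances) {
            instance.pause();
        }
        return instances.stream().allMatch(PausableInstance::isPaused);
    }

    // Resumes every known instance.
    static void resumeAll(List<? extends PausableInstance> instances) {
        for (PausableInstance instance : instances) {
            instance.resume();
        }
    }
}
```

Note that this only reaches instances the caller already knows about. A member that joins later would start unpaused, which is exactly the propagation problem raised above.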

About named topologies: I agree with Jim to not include them in this KIP
as they are not a public feature yet. If we make named topologies
public, the corresponding KIP should extend the pause/resume feature
(ie, APIs) accordingly. Of course, the code can (and should) already be
set up to support it to be future proof.

Good call out about commit and EOS -- to simplify it, I think it might
be good to commit also for the at-least-once case?


-Matthias


On 5/6/22 1:05 PM, Jim Hughes wrote:
Hi Bill,

Great questions; I'll do my best to reply inline:

On Fri, May 6, 2022 at 3:21 PM Bill Bejeck <bbej...@gmail.com> wrote:

Hi Jim,

Thanks for the KIP.  I have a couple of meta-questions as well:

1) Regarding pausing only a subset of running instances, I'm thinking there
may be a use case for pausing all of them. Would it make sense to also
allow for pausing all instances by adding a method `pauseAll()` or
something similar?


Honestly, I'm indifferent on this point.  Presently, I think what I have
proposed is the minimal change to get the ability to pause and resume
processing.  If adding a 'pauseAll()' is required, I'd be happy to do that!

From Guozhang's email, it sounds like this would require using the
rebalance protocol to trigger the coordination.  Would there be enough room
in that approach to indicate that a named topology is to be paused across
all nodes?


2) Would pausing affect standby tasks?  For example, imagine there are 3
instances A, B, and C. A user elects to pause instance C only but it hosts
the standby tasks for A. Would the standby tasks on the paused application
continue to read from the changelog topic?


Yes, standby tasks would continue reading from the changelog topic.  All
consumers would continue reading to avoid getting dropped from their
consumer groups.

Cheers,

Jim




Thanks!
Bill


On Fri, May 6, 2022 at 2:44 PM Jim Hughes <jhug...@confluent.io.invalid>
wrote:

Hi Guozhang,

Thanks for the feedback; responses inline below:

On Fri, May 6, 2022 at 1:09 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Jim,

Thanks for the proposed KIP. I have some meta questions about it:

1) Would an instance always pause/resume all of its current owned
topologies (i.e. the named topologies), or are there any scenarios where we
only want to pause/resume a subset of them?


An instance may wish to pause some of its named topologies.  I was unsure
what to say about named topologies in the KIP since they seem to be an
internal detail at the moment.

I intend to add to KafkaStreamsNamedTopologyWrapper methods like:
       public void pauseNamedTopology(final String topologyToPause)
       public boolean isNamedTopologyPaused(final String topology)
       public void resumeNamedTopology(final String topologyToResume)
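
For illustration, the three method signatures above could be backed by nothing more than a concurrent set of paused topology names. The class below is a hypothetical stand-alone sketch, not KafkaStreamsNamedTopologyWrapper itself; only the method names and parameters come from the list above.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-alone sketch; the backing set is an assumption made
// for illustration, not how the wrapper is actually implemented.
class NamedTopologyPauseSketch {
    // Thread-safe set of the topology names that are currently paused.
    private final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();

    public void pauseNamedTopology(final String topologyToPause) {
        pausedTopologies.add(topologyToPause);
    }

    public boolean isNamedTopologyPaused(final String topology) {
        return pausedTopologies.contains(topology);
    }

    public void resumeNamedTopology(final String topologyToResume) {
        pausedTopologies.remove(topologyToResume);
    }
}
```

A processing loop would then consult `isNamedTopologyPaused` for the topology owning each task before processing it.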



2) From a user's perspective, do we want to always issue a `pause/resume`
to all the instances or not? For example, we can define the semantics of
the function as "you only need to call this function on any of the
application's instances, and all instances would then pause (via the
rebalance error codes)", or as "you would call this function for all the
instances of an application". Which one are you referring to?


My initial intent is that one would call this function on any instances of
the application that one wishes to pause.  This should allow more control
(in case one wanted to pause a portion of the instances).  On the other
hand, this approach would put more work on the implementer to coordinate
calling pause or resume across instances.

If the other option is more suitable, happy to do that instead.


3) With EOS, there's a transaction timeout which would determine how long a
transaction can stay idle before it's force-aborted on the broker side. I
think when a pause is issued, that means we'd need to immediately commit
the current transaction for EOS since we do not know how long we could
pause for. Is that right? If yes could you please clarify that in the doc
as well.


Good point.  My intent is for pause() to wait for the next iteration
through `runOnce()` and then only skip over the processing for paused tasks
in `taskManager.process(numIterations, time)`.
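
Roughly, the loop shape being described might look like the sketch below. It is a simplified model under stated assumptions, not the StreamThread code: `RunLoopSketch`, its bounded buffer, and the counters are hypothetical, and whether commits sit inside or outside the processing call is the open question raised in this thread. In the sketch the commit step runs on every iteration, while polling keeps filling the buffer up to its capacity during a pause.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of one stream thread's loop; NOT the real StreamThread.
class RunLoopSketch {
    private final Deque<String> buffer = new ArrayDeque<>();
    private final int capacity;
    private volatile boolean paused = false;
    private int processed = 0;
    private int commits = 0;

    RunLoopSketch(int capacity) { this.capacity = capacity; }

    // Only flips a flag; the loop notices it on its next iteration.
    void pause() { paused = true; }
    void resume() { paused = false; }

    // One iteration: poll, then process unless paused, then commit.
    void runOnce(int incoming) {
        // Polling continues while paused, but only until the buffer is full.
        for (int i = 0; i < incoming && buffer.size() < capacity; i++) {
            buffer.add("record");
        }
        // The processing step is the only part that pause() skips.
        if (!paused) {
            while (!buffer.isEmpty()) {
                buffer.poll();
                processed++;
            }
        }
        // Assumption: the commit pathway runs on every iteration.
        commits++;
    }

    int buffered() { return buffer.size(); }
    int processed() { return processed; }
    int commits() { return commits; }
}
```

Calling `pause()` mid-iteration does not interrupt anything; the current pass finishes and the flag is honored from the next `runOnce` onward.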

Do commits live inside that call or do they live across/outside of it?  In
the former case, I think there shouldn't be any issues with EOS.
Otherwise, we may need to work through some details to get EOS right.

Once we figure that out, I can update the KIP.

Thanks,

Jim





Guozhang



On Wed, May 4, 2022 at 10:51 AM Jim Hughes <jhug...@confluent.io.invalid>
wrote:

Hi all,

I have written up a KIP for adding the ability to pause and resume the
processing of a topology in AK Streams.  The KIP is here:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832

Thanks in advance for your feedback!

Cheers,

Jim


