I think it's tricky to propagate a pauseAll() via the rebalance
protocol. New members joining the group would need to get paused, too?
Could there be weird race conditions with overlapping pauseAll() and
resumeAll() calls on different instances while there are errors,
network partitions, or similar?
I would argue that, similar to IQ, we provide the basic building blocks
and leave it to users to implement cross-instance management for a
pauseAll() scenario. Also, if there is real demand, we can always
add pauseAll()/resumeAll() as follow-up work.
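To illustrate what "leave it to users" could look like, here is a minimal application-side sketch, assuming each instance is reachable through some handle the application already has (for example an RPC endpoint that calls the instance's local pause/resume). `InstanceHandle` and `PauseAllCoordinator` are hypothetical names, not Kafka Streams APIs. The sketch fans the call out to every known instance and reports the ones that did not acknowledge (e.g. during a network partition), so the caller can decide whether to retry or roll back.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical handle to one application instance (e.g. behind an RPC endpoint). */
interface InstanceHandle {
    String id();
    void pause() throws Exception;   // ask this instance to pause processing
    void resume() throws Exception;  // ask this instance to resume processing
}

/** Hypothetical application-side helper; not part of Kafka Streams. */
final class PauseAllCoordinator {
    /** One remote call against one instance. */
    private interface Call {
        void apply(InstanceHandle instance) throws Exception;
    }

    private final List<InstanceHandle> instances;

    PauseAllCoordinator(final List<InstanceHandle> instances) {
        this.instances = new ArrayList<>(instances);
    }

    /** Pauses every known instance; returns ids of instances that did not acknowledge. */
    synchronized List<String> pauseAll() {
        return forEachInstance(InstanceHandle::pause);
    }

    /** Resumes every known instance; returns ids of instances that did not acknowledge. */
    synchronized List<String> resumeAll() {
        return forEachInstance(InstanceHandle::resume);
    }

    private List<String> forEachInstance(final Call call) {
        final List<String> failed = new ArrayList<>();
        for (final InstanceHandle instance : instances) {
            try {
                call.apply(instance);
            } catch (final Exception e) {
                // Unreachable or failing instance: report it instead of aborting the fan-out.
                failed.add(instance.id());
            }
        }
        return failed;
    }
}
```

Note that `synchronized` only orders overlapping pauseAll()/resumeAll() calls issued through this one coordinator; it does not resolve races between independent callers, and instances that join later still have to be added and paused by the application, which are exactly the concerns raised above.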
About named topologies: I agree with Jim not to include them in this KIP,
as they are not a public feature yet. If we make named topologies
public, the corresponding KIP should extend the pause/resume feature
(i.e., the APIs) accordingly. Of course, the code can (and should) already be
set up to support them, to be future-proof.
Good callout about commit and EOS. To keep it simple, I think it might
be good to commit for the at-least-once case as well?
-Matthias
On 5/6/22 1:05 PM, Jim Hughes wrote:
Hi Bill,
Great questions; I'll do my best to reply inline:
On Fri, May 6, 2022 at 3:21 PM Bill Bejeck <bbej...@gmail.com> wrote:
Hi Jim,
Thanks for the KIP. I have a couple of meta-questions as well:
1) Regarding pausing only a subset of running instances, I'm thinking there
may be a use case for pausing all of them.
Would it make sense to also allow for pausing all instances by adding a
method `pauseAll()` or something similar?
Honestly, I'm indifferent on this point. Presently, I think what I have
proposed is the minimal change to get the ability to pause and resume
processing. If adding a 'pauseAll()' is required, I'd be happy to do that!
From Guozhang's email, it sounds like this would require using the
rebalance protocol to trigger the coordination. Would there be enough room
in that approach to indicate that a named topology is to be paused across
all nodes?
2) Would pausing affect standby tasks? For example, imagine there are 3
instances A, B, and C.
A user elects to pause instance C only, but it hosts the standby tasks
for A.
Would the standby tasks on the paused application continue to read from
the changelog topic?
Yes, standby tasks would continue reading from the changelog topic. All
consumers would continue reading to avoid getting dropped from their
consumer groups.
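The behavior described here can be sketched as a toy loop. This is a simplified model under stated assumptions, not the real StreamThread code: `ToyStreamThread` and its fields are hypothetical, the input buffer is unbounded, and back-pressure is ignored. It shows that each iteration still polls (so the consumer stays in its group) and still applies changelog records to standby tasks, while active processing is held back until resume.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model of a stream thread's loop (hypothetical; not the real StreamThread).
 * Polling and standby updates continue while paused; only active processing stops.
 */
final class ToyStreamThread {
    private volatile boolean paused = false;
    private final List<String> buffered = new ArrayList<>(); // fetched but not yet processed
    final List<String> processed = new ArrayList<>();        // handled by active tasks
    final List<String> restored = new ArrayList<>();         // applied by standby tasks
    int polls = 0;

    void pause() { paused = true; }
    void resume() { paused = false; }
    boolean isPaused() { return paused; }

    void runOnce(final List<String> polledInput, final List<String> polledChangelog) {
        polls++;                          // keep polling so the consumer stays in its group
        buffered.addAll(polledInput);     // input is held, not dropped, while paused
        restored.addAll(polledChangelog); // standby tasks keep reading the changelog
        if (!paused) {
            processed.addAll(buffered);   // active tasks drain the buffer only when not paused
            buffered.clear();
        }
    }
}
```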
Cheers,
Jim
Thanks!
Bill
On Fri, May 6, 2022 at 2:44 PM Jim Hughes <jhug...@confluent.io.invalid> wrote:
Hi Guozhang,
Thanks for the feedback; responses inline below:
On Fri, May 6, 2022 at 1:09 PM Guozhang Wang <wangg...@gmail.com> wrote:
Hello Jim,
Thanks for the proposed KIP. I have some meta questions about it:
1) Would an instance always pause/resume all of its currently owned
topologies (i.e. the named topologies), or are there any scenarios where
we only want to pause/resume a subset of them?
An instance may wish to pause some of its named topologies. I was unsure
what to say about named topologies in the KIP since they seem to be an
internal detail at the moment.
I intend to add methods like these to KafkaStreamsNamedTopologyWrapper:
public void pauseNamedTopology(final String topologyToPause)
public boolean isNamedTopologyPaused(final String topology)
public void resumeNamedTopology(final String topologyToResume)
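One plausible way to back these three methods is a thread-safe set of paused topology names that the processing loop consults for each task. The sketch below assumes that design; `NamedTopologyPauseState` and `shouldProcess` are hypothetical names for illustration, not the planned implementation.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical bookkeeping behind pause/resume of individual named topologies. */
final class NamedTopologyPauseState {
    // Thread-safe set: user threads call pause/resume while stream threads
    // read it on every loop iteration.
    private final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();

    public void pauseNamedTopology(final String topologyToPause) {
        pausedTopologies.add(topologyToPause);
    }

    public boolean isNamedTopologyPaused(final String topology) {
        return pausedTopologies.contains(topology);
    }

    public void resumeNamedTopology(final String topologyToResume) {
        pausedTopologies.remove(topologyToResume);
    }

    /** Whether a task belonging to the given named topology should be processed now. */
    boolean shouldProcess(final String taskTopology) {
        return !pausedTopologies.contains(taskTopology);
    }
}
```

Keeping the state keyed by topology name means pausing one topology leaves tasks of the instance's other named topologies running, which matches the "pause some of its named topologies" case above.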
2) From a user's perspective, do we want to always issue a `pause/resume`
to all the instances or not? For example, we can define the semantics of
the function as "you only need to call this function on any of the
application's instances, and all instances would then pause (via the
rebalance error codes)", or as "you would call this function for all the
instances of an application". Which one are you referring to?
My initial intent is that one would call this function on each instance of
the application that one wishes to pause. This should allow more control
(in case one wanted to pause a portion of the instances). On the other
hand, this approach would put more work on the implementer to coordinate
calling pause or resume across instances.
If the other option is more suitable, happy to do that instead.
3) With EOS, there's a transaction timeout which determines how long a
transaction can stay idle before it's force-aborted on the broker side. I
think when a pause is issued, that means we'd need to immediately commit
the current transaction for EOS, since we do not know how long we could
pause for. Is that right? If yes, could you please clarify that in the doc
as well.
Good point. My intent is for pause() to wait for the next iteration
through `runOnce()` and then only skip over the processing for paused tasks
in `taskManager.process(numIterations, time)`.
Do commits live inside that call, or do they live across/outside of it? In
the former case, I think there shouldn't be any issues with EOS.
Otherwise, we may need to work through some details to get EOS right.
Once we figure that out, I can update the KIP.
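A minimal sketch of the "commit when the pause takes effect" idea raised in this thread, as a toy model with hypothetical names (`ToyTask`, `ToyRunLoop`), not the actual TaskManager logic. On each iteration a paused task commits any open work and is then skipped, so under EOS no transaction sits open for the unknown length of the pause (where it could exceed the producer's `transaction.timeout.ms` and be aborted by the broker); the same path also covers at-least-once, as suggested above. Regular interval-based commits for running tasks are left out for brevity.

```java
import java.util.List;

/** Toy task (hypothetical): counts records processed since its last commit. */
final class ToyTask {
    final String id;
    boolean paused = false;
    int uncommitted = 0; // records since last commit (an open transaction under EOS)
    int commits = 0;

    ToyTask(final String id) { this.id = id; }

    void process(final int records) { uncommitted += records; }

    void commit() {
        if (uncommitted > 0) { // nothing to do if there is no open work
            commits++;         // under EOS this would commit the open transaction
            uncommitted = 0;
        }
    }
}

final class ToyRunLoop {
    /** One iteration: paused tasks commit open work, then skip processing. */
    static void runOnce(final List<ToyTask> tasks, final int recordsPerTask) {
        for (final ToyTask task : tasks) {
            if (task.paused) {
                task.commit(); // flush before going idle, for EOS and at-least-once alike
                continue;
            }
            task.process(recordsPerTask);
        }
    }
}
```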
Thanks,
Jim
Guozhang
On Wed, May 4, 2022 at 10:51 AM Jim Hughes <jhug...@confluent.io.invalid> wrote:
Hi all,
I have written up a KIP for adding the ability to pause and resume the
processing of a topology in AK Streams. The KIP is here:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832
Thanks in advance for your feedback!
Cheers,
Jim
--
-- Guozhang