[jira] [Created] (KAFKA-16281) Probable IllegalState possible with KIP-996

2024-02-20 Thread Jack Vanlightly (Jira)
Jack Vanlightly created KAFKA-16281:
---

 Summary: Probable IllegalState possible with KIP-996
 Key: KAFKA-16281
 URL: https://issues.apache.org/jira/browse/KAFKA-16281
 Project: Kafka
  Issue Type: Task
  Components: kraft
Reporter: Jack Vanlightly


I have a TLA+ model of KIP-996 and have identified an IllegalState exception 
that would occur with the existing MaybeHandleCommonResponse behavior.

The issue stems from the fact that a leader, let's call it r1, can resign 
(either due to a restart or check quorum) and then later initiate a pre-vote 
where it ends up in the same epoch as before but with a cleared local leader id. 
When r1 transitions to Prospective it clears its local leader id. When r1 
receives a response from r2, which still believes r1 is the leader, the logic 
in MaybeHandleCommonResponse tries to transition r1 to follower of itself, 
causing an IllegalState exception to be raised.

This is an example history:
 1. r1 is the leader in epoch 1.
 2. r1 resigns due to check quorum, or restarts and resigns.
 3. r1 experiences an election timeout and transitions to Prospective, 
clearing its local leader id.
 4. r1 sends a pre-vote request to its peers.
 5. r2 thinks r1 is still the leader, so it sends a vote response that does 
not grant its vote and sets leaderId=r1 and epoch=1.
 6. r1 receives the vote response and executes MaybeHandleCommonResponse, which 
tries to transition r1 to follower of itself, and an illegal state occurs.

The relevant else if statement in MaybeHandleCommonResponse is here: 
https://github.com/apache/kafka/blob/a26a1d847f1884a519561e7a4fb4cd13e051c824/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1538

In the TLA+ specification, I fixed this issue by adding a fourth condition to 
this statement: that the leaderId also does not equal this server's id. 
[https://github.com/Vanlightly/kafka-tlaplus/blob/9b2600d1cd5c65930d666b12792d47362b64c015/kraft/kip_996/kraft_kip_996_functions.tla#L336]

We should probably create a test to confirm the issue first and then look at 
using the fix I made in the TLA+ specification, though there may be other options.
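
For illustration, here is a minimal sketch of the guard with the extra condition 
added. The names and types are simplified stand-ins, not the actual 
KafkaRaftClient code:

    import java.util.OptionalInt;

    // Sketch only: simplified shape of the check in MaybeHandleCommonResponse,
    // with the proposed fourth condition. Real names/types in KafkaRaftClient differ.
    public class LeaderHintGuard {

        // True when it is safe to transition to follower of the leader reported
        // in a response. The final clause is the proposed fix: never follow ourselves.
        static boolean shouldFollowReportedLeader(OptionalInt reportedLeaderId,
                                                  int reportedEpoch,
                                                  int currentEpoch,
                                                  int localId) {
            return reportedLeaderId.isPresent()
                    && reportedEpoch == currentEpoch
                    && reportedLeaderId.getAsInt() != localId; // proposed extra condition
        }

        public static void main(String[] args) {
            // r1 (id=1) is Prospective in epoch 1; r2 replies claiming r1 is still
            // the leader. With the extra condition this returns false, avoiding the
            // illegal "follower of itself" transition.
            System.out.println(shouldFollowReportedLeader(OptionalInt.of(1), 1, 1, 1));
        }
    }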





Re: [DISCUSS] KIP-853: KRaft Controller Membership Changes

2024-02-01 Thread Jack Vanlightly
Hi Jose,

I have a question about how voters and observers that are far behind the
leader catch up when there are multiple reconfiguration commands in the
log between their position and the end of the log.

Here are some example situations that need clarification:

Example 1
Imagine a cluster of three voters: r1, r2, r3. Voter r3 goes offline for a
while. In the meantime, r1 dies and gets replaced with r4, and r2 dies and
gets replaced with r5. Now the cluster is formed of r3, r4, r5. When r3
comes back online, it tries to fetch from dead nodes and eventually starts
unending leader elections. It is stuck because it doesn't realise it is in a
stale configuration whose members are all dead except for itself.

Example 2
Imagine a cluster of three voters: r1, r2, r3. Voter r3 goes offline then
comes back and discovers the leader is r1. Again, there are many
reconfiguration commands between its LEO and the end of the leader's log.
It starts fetching, changing configurations as it goes, until it reaches a
stale configuration (r3, r4, r5) where it is a member but none of its peers
are actually alive anymore. It continues to fetch from r1, but then for
some reason the connection to r1 is interrupted. r3 starts leader elections
which get no responses.

Example 3
Imagine a cluster of three voters: r1, r2, r3. Over time, many
reconfigurations have happened and now the voters are (r4, r5, r6). The
observer o1 starts fetching from the nodes in
'controller.quorum.bootstrap.servers', which includes r4. r4 responds with
NotLeader and says that r5 is the leader. o1 starts fetching and goes through
the motions of switching to each configuration as it learns of it in the
log. The connection to r5 gets interrupted while o1 is in the configuration
(r7, r8, r9). It attempts to fetch from these voters but none respond, as
they are all long dead because this is a stale configuration. Does the observer
fall back to 'controller.quorum.bootstrap.servers' for the list of voters it
can fetch from?

After thinking it through, it occurs to me that in examples 1 and 2, the
leader (of the latest configuration) should be sending BeginQuorumEpoch
requests to r3 after a certain timeout? r3 can start elections (based on
its stale configuration) which go nowhere, until it eventually receives a
BeginQuorumEpoch from the leader, learns of the leader and resumes fetching.

In the case of an observer, I suppose it must fall back to
'controller.quorum.voters' or 'controller.quorum.bootstrap.servers' to
learn of the leader?
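
To make the observer question concrete, this is roughly the fallback behaviour
I am asking about, sketched with made-up names (not a proposal for the actual
implementation):

    import java.util.List;

    // Sketch of the fallback question above; every name here is hypothetical.
    public class ObserverFetchTargets {

        // If no voter in the observer's current (possibly stale) configuration is
        // reachable, fall back to controller.quorum.bootstrap.servers.
        static List<String> fetchCandidates(List<String> currentConfigVoters,
                                            List<String> reachableNodes,
                                            List<String> bootstrapServers) {
            boolean anyReachable =
                    currentConfigVoters.stream().anyMatch(reachableNodes::contains);
            return anyReachable ? currentConfigVoters : bootstrapServers;
        }

        public static void main(String[] args) {
            // Stale configuration (r7, r8, r9) whose members are all long dead.
            System.out.println(fetchCandidates(
                    List.of("r7", "r8", "r9"),
                    List.of("r4", "r5", "r6"),
                    List.of("r4:9093", "r5:9093", "r6:9093")));
        }
    }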

Thanks
Jack



On Fri, Jan 26, 2024 at 1:36 AM José Armando García Sancio
 wrote:

> Hi all,
>
> I have updated the KIP to include information on how KRaft controller
> automatic joining will work.
>
> Thanks,
> --
> -José
>


Re: How Kafka handle partition leader change?

2023-11-22 Thread Jack Vanlightly
If you want to understand how the replication protocol works, and how it can be
configured for consistency or for availability, I have written up a more formal
description of the protocol along with TLA+ specifications. These should answer
most of your questions; if not, please do come back and ask further questions.

How it works today:
- Formal description:
https://github.com/Vanlightly/kafka-tlaplus/blob/main/kafka_data_replication/kraft/v3.5/description/0_kafka_replication_protocol.md
- TLA+ specification:
https://github.com/Vanlightly/kafka-tlaplus/blob/main/kafka_data_replication/kraft/v3.5/kafka_replication_v3_5.tla

How it will work when KIP-966 is implemented:
- Formal description:
https://github.com/Vanlightly/kafka-tlaplus/blob/main/kafka_data_replication/kraft/kip-966/description/0_kafka_replication_protocol.md
- TLA+ specification:
https://github.com/Vanlightly/kafka-tlaplus/blob/main/kafka_data_replication/kraft/kip-966/kafka_replication_kip_966.tla

Hope that helps
Jack


On Wed, Nov 22, 2023 at 6:27 AM De Gao  wrote:

> Looks like the core of the problem should still be the juggling game of
> consistency, availability and partition tolerance.  If we want the cluster
> still work when brokers have inconsistent information due to network
> partition, we have to choose between consistency and availability.
> My proposal is not about fix the message loss. Will share when ready.
> Thanks Andrew.
> 
> From: Andrew Grant 
> Sent: 21 November 2023 12:35
> To: dev@kafka.apache.org 
> Subject: Re: How Kafka handle partition leader change?
>
> Hey De Gao,
>
> Message loss or duplication can actually happen even without a leadership
> change for a partition. For example if there are network issues and the
> producer never gets the ack from the server, it’ll retry and cause
> duplicates. Message loss can usually occur when you use acks=1 config -
> mostly you’d lose after a leadership change but in theory if the leader was
> restarted, the page cache was lost and it stayed leader again we could lose
> the message if it wasn’t replicated soon enough.
>
> You might be right it’s more likely to occur during leadership change
> though - not 100% sure myself on that.
>
> Point being, the idempotent producer really is the way to write once and
> only once as far as I’m aware.
>
> If you have any suggestions for improvements I’m sure the community would
> love to hear them! It’s possible there are ways to make leadership changes
> more seamless and at least reduce the probability of duplicates or loss.
> Not sure myself. I’ve wondered before if the older leader could reroute
> messages for a small period of time until the client knew the new leader
> for example.
>
> Andrew
>
> Sent from my iPhone
>
> > On Nov 21, 2023, at 1:42 AM, De Gao  wrote:
> >
> > I am asking this because I want to propose a change to Kafka. But looks
> like in certain scenario it is very hard to not loss or duplication
> messages. Wonder in what scenario we can accept that and where to draw the
> line?
> >
> > 
> > From: De Gao 
> > Sent: 21 November 2023 6:25
> > To: dev@kafka.apache.org 
> > Subject: Re: How Kafka handle partition leader change?
> >
> > Thanks Andrew.  Sounds like the leadership change from Kafka side is a
> 'best effort' to avoid message duplicate or loss. Can we say that message
> lost is very likely during leadership change unless producer uses
> idempotency? Is this a generic situation that no intent to provide data
> integration guarantee upon metadata change?
> > 
> > From: Andrew Grant 
> > Sent: 20 November 2023 12:26
> > To: dev@kafka.apache.org 
> > Subject: Re: How Kafka handle partition leader change?
> >
> > Hey De Gao,
> >
> > The controller is the one that always elects a new leader. When that
> happens that metadata is changed on the controller and once committed it’s
> broadcast to all brokers in the cluster. In KRaft this would be via a
> PartitonChange record that each broker will fetch from the controller. In
> ZK it’d be via an RPC from the controller to the broker.
> >
> > In either case each broker might get the notification at a different
> time. No ordering guarantee among the brokers. But eventually they’ll all
> know the new leader which means eventually the Produce will fail with
> NotLeader and the client will refresh its metadata and find out the new one.
> >
> > In between all that leadership movement, there are various ways messages
> can get duplicated or lost. However if you use the idempotent producer I
> believe you actually won’t see dupes or missing messages so if that’s an
> important requirement you could look into that. The producer is designed to
> retry in general and when you use the idempotent producer some extra
> metadata is sent around to dedupe any messages server-side that were sent
> multiple times by the client.
> >
> > If you’re 

Re: [DISCUSS] How to detect (and prevent) complex bugs in Kafka?

2023-10-24 Thread Jack Vanlightly
I think it is a great technique and I've used local invariants when doing
system modelling in Jepsen's Maelstrom, which has no global view of state for
checking global invariants. Sometimes the kind of assertions you want can be
too costly for inclusion in a production system, so the idea of gating them
behind a kind of debug mode could be useful. Low-cost assertions should
probably be included regardless.

I'm not a Kafka code contributor so I can't comment on using this technique
to avoid the incorrect usage of threads and locks. However, there is also
another concept which could potentially be applied to Kafka as a general
coding principle: the poka-yoke [1]. The idea of a poka-yoke is to avoid
mistakes by mistake-proofing, making human error physically much harder.

So we have ways of preventing these kinds of mistakes, through mechanisms
such as types, and ways of quickly detecting them once written, in the
form of assertions (local invariants).

[1] https://en.wikipedia.org/wiki/Poka-yoke
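
As a small illustration of what a gated, debug-mode assertion could look like
(a sketch only; the class, system property and invariant are hypothetical, not
an existing Kafka API):

    // Sketch of a debug-gated local invariant check. All names are hypothetical.
    public final class Invariants {

        // In a real broker this could come from a config rather than a system property.
        private static final boolean DEBUG_MODE =
                Boolean.getBoolean("kafka.debug.invariants");

        private Invariants() {}

        // Expensive checks run only in debug mode; cheap ones could run unconditionally.
        public static void checkExpensive(boolean condition, String message) {
            if (DEBUG_MODE && !condition) {
                throw new IllegalStateException("Invariant violated: " + message);
            }
        }

        public static void main(String[] args) {
            // Tribal-knowledge invariant from this thread: network threads must not
            // perform storage IO.
            boolean onNetworkThread =
                    Thread.currentThread().getName().startsWith("network-thread");
            Invariants.checkExpensive(!onNetworkThread,
                    "storage IO attempted on a network thread");
            System.out.println("invariant held (or debug mode disabled)");
        }
    }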

Jack

On Tue, Oct 24, 2023 at 11:33 AM Divij Vaidya 
wrote:

> Hey folks
>
> We recently came across a bug [1] which was very hard to detect during
> testing and easy to introduce during development. I would like to kick
> start a discussion on potential ways which could avoid this category of
> bugs in Apache Kafka.
>
> I think we might want to start working towards a "debug" mode in the broker
> which will enable assertions for different invariants in Kafka. Invariants
> could be derived from formal verification that Jack [2] and others have
> shared with the community earlier AND from tribal knowledge in the
> community such as network threads should not perform any storage IO, files
> should not fsync in critical product path, metric gauges should not acquire
> a lock etc. The release qualification  process (system tests + integration
> tests) will run the broker in "debug" mode and will validate these
> assertions while testing the system in different scenarios. The inspiration
> for this idea is derived from Marc Brooker's post at
> https://brooker.co.za/blog/2023/07/28/ds-testing.html
>
> Your thoughts on this topic are welcome! Also, please feel free to take
> this idea forward and draft a KIP for a more formal discussion.
>
> [1] https://issues.apache.org/jira/browse/KAFKA-15653
> [2] https://lists.apache.org/thread/pfrkk0yb394l5qp8h5mv9vwthx15084j
>
> --
> Divij Vaidya
>


Re: [DISCUSS] KIP-932: Queues for Kafka

2023-10-04 Thread Jack Vanlightly
I would like to see more explicit discussion of topic retention and share 
groups. There are a few options here, ranging from simple to more sophisticated. 
There are also topic-level and share-group-level options.

The simple thing would be to ensure that the SPSO of each share group is 
bounded by the Log Start Offset (LSO) of each partition, which is itself managed 
by the retention policy. This is a topic-level control which applies to all 
share groups. I would say that this shared retention is the largest drawback of 
modeling queues on shared logs, and it is worth noting.
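
Concretely, the simple topic-level option reduces to clamping each share 
group's SPSO to the partition's LSO whenever retention advances it (a sketch 
under my own naming, not the KIP's):

    // Sketch only: the topic-level option above, expressed as a clamp.
    public class ShareGroupOffsets {

        static long boundedSpso(long spso, long logStartOffset) {
            return Math.max(spso, logStartOffset);
        }

        public static void main(String[] args) {
            // Retention advanced the LSO to 500 while the share group's SPSO was 120,
            // so the share group silently skips the records in between.
            System.out.println(boundedSpso(120L, 500L)); // 500
        }
    }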

A more sophisticated approach would be to allow the LSO to advance not (only) by 
retention policy but also by the advancement of the lowest SPSO. This can keep the 
amount of data lower by garbage collecting messages that have been acknowledged 
by all share groups. Some people may like that behaviour on topics where 
share groups are the only consumption model and no replay is needed.

There are also per-share-group possibilities, such as share-group TTLs, where 
messages can be archived on a per-share-group basis.

Thanks
Jack


Re: Complete Kafka replication protocol description

2023-09-11 Thread Jack Vanlightly
Hi all,

I agree that we should have the protocol description and specification in
the Kafka repository. I have two other specifications to contribute:
the KRaft implementation of Raft and the next-gen consumer group
protocol (WIP). I also have a formal prose description of the Kafka
replication protocol as it works today. That should appear under the
3.5 directory sometime this week.

Regarding moving this kind of stuff into the Kafka repo, there are a number
of additional topics such as:

   - Where to place formal prose descriptions and specifications of
   protocols in the Kafka repo.
   - Given we may add more specifications and potentially other formal
   prose descriptions, how should we structure things?
   - Do we use markdown, like I have done here, with svg files (created in
   Excalidraw with the Excalidraw bits embedded in the svg), or do we use
   LaTeX? Some people may wish to read this directly on GitHub and perhaps
   others would prefer a PDF. Markdown is more accessible and perhaps more
   likely to fulfil the living-document approach.

Thanks
Jack


On Mon, Sep 11, 2023 at 1:45 PM Divij Vaidya 
wrote:

> This is very useful Jack!
>
> 1. We are missing a replication protocol specification in our project.
> Ideally it should be a living document and adding what you wrote to
> the docs/design would be a great start towards that goal.
> 2. I have also been building on top of some existing TLA+ to add
> changes to replication protocol brought by features such as Tiered
> Storage, local retention etc. at
>
> https://github.com/divijvaidya/kafka-specification/blob/master/KafkaReplication.tla
> 3. Apart from verifying correctness, I believe a TLA+ model will also
> help developers quickly iterate through their fundamental assumptions
> while making changes. As an example, we recently had a discussion in
> the community on whether to increase leader epoch with shrink/expand
> ISR or not. There was another discussion on whether we can choose the
> replica with the largest end offset as the new leader to reduce
> truncation. In both these cases, we could have quickly modified the
> existing TLA+ model, run through it and verify that the assumptions
> still hold true. It would be great if we can take such discussions as
> an example and demonstrate how TLA+ could have benefitted the
> community. It would help make the case for adding the TLA+ spec as
> part of the community owned project itself.
>
> Right now, things are a bit busy on my end, but I am looking forward
> to exploring what you shared above in the coming weeks (perhaps a
> first review by end of sept).
>
> Thank you again for starting this conversation.
>
> --
> Divij Vaidya
>
> On Mon, Sep 11, 2023 at 4:49 AM Haruki Okada  wrote:
> >
> > Hi Jack,
> >
> > Thank you for the great work, not only the spec but also for the
> > comprehensive documentation about the replication.
> > Actually I wrote some TLA+ spec to verify unclean leader election
> behavior
> > before so I will double-check my understanding with your complete spec :)
> >
> >
> > Thanks,
> >
> > 2023年9月10日(日) 21:42 David Jacot :
> >
> > > Hi Jack,
> > >
> > > This is great! Thanks for doing it. I will look into it when I have a
> bit
> > > of time, likely after Current.
> > >
> > > Would you be interested in contributing it to the main repository? That
> > > would be a great contribution to the project. Having it there would
> allow
> > > the community to maintain it while changes to the protocol are made.
> That
> > > would also pave the way for having other specs in the future (e.g. new
> > > consumer group protocol).
> > >
> > > Best,
> > > David
> > >
> > > Le dim. 10 sept. 2023 à 12:45, Jack Vanlightly 
> a
> > > écrit :
> > >
> > > > Hi all,
> > > >
> > > > As part of my work on formally verifying different parts of Apache
> Kafka
> > > > and working on KIP-966 I have built up a lot of knowledge about how
> the
> > > > replication protocol works. Currently it is mostly documented across
> > > > various KIPs and in the code itself. I have written a complete
> protocol
> > > > description (with KIP-966 changes applied) which is inspired by the
> > > precise
> > > > but accessible style and language of the Raft paper. The idea is
> that it
> > > > makes it easier for contributors and anyone else interested in the
> > > protocol
> > > > to learn how it works, the fundamental properties it has and how
> those
> > > > properties are supported by the various behaviors and conditions.
> > > >
> > > > It currently resides next to the TLA+ specification itself in my
> > > > kafka-tlaplus repository. I'd be interested to receive feedback from
> the
> > > > community.
> > > >
> > > >
> > > >
> > >
> https://github.com/Vanlightly/kafka-tlaplus/blob/main/kafka_data_replication/kraft/kip-966/description/0_kafka_replication_protocol.md
> > > >
> > > > Thanks
> > > > Jack
> > > >
> > >
> >
> >
> > --
> > 
> > Okada Haruki
> > ocadar...@gmail.com
> > 
>


Complete Kafka replication protocol description

2023-09-10 Thread Jack Vanlightly
Hi all,

As part of my work on formally verifying different parts of Apache Kafka
and working on KIP-966 I have built up a lot of knowledge about how the
replication protocol works. Currently it is mostly documented across
various KIPs and in the code itself. I have written a complete protocol
description (with KIP-966 changes applied) which is inspired by the precise
but accessible style and language of the Raft paper. The idea is that it
makes it easier for contributors and anyone else interested in the protocol
to learn how it works, the fundamental properties it has and how those
properties are supported by the various behaviors and conditions.

It currently resides next to the TLA+ specification itself in my
kafka-tlaplus repository. I'd be interested to receive feedback from the
community.

https://github.com/Vanlightly/kafka-tlaplus/blob/main/kafka_data_replication/kraft/kip-966/description/0_kafka_replication_protocol.md

Thanks
Jack


Re: [DISCUSS] KIP-966: Eligible Leader Replicas

2023-09-06 Thread Jack Vanlightly
Hi Calvin,

Regarding partition reassignment, I have two comments.

I notice the KIP says "The AlterPartitionReassignments will not change the ELR"; 
however, when a reassignment completes (or reverts), any replicas removed from 
the replica set would also be removed from the ELR. That sounds obvious, but I 
figured we should be explicit about it.

Reassignment should also respect min.insync.replicas, because currently a 
reassignment can complete as long as the ISR is not empty and all added 
replicas are members. However, my TLA+ specification, which now includes 
reassignment, finds single-broker failures that can cause committed data loss, 
despite the added protection of the ELR and min.insync.replicas=2. These 
scenarios are limited to shrinking the size of the replica set. If we modify 
the PartitionChangeBuilder to add the completion condition that the target ISR 
size >= min.insync.replicas, then that closes this last single-broker-failure 
data loss case.
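
A sketch of the extra completion condition I have in mind, simplified relative 
to the real PartitionChangeBuilder (the names and shapes here are illustrative 
only):

    import java.util.List;

    // Illustrative check only; the real PartitionChangeBuilder works on different types.
    public class ReassignmentCompletion {

        // A reassignment may only complete if every adding replica is in the target ISR
        // AND (the proposed extra condition) the target ISR still satisfies
        // min.insync.replicas.
        static boolean canComplete(List<Integer> targetIsr,
                                   List<Integer> addingReplicas,
                                   int minInsyncReplicas) {
            return !targetIsr.isEmpty()
                    && targetIsr.containsAll(addingReplicas)
                    && targetIsr.size() >= minInsyncReplicas; // proposed extra condition
        }

        public static void main(String[] args) {
            // A reassignment that would leave only one replica in the ISR with
            // min.insync.replicas=2 must not complete.
            System.out.println(canComplete(List.of(3), List.of(), 2)); // false
        }
    }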

With the above modification, the TLA+ specification of the ELR part of the 
design is standing up to all safety and liveness checks. The only thing that is 
not modeled is the unclean recovery, though I may leave that out as the 
specification is already very large.

Jack

On 2023/09/01 22:27:10 Calvin Liu wrote:
> Hi Justine
> 1. With the new proposal, in order to let the consumer consume a message
> when only 1 replica commits it to its log, the min ISR has to be set to 1.
> 2. Yes, we will need an unclean recovery if the leader has an unclean
> shutdown.
> 3. If the min ISR config is changed to a larger value, the ISR and ELR will
> not be updated. ISR members are always valid no matter how min ISR changes.
> If ELR is not empty, then the HWM can't advance as well after the min ISR
> increase, so the ELR members are safe to stay.
> 4. I will highlight the explanation. Thanks.
> 
> On Thu, Aug 31, 2023 at 4:35 PM Justine Olshan 
> wrote:
> 
> > Hey Calvin,
> >
> > Thanks for the responses. I think I understood most of it, but had a few
> > follow up questions
> >
> > 1. For the acks=1 case, I was wondering if there is any way to continue
> > with the current behavior (ie -- we only need one ack to produce to the log
> > and consider the request complete.) My understanding is that we can also
> > consume from such topics at that point.
> > If users wanted this lower durability could they set min.insync.replicas to
> > 1?
> >
> > 2. For the case where we elect a leader that was unknowingly offline. Say
> > this replica was the only one in ELR. My understanding is that we would
> > promote it to ISR and remove it from ELR when it is the leader, but then we
> > would remove it from ISR and have no brokers in ISR or ELR. From there we
> > would need to do unclean recovery right?
> >
> > 3. Did we address the case where dynamically min isr is increased?
> >
> > 4. I think my comment was more about confusion on the KIP. It was not clear
> > to me that the section was describing points if one was done before the
> > other. But I now see the sentence explaining that. I think I skipped from
> > "delivery plan" to the bullet points.
> >
> > Justine
> >
> > On Thu, Aug 31, 2023 at 4:04 PM Calvin Liu 
> > wrote:
> >
> > > Hi Justine
> > > Thanks for the questions!
> > >   *a. For my understanding, will we block replication? Or just the high
> > > watermark advancement?*
> > >   - The replication will not be blocked. The followers are free to
> > > replicate messages above the HWM. Only HWM advancement is blocked.
> > >
> > >   b. *Also in the acks=1 case, if folks want to continue the previous
> > > behavior, they also need to set min.insync.replicas to 1, correct?*
> > >   - If the clients only send ack=1 messages and minISR=2. The HWM
> > behavior
> > > will only be different when there is 1 replica in the ISR. In this case,
> > > the min ISR does not do much in the current system. It is kind of a
> > > trade-off but we think it is ok.
> > >
> > >   c. *The KIP seems to suggest that we remove from ELR when we start up
> > > again and notice we do not have the clean shutdown file. Is there a
> > chance
> > > we have an offline broker in ELR that had an unclean shutdown that we
> > elect
> > > as a leader before we get the change to realize the shutdown was
> > unclean?*
> > > *  - *The controller will only elect an unfenced(online) replica as the
> > > leader. If a broker has an unclean shutdown, it should register to the
> > > controller first(where it has to declair whether it is a clean/unclean
> > > shutdown) and then start to serve broker requests. So
> > >  1. If the broker has an unclean shutdown before the controller is
> > > aware that the replica is offline, then the broker can become the leader
> > > temporarily. But it can't serve any Fetch requests before it registers
> > > again, and that's when the controller will re-elect a leader.
> > >  2. If the controller knows the replica is offline(missing heartbeats
> > > from the broker for a while) before 

Re: [DISCUSS] KIP-853: KRaft Voters Change

2022-07-27 Thread Jack Vanlightly
Hi Jose,

It's looking good!

> I think that when a replica has caught up is an implementation detail
> and we can have this detailed discussion in Jira or the PR. What do
> you think?

Yes, that sounds fine. For what it's worth, making the leader take the
decision of when an observer is caught up or not greatly simplifies things,
as the leader has all the information necessary.
Thanks
Jack


Re: [DISCUSS]: Including TLA+ in the repo

2022-07-27 Thread Jack Vanlightly
+1 from me too. Once KIP-853 is agreed I will make any necessary changes
and submit a PR to the apache/kafka repo.

Jack

On Tue, Jul 26, 2022 at 10:10 PM Ismael Juma  wrote:

> I'm +1 for inclusion in the main repo and I was going to suggest the same
> in the KIP-853 discussion. The original authors of 3 and 4 are part of the
> kafka community, so we can ask them to submit PRs.
>
> Ismael
>
> On Tue, Jul 26, 2022 at 7:58 AM Tom Bentley  wrote:
>
> > Hi,
> >
> > I noticed that TLA+ has featured in the Test Plans of a couple of recent
> > KIPs [1,2]. This is a good thing in my opinion. I'm aware that TLA+ has
> > been used in the past to prove properties of various parts of the Kafka
> > protocol [3,4].
> >
> > The point I wanted to raise is that I think it would be beneficial to the
> > community if these models could be part of the main Kafka repo. That way
> > there are fewer hurdles to their discoverability and it makes it easier
> for
> > people to compare the implementation with the spec. Spreading familiarity
> > with TLA+ within the community is also a potential side-benefit.
> >
> > I notice that the specs in [4] are MIT-licensed, but according to the
> > Apache 3rd party license policy [5] it should be OK to include.
> >
> > Thoughts?
> >
> > Kind regards,
> >
> > Tom
> >
> > [1]:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-TestPlan
> > [2]:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Voter+Changes#KIP853:KRaftVoterChanges-TestPlan
> > [3]: https://github.com/hachikuji/kafka-specification
> > [4]:
> >
> >
> https://github.com/Vanlightly/raft-tlaplus/tree/main/specifications/pull-raft
> > [5]: https://www.apache.org/legal/resolved.html
> >
>


Re: [DISCUSS] KIP-853: KRaft Voters Change

2022-07-21 Thread Jack Vanlightly
Hi Jose,

Thanks for getting this started. Some comments:

- Regarding the removal of voters, when a leader appends a
RemoveVoterRecord to its log, it immediately switches to the new
configuration. There are two cases here:
1. The voter being removed is the leader itself. The KIP documents that
the followers will continue to fetch from the leader despite these
followers learning that the leader has been removed from the voter set.
This is correct. It might be worth stating the reason: otherwise the
cluster can get blocked from making further progress, as the followers must
keep fetching from the leader or else the leader cannot commit the record and
resign.
2. The voter being removed is not the leader. We should document that
the leader will accept fetches with replica ids (i.e. not observer
fetches) from followers who are not in the voter set of the leader. This
will occur because the voter being removed won't have received the
RemoveVoterRecord yet, and it must be allowed to reach this record so that:
a) the RemoveVoterRecord can be committed (this voter may actually be
required in order to reach quorum on this record) and b) the voter being
removed can switch out of voter mode.
- Regarding what happens when a voter leaves the voter set. When a
non-leader voter reaches a RemoveVoterRecord where it is the subject of the
removal, does it switch to being an observer and continue fetching? When a
leader is removed, it carries on until it has committed the record or an
election occurs electing a different leader. Until that point, it is
leader but not a voter, so that makes it an observer? After it has
committed the record and resigns (or gets deposed by an election) does it
then start fetching as an observer?
- I think the Add/RemoveVoterRecords should also include the current voter
set. This will make it easier to code against and also make
troubleshooting easier. Otherwise, voter membership can only be reliably
calculated by replaying the whole log.
- Regarding the adding of voters:
1. We should document the problem of adding a new voter which then
causes all progress to be blocked until the voter catches up with the
leader. For example, in a 3 node cluster, we lose 1 node and then add a new
node; now there are 4 voters with a majority of 3, but only 3 functioning
nodes, one of which is far behind. Until the new node has caught up, the high
watermark cannot advance. This can be solved by ensuring that to add a node
we start it first as an observer and, once it has caught up, send the AddVoter
RPC to the leader. This leads to the question of how an observer determines
that it has caught up (see the sketch after this list).
2. Perhaps an administrator should send a Join RPC to the node we want
to add. It will, if it isn't already, start fetching as an observer and
automatically send the AddVoter RPC to the leader itself. This way, from a
human operations point of view, there is only a single action to perform.
- For anyone in the community who understands or is interested in TLA+,
should we include the provisional TLA+ specification for this work?
https://github.com/Vanlightly/raft-tlaplus/blob/main/specifications/pull-raft/KRaftWithReconfig.tla
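
To make the add-voter point above concrete, this is the admin-driven flow I
have in mind (sketched with invented names; the RPC names here are just
placeholders for whatever the KIP defines):

    // Sketch of the "observer first, then AddVoter" flow from point 1 above.
    // Every name here is invented for illustration.
    public class AddVoterFlow {

        interface NewNode {
            void startFetchingAsObserver();
            boolean isCaughtUp();        // however the leader/observer decides this
            void sendAddVoterToLeader(); // only once caught up
        }

        static void addVoter(NewNode node) throws InterruptedException {
            node.startFetchingAsObserver();
            while (!node.isCaughtUp()) {
                Thread.sleep(500);       // wait until the new node has caught up
            }
            // Only now does the node join the voter set, so the high watermark
            // is not blocked waiting for a far-behind voter.
            node.sendAddVoterToLeader();
        }
    }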

Thanks
Jack


On Thu, Jul 21, 2022 at 7:38 PM Justine Olshan 
wrote:

> Hey Jose -- this seems like an important KIP! And I enjoy seeing more Uuid
> usage :)
> I was curious a bit more about the motivation here. That section seems to
> be missing.
>
> Thanks for sharing the KIP!
> Justine
>
> On Thu, Jul 21, 2022 at 10:30 AM Niket Goel 
> wrote:
>
> > Hey Jose,
> >
> > Thanks for the KIP. This is a good improvement and will make the KRaft
> > implementation much more robust in the face of failures and generally
> make
> > it more flexible for users.
> >
> > I did a first pass through the KIP and here are some comments (some of
> > these might just be a little uninformed, so feel free to direct me to
> > supplemental reading):
> > Overall protocol safety wise, the reconfiguration operations look safe.
> >
> > > This UUID will be generated once and persisted as part of the quorum
> > state for the topic partition
> > Do we mean that it will be generated every time the disk on the replica
> is
> > primed (so every disk incarnation will have UUID). I think you describe
> it
> > in a section further below. Best to pull it up to here — “the replica
> uuid
> > is automatically generated once by the replica when persisting the quorum
> > state for the first time.”
> >
> > > If there are any pending voter change operations the leader will wait
> > for them to finish.
> > Will new requests be rejected or queued up behind the pending operation.
> > (I am assuming rejected, but want to confirm)
> >
> > > When this option is used the leader of the KRaft topic partition will
> > not allow the AddVoter RPC to add replica IDs that are not describe in
> the
> > configuration and it would not allow the RemoveVoter RPC to remove
> replica
> > IDs that are described in the configuration.
> > Bootstrapping is a little tricky I think. Is it safer/simpler 

Re: [DISCUSS] KIP-782: Expandable batch size in producer

2022-05-02 Thread Jack Vanlightly
The current configs are hard for the Kafka user to use and a little inflexible, 
so I am pleased to see this discussion.

Ultimately we want flexibility. We don't want to force users to understand the 
underlying implementation/protocol, and we want the producer to handle high or 
low throughput efficiently, with the right amount of in-flight data for the 
conditions.

Given that each producer by default has a limited number of in-flight requests 
per connection, we need extra flexibility in the batch sizing.

The way I would like to be able to approach this as a user is: 1) I use 
linger.ms to bound my client-side produce latency; 2) I use request.size and 
max.in.flight.requests.per.connection to calculate my upper bound on in-flight 
data (for example, a 1 MB request.size with 5 in-flight requests per connection 
gives at most roughly 5 MB in flight per broker connection). As a user, I don't 
have to think about anything else (no edge cases or caveats). The producer 
handles the batching intelligently, trying to fully utilise the request sizes 
while respecting the linger.

I understand Jun's point about increasing the default batch size. The only 
downside I see is having to juggle the request size vs the batch size to avoid 
delaying batches, because we can only fit so many big batches in a request. That 
is a calculation we force on the user, and it reduces flexibility.

As I understand the proposed change, each ProducerBatch would be physically 
made up of a list of ByteBuffers, which allows for dynamically increasing the 
batch size in increments of batch.initial.size.

The documented benefit is that we can send large batches when there is lots of 
data ready to be sent, while also not having to wait for linger.ms when there 
is less data. Comparing this to Jun's suggested approach, it looks the same 
except that we get size-triggered batches instead of linger-triggered batches 
when throughput is low. How big an advantage is that?

An alternative dynamic sizing strategy is one based on fairness: batches are 
sized dynamically in a way that distributes the data of each partition more 
fairly across the request.

Messages are added to small sub-batches (say 4 KB or 16 KB) in the accumulator. 
When draining the sub-batches in the Sender, the logic selects sub-batches by 
creation order repeatedly until constraints (like request.size) are met. All 
sub-batches of the same partition are grouped into a single batch for sending. 

This way the user can set a high batch.size (it could even default to the 
request size), as the final batches will be fairly distributed across partitions 
and we should get good utilisation of request.size (while still respecting linger).

Example with request.size=128 KB, 16 KB sub-batches, 1 broker, and sub-batches 
picked in creation order. For simplicity, none have reached linger.ms in this 
example.

Accumulator
p0: [sb1{t1},sb2{t2},sb3{t3},sb4{t4},sb5{t5},sb6{t6},sb7{t7},sb8{t8}]
p1: [sb1{t1}]
p2: [sb1{t3}]
p3: [sb1{t7}]

Request 1
p0: [sb1-sb5]
p1: [sb1]
p2: [sb1]

Request 2 (with no additional data having had arrived)
p0: [sb6-sb8]
p3: [sb1]
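
A rough sketch of the draining logic described above (purely illustrative; the 
names are mine and the real accumulator/sender code is far more involved):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch of fairness-based draining: pick sub-batches in creation order until
    // the request is full, then group them per partition into one batch each.
    // All names are invented.
    public class FairDrain {

        record SubBatch(String partition, long createdNs, int sizeBytes) {}

        static Map<String, List<SubBatch>> drain(List<SubBatch> ready, int maxRequestBytes) {
            ready.sort(Comparator.comparingLong(SubBatch::createdNs)); // oldest first
            Map<String, List<SubBatch>> request = new LinkedHashMap<>();
            int used = 0;
            for (SubBatch sb : ready) {
                if (used + sb.sizeBytes() > maxRequestBytes) break; // request.size reached
                request.computeIfAbsent(sb.partition(), p -> new ArrayList<>()).add(sb);
                used += sb.sizeBytes();
            }
            return request; // each entry becomes a single batch for that partition
        }

        public static void main(String[] args) {
            List<SubBatch> ready = new ArrayList<>(List.of(
                    new SubBatch("p0", 1, 16_384), new SubBatch("p1", 1, 16_384),
                    new SubBatch("p0", 2, 16_384), new SubBatch("p2", 3, 16_384),
                    new SubBatch("p0", 4, 16_384), new SubBatch("p0", 5, 16_384),
                    new SubBatch("p0", 6, 16_384), new SubBatch("p0", 7, 16_384),
                    new SubBatch("p3", 7, 16_384), new SubBatch("p0", 8, 16_384)));
            // Prints [p0, p1, p2]: the request fills up before the p3 sub-batch is
            // reached, so p3 and the last p0 sub-batch wait for the next request.
            System.out.println(drain(ready, 128 * 1024).keySet());
        }
    }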

The downside is that this would need a decent refactoring.
There would need to be changes to the housekeeping of how callbacks are managed, 
for example. There might be more impact that I am not aware of; I'm fairly new 
to this code.

Thanks
Jack

On 2021/12/13 19:15:37 Jun Rao wrote:
> Hi, Lucas,
> 
> Thanks for the reply. It would be useful to summarize the benefits of a
> separate batch.max.size. To me, it's not clear why a user would want two
> different batch sizes. In your example, I can understand why a user would
> want to form a batch with a 5ms linger. But why would a user prefer 16K
> batches with 5ms linger, if say 256K is deemed best for throughput?
> 
> Thanks,
> 
> Jun
> 
> On Fri, Dec 10, 2021 at 4:35 PM Lucas Bradstreet 
> wrote:
> 
> > Hi Jun,
> >
> > One difference compared to increasing the default batch size is that users
> > may actually prefer smaller batches but it makes much less sense to
> > accumulate many small batches if a batch is already sending.
> >
> > For example, imagine a user that prefer 16K batches with 5ms linger.
> > Everything is functioning normally and 16KB batches are being sent. Then
> > there's a 500ms blip for that broker. Do we want to continue to accumulate
> > 16KB batches, each of which requires a round trip, or would we prefer to
> > accumulate larger batches while sending is blocked?
> >
> > I'm not hugely against increasing the default batch.size in general, but
> > batch.max.size does seem to have some nice properties.
> >
> > Thanks,
> >
> > Lucas
> >
> > On Fri, Dec 10, 2021 at 9:42 AM Jun Rao  wrote:
> >
> > > Hi, Artem, Luke,
> > >
> > > Thanks for the reply.
> > >
> > > 11. If we get rid of batch.max.size and increase the default batch.size,
> > > it's true the behavior is slightly different than before. However, does
> > > that difference matter to most of our users? In your example, if a user
> > > sets linger.ms to 100ms and thinks 256KB is good for throughput, does it
> > > matter to deliver any batch smaller than 256KB before 100ms? I also find
> > it
> > > a bit hard to explain