Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1298

2022-10-14 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: Kafka » Kafka Branch Builder » 3.3 #105

2022-10-14 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1297

2022-10-14 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-866 ZooKeeper to KRaft Migration

2022-10-14 Thread David Arthur
Jun, a few thoughts on 10

> using the pipelining approach to write to ZK for better throughput and
using conditional writes for correctness;

For writes to ZK from the KRaft controller, I think we can reuse some or
all of the code in KafkaZkClient, which performs the MultiOp CheckOp for
correctness along with retries, error handling, and pipelining. For the
common pipelining cases (partition state changes and topic creation), we
should be able to preserve the existing pipelining semantics since those
are in the context of a single record batch in KRaft.
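
(For anyone less familiar with that pattern, a rough sketch of a CheckOp-guarded
write against the raw ZooKeeper client is below; the path, version handling, and
method name are illustrative only, not the actual KafkaZkClient code.)

import scala.jdk.CollectionConverters._
import org.apache.zookeeper.{Op, ZooKeeper}

// Sketch only: bundle a CheckOp on /controller_epoch with the actual write, so
// the whole batch fails if another controller has bumped the epoch in between.
def conditionalWrite(zk: ZooKeeper, expectedEpochZkVersion: Int,
                     path: String, data: Array[Byte]): Unit = {
  val ops = Seq(
    Op.check("/controller_epoch", expectedEpochZkVersion), // correctness guard
    Op.setData(path, data, -1)                             // -1: ignore the znode's own version
  )
  zk.multi(ops.asJava) // atomic: either both ops apply or neither does
}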

> (2) sending the proper LeaderAndIsr and UpdateMetadata requests.

I agree this is one of the more challenging aspects of this design. In
fact, this is really the critical piece of the design that allows us to do
these migrations online. We either need to make the ZK controller
understand the metadata log and KRaft semantics, or we need to make the
KRaft controller understand ZK broker semantics. Both are complex, but I
think the latter is easier :) I do expect we'll want to extract and reuse
some ZK controller code where it makes sense (e.g.,
ControllerChannelManager, ControllerBrokerRequestBatch, KafkaZkClient).

Thanks!
David



On Fri, Oct 14, 2022 at 2:39 PM Jun Rao  wrote:

> Hi, Colin,
>
> 10. "That all goes away in the new mode, and we just have some code which
> analyzes __cluster_metadata and reflects it in 1) updates to ZK and 2)
> messages sent out to brokers."
> Hmm, I am not sure it's that simple. Some of the complexity of the ZK-based
> controller are (1) using the pipelining approach to write to ZK for better
> throughput and using conditional writes for correctness; (2) sending the
> proper LeaderAndIsr and UpdateMetadata requests. For example, during
> controller failover, the full metadata needs to be sent while during
> individual broker failure, only some of the metadata needs to be updated.
> The controlled shutdown handling sometimes uses StopReplicaRequest  and
> some other times uses LeaderAndIsrRequest. (3) triggering new events based
> on the responses of LeaderAndIsr (e.g. for topic deletion). Some of those
> complexity could be re-implemented in a more efficient way, but we need to
> be really careful not to generate regression. Some of the other complexity
> just won't go away. Reimplementing all those logic for the 30 or so events
> in the ZK-based controller is possible, but seems a bit daunting and risky.
>
> Thanks,
>
> Jun
>
> On Fri, Oct 14, 2022 at 9:29 AM Colin McCabe  wrote:
>
> > On Thu, Oct 13, 2022, at 11:44, Jun Rao wrote:
> > > Hi, Colin,
> > >
> > > Thanks for the reply.
> > >
> > > 10. This is a bit on the implementation side. If you look at the
> existing
> > > ZK-based controller, most of the logic is around maintaining an
> in-memory
> > > state of all the resources (broker, topic, partition, etc),
> > reading/writing
> > > to ZK, sending LeaderAndIsr and UpdateMetadata requests and handling
> the
> > > responses to brokers. So we need all that logic in the dual write mode.
> > One
> > > option is to duplicate all that logic in some new code. This can be a
> bit
> > > error prone and makes the code a bit harder to maintain if we need to
> fix
> > > some critical issues in ZK-based controllers. Another option is to try
> > > reusing the existing code in the ZK-based controller. For example, we
> > could
> > > start the EventManager in the ZK-based controller, but let the KRaft
> > > controller ingest new events. This has its own challenges: (1) the
> > existing
> > > logic only logs ZK failures and doesn't expose them to the caller; (2)
> > the
> > > existing logic may add new events to the queue itself and we probably
> > need
> > > to think through how this is coordinated with the KRaft controller; (3)
> > it
> > > registers some ZK listeners unnecessarily (may not be a big concern).
> So
> > we
> > > need to get around those issues somehow. I am wondering if we have
> > > considered both options and which approach we are leaning towards for
> the
> > > implementation.
> > >
> >
> > Yes, this is a good question. My take is that a big part of the
> complexity
> > in the old controller code results from the fact that we use ZK as a
> > multi-writer database for propagating information between different
> > components. So in the old controller, every write to ZK needs to be
> > structured as a compare and swap to be fully correct. Every time we get
> > notified about something, it's usually in the form of "this znode
> changed"
> > which prompts a full reload of part of the data in ZK (which itself has
> > multiple parts, loading, deserializing, reconciling, etc.) That all goes
> > away in the new mode, and we just have some code which analyzes
> > __cluster_metadata and reflects it in 1) updates to ZK and 2) messages
> sent
> > out to brokers.
> >
> > This is pretty decoupled from the other logic in QuorumController and
> > should be easy to unit test, since the same inputs from the log always
> > produce the same 

[jira] [Created] (KAFKA-14305) KRaft Metadata Transactions

2022-10-14 Thread David Arthur (Jira)
David Arthur created KAFKA-14305:


 Summary: KRaft Metadata Transactions
 Key: KAFKA-14305
 URL: https://issues.apache.org/jira/browse/KAFKA-14305
 Project: Kafka
  Issue Type: New Feature
Reporter: David Arthur
 Fix For: 3.4.0


[https://cwiki.apache.org/confluence/display/KAFKA/KIP-868+Metadata+Transactions]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-866 ZooKeeper to KRaft Migration

2022-10-14 Thread Jun Rao
Hi, Colin,

10. "That all goes away in the new mode, and we just have some code which
analyzes __cluster_metadata and reflects it in 1) updates to ZK and 2)
messages sent out to brokers."
Hmm, I am not sure it's that simple. Some of the complexity of the ZK-based
controller lies in (1) using the pipelining approach to write to ZK for better
throughput and using conditional writes for correctness; (2) sending the
proper LeaderAndIsr and UpdateMetadata requests. For example, during
controller failover the full metadata needs to be sent, while during an
individual broker failure only some of the metadata needs to be updated.
The controlled shutdown handling sometimes uses StopReplicaRequest and
at other times uses LeaderAndIsrRequest. (3) triggering new events based
on the responses of LeaderAndIsr (e.g. for topic deletion). Some of that
complexity could be re-implemented in a more efficient way, but we need to
be really careful not to introduce regressions. Some of the other complexity
just won't go away. Reimplementing all that logic for the 30 or so events
in the ZK-based controller is possible, but seems a bit daunting and risky.

Thanks,

Jun

On Fri, Oct 14, 2022 at 9:29 AM Colin McCabe  wrote:

> On Thu, Oct 13, 2022, at 11:44, Jun Rao wrote:
> > Hi, Colin,
> >
> > Thanks for the reply.
> >
> > 10. This is a bit on the implementation side. If you look at the existing
> > ZK-based controller, most of the logic is around maintaining an in-memory
> > state of all the resources (broker, topic, partition, etc),
> reading/writing
> > to ZK, sending LeaderAndIsr and UpdateMetadata requests and handling the
> > responses to brokers. So we need all that logic in the dual write mode.
> One
> > option is to duplicate all that logic in some new code. This can be a bit
> > error prone and makes the code a bit harder to maintain if we need to fix
> > some critical issues in ZK-based controllers. Another option is to try
> > reusing the existing code in the ZK-based controller. For example, we
> could
> > start the EventManager in the ZK-based controller, but let the KRaft
> > controller ingest new events. This has its own challenges: (1) the
> existing
> > logic only logs ZK failures and doesn't expose them to the caller; (2)
> the
> > existing logic may add new events to the queue itself and we probably
> need
> > to think through how this is coordinated with the KRaft controller; (3)
> it
> > registers some ZK listeners unnecessarily (may not be a big concern). So
> we
> > need to get around those issues somehow. I am wondering if we have
> > considered both options and which approach we are leaning towards for the
> > implementation.
> >
>
> Yes, this is a good question. My take is that a big part of the complexity
> in the old controller code results from the fact that we use ZK as a
> multi-writer database for propagating information between different
> components. So in the old controller, every write to ZK needs to be
> structured as a compare and swap to be fully correct. Every time we get
> notified about something, it's usually in the form of "this znode changed"
> which prompts a full reload of part of the data in ZK (which itself has
> multiple parts, loading, deserializing, reconciling, etc.) That all goes
> away in the new mode, and we just have some code which analyzes
> __cluster_metadata and reflects it in 1) updates to ZK and 2) messages sent
> out to brokers.
>
> This is pretty decoupled from the other logic in QuorumController and
> should be easy to unit test, since the same inputs from the log always
> produce the same output in ZK. Basically, ZK is write-only for us, we do
> not read it (with the big exception of broker registration znodes) and I
> think that will greatly simplify things.
>
> So I think dual-write mode as described here will be substantially simpler
> than trying to run part or all of the old controller in parallel. I do
> think we will reuse a bunch of the serialization / deserialization code for
> znodes and possibly the code for communicating with ZK.
>
> best,
> Colin
>
>
> >
> > 14. Good point and make sense.
> >
> > Thanks,
> >
> > Jun
> >
> >
> >
> >
> > On Wed, Oct 12, 2022 at 3:27 PM Colin McCabe  wrote:
> >
> >> Hi Jun,
> >>
> >> Thanks for taking a look. I can answer some questions here because I
> >> collaborated on this a bit, and David is on vacation for a few days.
> >>
> >> On Wed, Oct 12, 2022, at 14:41, Jun Rao wrote:
> >> > Hi, David,
> >> >
> >> > Thanks for the KIP. A few comments below.
> >> >
> >> > 10. It's still not very clear to me how the KRaft controller works in
> the
> >> > dual writes mode to KRaft log and ZK when the brokers still run in ZK
> >> mode.
> >> > Does the KRaft controller run a ZK based controller in parallel or do
> we
> >> > derive what needs to be written to ZK based on KRaft controller logic?
> >>
> >> We derive what needs to be written to ZK based on KRaft controller
> logic.
> >>
> >> > I am also not sure how the KRaft controller 

Re: [DISCUSS] solutions for broker OOM caused by many producer IDs

2022-10-14 Thread Omnia Ibrahim
Hi Luke & Justine,
Thanks for looking into this issue; we have been experiencing this because
of rogue clients as well.

> 3. Having a limit to the number of active producer IDs (sort of like an
LRU
>cache)
>-> The idea here is that if we hit a misconfigured client, we will expire
>the older entries. The concern here is we have risks to lose idempotency
>guarantees, and currently, we don't have a way to notify clients about
>losing idempotency guarantees. Besides, the least  recently used entries
>got removed are not always from the "bad" clients.

- I have some concerns about the impact of this option on transactional
producers. For example, what will happen to an ongoing transaction
associated with an expired PID? Would this leave the transaction in a
"hanging" state?

- How will we notify the client that the transaction can't continue due to
an expired PID?

- If a PID gets marked as `expired`, `admin.DescribeProducers` will not list
it, which will make *`kafka-transactions.sh --list`* a bit tricky, as we can't
identify whether there are transactions linked to this expired PID or not. The
same concern applies to *`kafka-transactions.sh --find-hanging`*.


>5. limit/throttling the producer id based on the principle
>-> Although we can limit the impact to a certain principle with this idea,
>same concern still exists as solution #1 #2.

I am assuming you mean KafkaPrincipal here! If so, is your concern that good
clients using the same principal as a rogue one would get throttled?

If this is the case, then I believe it should be okay, as other throttling
in Kafka on *`/config/users/<user>`* has the same behaviour.


What about applying the limit/throttling to
*`/config/users/<user>/clients/<client-id>`* similar to what we have with
client quotas? This should reduce the concern about throttling good clients,
right?
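
(For reference, the existing (user, client-id) quota mechanism can be driven
through the Admin API roughly as sketched below. It uses the existing
producer_byte_rate key purely as an illustration; a producer-ID quota key is
hypothetical and does not exist today.)

import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}

// Sketch only: applies a quota at the (user, client-id) granularity suggested
// above. A per-principal producer-ID quota would presumably follow the same shape.
def throttleUserClient(admin: Admin, user: String, clientId: String): Unit = {
  val entity = new ClientQuotaEntity(
    Map(ClientQuotaEntity.USER -> user, ClientQuotaEntity.CLIENT_ID -> clientId).asJava)
  val op = new ClientQuotaAlteration.Op("producer_byte_rate", 1048576.0)
  admin.alterClientQuotas(Seq(new ClientQuotaAlteration(entity, Seq(op).asJava)).asJava)
    .all().get()
}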

best,

Omnia

On Tue, Oct 11, 2022 at 4:18 AM Luke Chen  wrote:

> Bump this thread to see if there are any comments/thoughts.
> Thanks.
>
> Luke
>
> On Mon, Sep 26, 2022 at 11:06 AM Luke Chen  wrote:
>
> > Hi devs,
> >
> > As stated in the motivation section in KIP-854
> > <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry
> >:
> >
> > With idempotent producers becoming the default in Kafka, this means that
> > unless otherwise specified, all new producers will be given producer IDs.
> > Some (inefficient) applications may now create many non-transactional
> > idempotent producers. Each of these producers will be assigned a producer
> > ID and these IDs and their metadata are stored in the broker memory,
> which
> > might cause brokers out of memory.
> >
> > Justine (in cc.) and I and some other team members are working on the
> > solutions for this issue. But none of them solves it completely without
> > side effects. Among them, "availability" VS "idempotency guarantees" is
> > what we can't decide which to sacrifice. Some of these solutions
> sacrifice
> > availability of produce (1,2,5) and others sacrifice idempotency
> guarantees
> > (3). It could be useful to know if people generally have a preference one
> > way or the other. Or what other better solutions there might be.
> >
> > Here are the proposals we came up with:
> >
> > 1. Limit the total active producer ID allocation number.
> > -> This is the simplest solution. But since the OOM issue is usually
> > caused by a rogue or misconfigured client, and this solution might
> "punish"
> > the good client from sending messages.
> >
> > 2. Throttling the producer ID allocation rate
> > -> Same concern as the solution #1.
> >
> > 3. Having a limit to the number of active producer IDs (sort of like an
> > LRU cache)
> > -> The idea here is that if we hit a misconfigured client, we will expire
> > the older entries. The concern here is we have risks to lose idempotency
> > guarantees, and currently, we don't have a way to notify clients about
> > losing idempotency guarantees. Besides, the least  recently used entries
> > got removed are not always from the "bad" clients.
> >
> > 4. allow clients to "close" the producer ID usage
> > -> We can provide a way for producer to "close" producerID usage.
> > Currently, we only have a way to INIT_PRODUCER_ID requested to allocate
> > one. After that, we'll keep the producer ID metadata in broker even if
> the
> > producer is "closed". Having a closed API (ex: END_PRODUCER_ID), we can
> > remove the entry from broker side. In client side, we can send it when
> > producer closing. The concern is, the old clients (including non-java
> > clients) will still suffer from the OOM issue.
> >
> > 5. limit/throttling the producer id based on the principle
> > -> Although we can limit the impact to a certain principle with this
> idea,
> > same concern still exists as solution #1 #2.
> >
> > Any thoughts/feedback are welcomed.
> >
> > Thank you.
> > Luke
> >
>


Re: [DISCUSS] KIP-866 ZooKeeper to KRaft Migration

2022-10-14 Thread David Arthur
Thanks for the discussion so far, everyone!

Mickael

1) In this case, I think the KRaft controller could avoid sending UMR
or LISR to this broker and not include it in LiveBrokers that are sent
to the rest of the cluster. This would effectively fence it off from
the rest of the cluster. The controller can report this as an error.

2) I agree with Colin's suggestion here. At some point, we'll need to
block writes on the kraft controller. We might want a "time that ZK is
blocking KRaft" metric here to accompany the
"ZooKeeperWriteBehindLag". As we get on with the implementation, I
think we'll be able to pick a reasonable size for the allowable
backlog of writes to ZK.



Jun

10) Colin answered this

11) Yes, that's right -- I'll clarify that.

12) +1

13) Colin answered this

14) As Colin mentioned, we achieve ZK controller fencing by bumping
the controller epoch. I'll clarify this

15) The text is a bit unclear here. What I meant is we will return a
random broker as ControllerId in the MetadataResponse sent to clients,
but the active KRaft controller would be sent as the ControllerId in
UpdateMetadataRequest. However, Colin raises a good point that we'll
need a different code path on the ZK brokers for connecting to a KRaft
controller. We should add a check on "controller.quorum.voters" as a
readiness check on the ZK brokers.

16) I think we can remove this. This is a remnant of an earlier design
without forwarding



Colin

1) Yea, I like this. Since we should be able to statically determine
our "readiness" on a broker, we can put this in the registration data.

2) It's not strictly required, no. However, it might be useful for the
case of explicitly disallowing migrations on a cluster. For example,
if someone brought up a KRaft quorum and misconfigured the
"zk.connect", it could trigger a migration on some existing/unrelated
cluster. This would be mostly harmless, but could cause some trouble
for operators. I think of this config like an arming switch for the
migration.

3) Sounds good

4) Sure, sounds good

5) Yea, agreed. We should fence these brokers -- see my response to
Mickael above.



Colin/Jun

As Colin mentioned in his reply, we can react to records in the
metadata log and update ZK and send out RPCs to brokers. Something
like an asynchronous listener to the metadata log.

Another important detail here is that in the KRaft controller we can
work off a consistent in-memory snapshot of the metadata. This could
be helpful for the case where our operation might take some time (like
sending UMR to all the brokers, or making a lot of partition updates
in ZK). During the time we are processing a record, we don't have to
worry about the metadata changing out from under us.



On Thu, Oct 13, 2022 at 2:44 PM Jun Rao  wrote:
>
> Hi, Colin,
>
> Thanks for the reply.
>
> 10. This is a bit on the implementation side. If you look at the existing
> ZK-based controller, most of the logic is around maintaining an in-memory
> state of all the resources (broker, topic, partition, etc), reading/writing
> to ZK, sending LeaderAndIsr and UpdateMetadata requests and handling the
> responses to brokers. So we need all that logic in the dual write mode. One
> option is to duplicate all that logic in some new code. This can be a bit
> error prone and makes the code a bit harder to maintain if we need to fix
> some critical issues in ZK-based controllers. Another option is to try
> reusing the existing code in the ZK-based controller. For example, we could
> start the EventManager in the ZK-based controller, but let the KRaft
> controller ingest new events. This has its own challenges: (1) the existing
> logic only logs ZK failures and doesn't expose them to the caller; (2) the
> existing logic may add new events to the queue itself and we probably need
> to think through how this is coordinated with the KRaft controller; (3) it
> registers some ZK listeners unnecessarily (may not be a big concern). So we
> need to get around those issues somehow. I am wondering if we have
> considered both options and which approach we are leaning towards for the
> implementation.
>
> 14. Good point and make sense.
>
> Thanks,
>
> Jun
>
>
>
>
> On Wed, Oct 12, 2022 at 3:27 PM Colin McCabe  wrote:
>
> > Hi Jun,
> >
> > Thanks for taking a look. I can answer some questions here because I
> > collaborated on this a bit, and David is on vacation for a few days.
> >
> > On Wed, Oct 12, 2022, at 14:41, Jun Rao wrote:
> > > Hi, David,
> > >
> > > Thanks for the KIP. A few comments below.
> > >
> > > 10. It's still not very clear to me how the KRaft controller works in the
> > > dual writes mode to KRaft log and ZK when the brokers still run in ZK
> > mode.
> > > Does the KRaft controller run a ZK based controller in parallel or do we
> > > derive what needs to be written to ZK based on KRaft controller logic?
> >
> > We derive what needs to be written to ZK based on KRaft controller logic.
> >
> > > I am 

Re: [DISCUSS] (continued) KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-10-14 Thread Jun Rao
Hi, David,

Thanks for the reply.

60. Hmm, could you explain why KStreams needs the full set of partition
assignments? I am also not sure how this will be implemented based on the
protocol. Since HeartBeatResponse sends the assigned partitions in phases
(those that don't need to wait for revocation from other members first,
followed by the full assignment list), how does a member know which
response has the full assignment?

62. I was referring to Admin#describeConsumerGroups. It seems that
ownedPartitions is still of type List<TopicPartition>. Also, the
existing topicPartitions() returns Set<TopicPartition>, not a collection.

63. This is also in Admin#describeConsumerGroups. The comment seems
inconsistent with the field name.
/**
 * The reason reported by the assignor.
 */
byte error;

67. Thanks for the explanation. Makes sense. The existing name may be ok.

68.2 Are you saying the error is only intended for the client assignor? But
the coordinator generates the error based on the server side validation,
right? Should we provide some info to tell the client why the validation
fails?

71.1 Hmm, for SubscribedTopicIds, should we use topic name in the
subscription part? That seems more intuitive---a subscription shouldn't
change just because a topic is recreated. For the assigned partitions,
perhaps we could include both topicId and name just like FetchOffsetRequest.

Thanks,

Jun

On Fri, Oct 14, 2022 at 2:49 AM Luke Chen  wrote:

> Thanks for the update.
> Yes, I think using similar way as KIP-868 to fix this issue makes sense.
> Let's consider it in the future.
>
> Luke
>
> On Fri, Oct 14, 2022 at 5:16 PM David Jacot 
> wrote:
>
> > Hi Luke,
> >
> > Thanks for your questions.
> >
> > > 1. We will store the "targetAssignment" into log now. But as we know,
> > there's max batch size limit (default 1MB), which means, we cannot
> support
> > 1M partitions in one group (actually, it should be less than 60k
> partitions
> > since we'll store {topicID+partition id}) by default now. How will we
> > handle that? Do we expect users to adjust the max batch size to support
> > large partitions in groups, which we don't need this change for old
> > protocol?
> >
> > That's right. I have a few ideas to remove this limitation in the
> > future but I decided to keep them for future improvement. The KIP is
> > large enough and as the current protocol suffers from the exact same
> > limitation, it is not a regression.
> >
> > For the future, my thinking is to split the assignment and to only
> > write deltas to the log instead of re-writing all of it. We would need
> > to use transactions for this in the coordinator (similarly to
> > KIP-868). The challenge is that we have to ensure that those deltas
> > are all written or completely roll backed. Otherwise, we would have a
> > weird state with the compaction. This needs obviously more thinking.
> >
> > > I'm wondering why we should persist the "targetAssignment" data? If we
> > want
> > to work for coordinator failover, could the new coordinator try to
> request
> > for currently owned partitions from each consumer when failed over? I'm
> not
> > sure if the consumer will auto send owned partitions to the new
> > coordinator. If not, maybe we can return an error to client
> > ConsumerGroupHeartbeat API with REQUIRE_OWNED_PARTITION error, and ask
> > client to append the currently owned partitions to new coordinates for
> new
> > assignment computation. Does that make sense?
> >
> > The entire reconciliation process depends on it so if we lose it
> > during a failover, members could be in a weird state. For instance,
> > they could be in the middle of a transition from their current
> > assignment to their new target and thus would be blocked. Relying on
> > members to reconstruct it back does not really work because they don't
> > have all the information to do so (e.g. new metadata) so we would have
> > to recompute a new one. This implies that we need to get the owned
> > partitions from all members and that would take a few seconds until
> > all members come back in the best case, up to the session timeout in
> > the worst case. Imagine that a member joins or fails during this time,
> > the whole process would be stuck. I am afraid storing it is the best
> > way here.
> >
> > Best,
> > David
> >
> >
> > On Fri, Oct 14, 2022 at 5:11 AM Luke Chen  wrote:
> > >
> > > Hi David,
> > >
> > > A few more questions:
> > > 1. We will store the "targetAssignment" into log now. But as we know,
> > > there's max batch size limit (default 1MB), which means, we cannot
> > support
> > > 1M partitions in one group (actually, it should be less than 60k
> > partitions
> > > since we'll store {topicID+partition id}) by default now. How will we
> > > handle that? Do we expect users to adjust the max batch size to support
> > > large partitions in groups, which we don't need this change for old
> > > protocol?
> > >
> > > I'm wondering why we should persist the "targetAssignment" data? If we
> > want
> > 

[jira] [Created] (KAFKA-14304) ZooKeeper to KRaft Migration

2022-10-14 Thread David Arthur (Jira)
David Arthur created KAFKA-14304:


 Summary: ZooKeeper to KRaft Migration
 Key: KAFKA-14304
 URL: https://issues.apache.org/jira/browse/KAFKA-14304
 Project: Kafka
  Issue Type: New Feature
Reporter: David Arthur
Assignee: David Arthur
 Fix For: 3.4.0


Top-level JIRA for 
[KIP-866|https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Re: [DISCUSS] KIP-875: First-class offsets support in Kafka Connect

2022-10-14 Thread Greg Harris
Hey Chris,

Thanks for your quick reply!

Design 1:

I think this is a completely reasonable way to implement a V1, without any
more complicated pagination.
I had not considered that a REST extension would be able to add pagination,
but upon reflection that makes a lot of sense.

I like the best-effort lexicographic sorting idea; if that's not too
expensive, then we should do it.
We also don't necessarily need to have it as a formal contract in the API;
it could just be an implementation detail that can be tweaked later.
Maybe someone wants a ?sort= parameter later, and specifying here that the
base endpoint can return an arbitrary order would allow that latitude in
the implementation later.
Anyone who relies on a particular order can always sort it themselves; I
see the lexicographic sorting as more of a pre-emptive QOL improvement that
may avoid a `jq` step when manually using the API.

Design 2:

This is very good news, and it's one less interoperability concern.
Excellent design choice!

Implementation 1:

Thanks for clarifying; I do wonder if anyone else cares about the
difference. Deleting the group would also serve as a "full cleanup" of the
connector state, whereas deleting the offsets still leaves the group in the
Kafka broker.

Implementation 2:

This makes a lot of sense, and seems like a good principle to keep moving
forward.
Also thanks for clarifying the difference between "empty set of task
configs" and "tombstoning existing task configs", I think I meant the
former in my initial question.

Overall:

I don't think I have any other concerns with the design.
I did want to say that I really like the new "STOPPED" state; I think
that's almost as important as the title feature in this KIP, and could even
be in a standalone KIP as a "connector maintenance mode."
It definitely makes sense to add it in this KIP, as you need both the
connector config to be present and the connector to be offline, which
previously wasn't possible inside of Connect.
I also like the unified API for both source and sink; you're fixing the
feature disparity between them and unifying the UX, which is valuable on
its own.

Thanks so much for the KIP Chris!

Greg


On Fri, Oct 14, 2022 at 7:49 AM Chris Egerton 
wrote:

> Hi Greg,
>
> Thanks for your thoughts.
>
> RE your design questions:
>
> 1. The responses in the REST API may grow fairly large for sink connectors
> that consume from a large number of Kafka topic partitions, and source
> connectors that store a wide range of source partitions. If there is a
> large amount of data for a single source partition for a source connector,
> we will only show the latest committed offset (that the worker has read
> from the offsets topic) for that source partition. We could consider adding
> some kind of pagination API to put a default upper bound on the size of
> responses and allow (or really, require) users to issue multiple requests
> in order to get a complete view of the offsets of connectors with a large
> number of partitions. My initial instinct is to hold off on introducing
> this complexity, though. I can imagine some of the responses getting a
> little ugly to view raw in a terminal or a browser, but I'm not sure we'll
> see more significant issues than that. If we receive feedback that this is
> a serious-enough issue, then we can revisit it in a follow-up KIP for
> offsets V2. In the meantime, it's always possible to implement this
> behavior via a REST extension if it's a blocker for some users. Perhaps to
> make that alternative slightly easier to implement, we can add a contract
> to the API that responses will always be sorted lexicographically by source
> partition for source connectors or by Kafka topic (subsorting by partition)
> for sink connectors. What are your thoughts? If this strategy makes sense I
> can add it to the future work section.
>
> 2. The new STOPPED state should "just work" with the rebalancing algorithm,
> since it'll be implemented under the hood by publishing an empty set of
> task configs to the config topic for the connector. That should be enough
> to trigger a rebalance and to cause no tasks for the connector to be
> allocated across the cluster during that rebalance, regardless of the
> protocol (eager or incremental) that's in use.
>
> RE your implementation questions:
>
> 1. It's mostly a matter of convenience; we can issue a single admin request
> to delete the group rather than having to identify every topic partition
> the consumer group has committed offsets for and then issue a follow-up
> request to delete the offsets for that group. I made note of this detail in
> the KIP to make sure that we were comfortable with completely removing the
> consumer group instead of wiping its offsets, since it seems possible that
> some users may find that behavior unexpected.
>
> 2. The idea is to only perform zombie fencing when we know that it is
> necessary (which is a principle borrowed from KIP-618), so in this case,
> we'll only do it in response to an 

Re: [DISCUSS] KIP-866 ZooKeeper to KRaft Migration

2022-10-14 Thread Colin McCabe
On Thu, Oct 13, 2022, at 11:44, Jun Rao wrote:
> Hi, Colin,
>
> Thanks for the reply.
>
> 10. This is a bit on the implementation side. If you look at the existing
> ZK-based controller, most of the logic is around maintaining an in-memory
> state of all the resources (broker, topic, partition, etc), reading/writing
> to ZK, sending LeaderAndIsr and UpdateMetadata requests and handling the
> responses to brokers. So we need all that logic in the dual write mode. One
> option is to duplicate all that logic in some new code. This can be a bit
> error prone and makes the code a bit harder to maintain if we need to fix
> some critical issues in ZK-based controllers. Another option is to try
> reusing the existing code in the ZK-based controller. For example, we could
> start the EventManager in the ZK-based controller, but let the KRaft
> controller ingest new events. This has its own challenges: (1) the existing
> logic only logs ZK failures and doesn't expose them to the caller; (2) the
> existing logic may add new events to the queue itself and we probably need
> to think through how this is coordinated with the KRaft controller; (3) it
> registers some ZK listeners unnecessarily (may not be a big concern). So we
> need to get around those issues somehow. I am wondering if we have
> considered both options and which approach we are leaning towards for the
> implementation.
>

Yes, this is a good question. My take is that a big part of the complexity in 
the old controller code results from the fact that we use ZK as a multi-writer 
database for propagating information between different components. So in the 
old controller, every write to ZK needs to be structured as a compare and swap 
to be fully correct. Every time we get notified about something, it's usually 
in the form of "this znode changed" which prompts a full reload of part of the 
data in ZK (which itself has multiple parts, loading, deserializing, 
reconciling, etc.) That all goes away in the new mode, and we just have some 
code which analyzes __cluster_metadata and reflects it in 1) updates to ZK and 
2) messages sent out to brokers.
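
(To make the shape of that concrete, a hedged sketch of such a listener is
below; the record, writer, and sender types are simplified stand-ins rather
than the real KRaft or migration classes, and the znode payloads are abbreviated.)

sealed trait MetadataRecord
case class TopicRecord(name: String, topicId: String) extends MetadataRecord
case class PartitionChangeRecord(topic: String, partition: Int, leader: Int, isr: Seq[Int]) extends MetadataRecord

trait ZkWriter { def writeZnode(path: String, json: String): Unit }
trait BrokerSender { def sendLeaderAndIsr(topic: String, partition: Int, leader: Int, isr: Seq[Int]): Unit }

// Deterministic: the same committed record always produces the same ZK writes
// and broker RPCs, which is what makes this easy to unit test.
class MetadataToZkListener(zk: ZkWriter, brokers: BrokerSender) {
  def handleCommit(record: MetadataRecord): Unit = record match {
    case TopicRecord(name, id) =>
      zk.writeZnode(s"/brokers/topics/$name", s"""{"topic_id":"$id"}""") // payload abbreviated
    case PartitionChangeRecord(topic, partition, leader, isr) =>
      zk.writeZnode(s"/brokers/topics/$topic/partitions/$partition/state",
        s"""{"leader":$leader,"isr":[${isr.mkString(",")}]}""") // payload abbreviated
      brokers.sendLeaderAndIsr(topic, partition, leader, isr)
  }
}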

This is pretty decoupled from the other logic in QuorumController and should be 
easy to unit test, since the same inputs from the log always produce the same 
output in ZK. Basically, ZK is write-only for us; we do not read it (with the 
big exception of broker registration znodes), and I think that will greatly 
simplify things.

So I think dual-write mode as described here will be substantially simpler than 
trying to run part or all of the old controller in parallel. I do think we will 
reuse a bunch of the serialization / deserialization code for znodes and 
possibly the code for communicating with ZK.

best,
Colin


>
> 14. Good point and make sense.
>
> Thanks,
>
> Jun
>
>
>
>
> On Wed, Oct 12, 2022 at 3:27 PM Colin McCabe  wrote:
>
>> Hi Jun,
>>
>> Thanks for taking a look. I can answer some questions here because I
>> collaborated on this a bit, and David is on vacation for a few days.
>>
>> On Wed, Oct 12, 2022, at 14:41, Jun Rao wrote:
>> > Hi, David,
>> >
>> > Thanks for the KIP. A few comments below.
>> >
>> > 10. It's still not very clear to me how the KRaft controller works in the
>> > dual writes mode to KRaft log and ZK when the brokers still run in ZK
>> mode.
>> > Does the KRaft controller run a ZK based controller in parallel or do we
>> > derive what needs to be written to ZK based on KRaft controller logic?
>>
>> We derive what needs to be written to ZK based on KRaft controller logic.
>>
>> > I am also not sure how the KRaft controller handles broker
>> > registration/deregistration, since brokers are still running in ZK mode
>> and
>> > are not heartbeating to the KRaft controller.
>>
>> The new controller will listen for broker registrations under /brokers.
>> This is the only znode watch that the new controller will do.
>>
>> We did consider changing how ZK-based broker registration worked, but it
>> just ended up being too much work for not enough gain.
>>
>> >
>> > 12. "A new set of nodes will be provisioned to host the controller
>> quorum."
>> > I guess we don't support starting the KRaft controller quorum on existing
>> > brokers. It would be useful to make that clear.
>> >
>>
>> Agreed
>>
>> > 13. "Once the quorum is established and a leader is elected, the
>> controller
>> > will check the state of the cluster using the MigrationCheck RPC." How
>> does
>> > the quorum controller detect other brokers? Does the controller node need
>> > to be configured with ZK connection string? If so, it would be useful to
>> > document the additional configs that the quorum controller needs to set.
>> >
>>
>> Yes, the controllers monitor ZK for broker registrations, as I mentioned
>> above. So they need zk.connect and the other ZK connection configurations.
>>
>> > 14. "In order to prevent further writes to ZK, the first thing the new
>> > KRaft quorum must do is take over leadership of the ZK 

Re: [VOTE] KIP-876: Time based cluster metadata snapshots

2022-10-14 Thread Colin McCabe
Thanks, José! +1 (binding)

Colin


On Fri, Oct 14, 2022, at 07:54, David Arthur wrote:
> Thanks for the KIP, José!
>
> +1 (binding)
>
> -David
>
> On Fri, Oct 14, 2022 at 2:48 AM David Jacot  wrote:
>>
>> +1 (binding)
>>
>> Thanks for the KIP!
>>
>> Le ven. 14 oct. 2022 à 05:47, deng ziming  a
>> écrit :
>>
>> > Thanks for this KIP,
>> >
>> > +1 for this(binding).
>> >
>> > --
>> > Best,
>> > Ziming
>> >
>> > > On Oct 14, 2022, at 8:11 AM, José Armando García Sancio
>> >  wrote:
>> > >
>> > > Hello all,
>> > >
>> > > I would like to start voting for "KIP-876: Time based cluster metadata
>> > > snapshots."
>> > >
>> > > KIP: https://cwiki.apache.org/confluence/x/MY3GDQ
>> > > Discussion thread:
>> > > https://lists.apache.org/thread/ww67h9d4xvgw1f7jn4zxwydmt8x1mq72
>> > >
>> > > Thanks!
>> > > --
>> > > -José
>> >
>> >
>
>
>
> -- 
> David Arthur


Re: [ANNOUNCE] New committer: Deng Ziming

2022-10-14 Thread David Arthur
Fantastic! Congratulations, Ziming!

-David

On Tue, Oct 11, 2022 at 12:51 PM José Armando García Sancio
 wrote:
>
> Congratulations Ziming. Well deserved and I much appreciate your
> contributions to the project.
>
> --
> -José



-- 
David Arthur


Re: [DISCUSS] KIP-875: First-class offsets support in Kafka Connect

2022-10-14 Thread Chris Egerton
Hi Ashwin,

Thanks for your thoughts. Regarding your questions:

1. The response would show the offsets that are visible to the source
connector, so it would combine the contents of the two topics, giving
priority to offsets present in the connector-specific topic. I imagine a
follow-up question that some people may have is whether we'd want to
provide insight into the contents of a single topic at a time. It may be
useful to be able to see this information in order to debug connector
issues or verify that it's safe to stop using a connector-specific offsets
topic (either explicitly, or implicitly via cluster downgrade). What do you
think about adding a URL query parameter that allows users to dictate which
view of the connector's offsets they are given in the REST response, with
options for the worker's global topic, the connector-specific topic, and
the combined view of them that the connector and its tasks see (which would
be the default)? This may be too much for V1 but it feels like it's at
least worth exploring a bit.
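
(As a rough sketch of that combination rule, with simplified types standing in
for Connect's partition/offset maps, not the actual worker implementation:)

object CombinedOffsetsView {
  type SourcePartition = Map[String, AnyRef]
  type SourceOffset = Map[String, AnyRef]

  // Offsets read from the connector-specific topic win over the worker's
  // global topic whenever both contain an entry for the same source partition.
  def combined(global: Map[SourcePartition, SourceOffset],
               connectorSpecific: Map[SourcePartition, SourceOffset]): Map[SourcePartition, SourceOffset] =
    global ++ connectorSpecific // right-hand operand takes priority on key collisions
}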

2. There is no option for this at the moment. Reset semantics are extremely
coarse-grained; for source connectors, we delete all source offsets, and
for sink connectors, we delete the entire consumer group. I'm hoping this
will be enough for V1 and that, if there's sufficient demand for it, we can
introduce a richer API for resetting or even modifying connector offsets in
a follow-up KIP.

3. Good eye :) I think it's fine to keep the existing behavior for the
PAUSED state with the Connector instance, since the primary purpose of the
Connector is to generate task configs and monitor the external system for
changes. If there's no chance for tasks to be running anyways, I don't see
much value in allowing paused connectors to generate new task configs,
especially since each time that happens a rebalance is triggered and
there's a non-zero cost to that. What do you think?

Cheers,

Chris

On Fri, Oct 14, 2022 at 12:59 AM Ashwin 
wrote:

> Thanks for KIP Chris - I think this is a useful feature.
>
> Can you please elaborate on the following in the KIP -
>
> 1. How would the response of GET /connectors/{connector}/offsets look like
> if the worker has both global and connector specific offsets topic ?
>
> 2. How can we pass the reset options like shift-by , to-date-time etc.
> using a REST API like DELETE /connectors/{connector}/offsets ?
>
> 3. Today PAUSE operation on a connector invokes its stop method - will
> there be a change here to reduce confusion with the new proposed STOPPED
> state ?
>
> Thanks,
> Ashwin
>
> On Fri, Oct 14, 2022 at 2:22 AM Chris Egerton 
> wrote:
>
> > Hi all,
> >
> > I noticed a fairly large gap in the first version of this KIP that I
> > published last Friday, which has to do with accommodating connectors
> > that target different Kafka clusters than the one that the Kafka Connect
> > cluster uses for its internal topics and source connectors with dedicated
> > offsets topics. I've since updated the KIP to address this gap, which has
> > substantially altered the design. Wanted to give a heads-up to anyone
> > that's already started reviewing.
> >
> > Cheers,
> >
> > Chris
> >
> > On Fri, Oct 7, 2022 at 1:29 PM Chris Egerton  wrote:
> >
> > > Hi all,
> > >
> > > I'd like to begin discussion on a KIP to add offsets support to the
> Kafka
> > > Connect REST API:
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> >
>


Re: [VOTE] KIP-876: Time based cluster metadata snapshots

2022-10-14 Thread David Arthur
Thanks for the KIP, José!

+1 (binding)

-David

On Fri, Oct 14, 2022 at 2:48 AM David Jacot  wrote:
>
> +1 (binding)
>
> Thanks for the KIP!
>
> Le ven. 14 oct. 2022 à 05:47, deng ziming  a
> écrit :
>
> > Thanks for this KIP,
> >
> > +1 for this(binding).
> >
> > --
> > Best,
> > Ziming
> >
> > > On Oct 14, 2022, at 8:11 AM, José Armando García Sancio
> >  wrote:
> > >
> > > Hello all,
> > >
> > > I would like to start voting for "KIP-876: Time based cluster metadata
> > > snapshots."
> > >
> > > KIP: https://cwiki.apache.org/confluence/x/MY3GDQ
> > > Discussion thread:
> > > https://lists.apache.org/thread/ww67h9d4xvgw1f7jn4zxwydmt8x1mq72
> > >
> > > Thanks!
> > > --
> > > -José
> >
> >



-- 
David Arthur


Re: Re: [DISCUSS] KIP-875: First-class offsets support in Kafka Connect

2022-10-14 Thread Chris Egerton
Hi Greg,

Thanks for your thoughts.

RE your design questions:

1. The responses in the REST API may grow fairly large for sink connectors
that consume from a large number of Kafka topic partitions, and source
connectors that store a wide range of source partitions. If there is a
large amount of data for a single source partition for a source connector,
we will only show the latest committed offset (that the worker has read
from the offsets topic) for that source partition. We could consider adding
some kind of pagination API to put a default upper bound on the size of
responses and allow (or really, require) users to issue multiple requests
in order to get a complete view of the offsets of connectors with a large
number of partitions. My initial instinct is to hold off on introducing
this complexity, though. I can imagine some of the responses getting a
little ugly to view raw in a terminal or a browser, but I'm not sure we'll
see more significant issues than that. If we receive feedback that this is
a serious-enough issue, then we can revisit it in a follow-up KIP for
offsets V2. In the meantime, it's always possible to implement this
behavior via a REST extension if it's a blocker for some users. Perhaps to
make that alternative slightly easier to implement, we can add a contract
to the API that responses will always be sorted lexicographically by source
partition for source connectors or by Kafka topic (subsorting by partition)
for sink connectors. What are your thoughts? If this strategy makes sense I
can add it to the future work section.

2. The new STOPPED state should "just work" with the rebalancing algorithm,
since it'll be implemented under the hood by publishing an empty set of
task configs to the config topic for the connector. That should be enough
to trigger a rebalance and to cause no tasks for the connector to be
allocated across the cluster during that rebalance, regardless of the
protocol (eager or incremental) that's in use.

RE your implementation questions:

1. It's mostly a matter of convenience; we can issue a single admin request
to delete the group rather than having to identify every topic partition
the consumer group has committed offsets for and then issue a follow-up
request to delete the offsets for that group. I made note of this detail in
the KIP to make sure that we were comfortable with completely removing the
consumer group instead of wiping its offsets, since it seems possible that
some users may find that behavior unexpected.
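
(A rough sketch of the difference via the standard Admin API is below; the
group name handling is simplified, and this is not the actual Connect
implementation.)

import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.Admin

// Option A (what the KIP proposes): delete the whole consumer group in one call.
def deleteGroup(admin: Admin, group: String): Unit =
  admin.deleteConsumerGroups(Seq(group).asJava).all().get()

// Option B: look up every partition the group has committed offsets for, then
// delete just those offsets, leaving the (now empty) group behind.
def wipeOffsetsOnly(admin: Admin, group: String): Unit = {
  val committed = admin.listConsumerGroupOffsets(group)
    .partitionsToOffsetAndMetadata().get()
  admin.deleteConsumerGroupOffsets(group, committed.keySet()).all().get()
}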

2. The idea is to only perform zombie fencing when we know that it is
necessary (which is a principle borrowed from KIP-618), so in this case,
we'll only do it in response to an offsets reset request, and not when a
connector is stopped. After being stopped, it's possible that the connector
gets deleted, in which case, a proactive round of fencing would have served
no benefit. It's also worth noting that publishing an empty set of task
configs is not the same as tombstoning existing task configs; putting a
connector into the STOPPED state should require no tombstones to be emitted
to the config topic.

Cheers,

Chris

On Thu, Oct 13, 2022 at 6:26 PM Greg Harris 
wrote:

> Hey Chris,
>
> Thanks for the KIP!
>
> I think this is an important feature for both development and operations
> use-cases, and it's an obvious gap in the REST feature set.
> I also appreciate the incremental nature of the KIP and the future
> extensions that will now be possible.
>
> I had a couple of questions about the design and it's extensibility:
>
> 1. How do you imagine the API will behave with connectors that have
> extremely large numbers of partitions (thousands or more) and/or source
> connectors with large amounts of data per partition?
>
> 2. Does the new STOPPED state need any special integration with the
> rebalance subsystem, or can the rebalance algorithms remain ignorant of the
> target state of connectors?
>
> And about the implementation:
>
> 1. For my own edification, what is the difference between deleting a
> consumer group and deleting all known offsets for that group? Does deleting
> the group offer better/easier atomicity?
>
> 2. For EOS sources, will stopping the connector and tombstoning the task
> configs perform a fence-out, or will that fence-out only occur when
> performing the offsets DELETE operation?
>
> Thanks!
> Greg
>
> On 2022/10/13 20:52:26 Chris Egerton wrote:
> > Hi all,
> >
> > I noticed a fairly large gap in the first version of this KIP that I
> > published last Friday, which has to do with accommodating connectors
> > that target different Kafka clusters than the one that the Kafka Connect
> > cluster uses for its internal topics and source connectors with dedicated
> > offsets topics. I've since updated the KIP to address this gap, which has
> > substantially altered the design. Wanted to give a heads-up to anyone
> > that's already started reviewing.
> >
> > Cheers,
> >
> > Chris
> >
> > On Fri, Oct 7, 2022 at 1:29 PM Chris 

PR review request

2022-10-14 Thread Shekhar Prasad Rajak
Hi team,
Please review the PRs:
* https://github.com/apache/kafka/pull/12735  Replace EasyMock and PowerMock 
with Mockito in connect/runtime/ErrorHandlingTaskTest 

* https://github.com/apache/kafka/pull/12739 Replace EasyMock and PowerMock 
with Mockito | TimeOrderedCachingPersistentWindowStoreTest

I see there are a few unrelated test cases failing, but I think those are fine.


Thanks and regards,
Shekhar Prasad Rajak
Contact: +918142478937
Blog | Github | Twitter
Skype: shekhar.rajak1



[jira] [Created] (KAFKA-14303) Producer.send without record key and batch.size=0 goes into infinite loop

2022-10-14 Thread Igor Soarez (Jira)
Igor Soarez created KAFKA-14303:
---

 Summary: Producer.send without record key and batch.size=0 goes 
into infinite loop
 Key: KAFKA-14303
 URL: https://issues.apache.org/jira/browse/KAFKA-14303
 Project: Kafka
  Issue Type: Bug
  Components: clients, producer 
Affects Versions: 3.3.1, 3.3.0
Reporter: Igor Soarez


3.3 has broken previous producer behavior.

A call to {{producer.send(record)}} with a record without a key and configured 
with {{batch.size=0}} never returns.

Reproducer:
{code:scala}
// Imports added for completeness; the harness class lives in Kafka's core test sources.
import java.util.Properties
import java.util.concurrent.TimeUnit

import kafka.api.IntegrationTestHarness
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.junit.jupiter.api.Test

class ProducerIssueTest extends IntegrationTestHarness {
  override protected def brokerCount = 1

  @Test
  def testProduceWithBatchSizeZeroAndNoRecordKey(): Unit = {
    val topicName = "foo"
    createTopic(topicName)
    val overrides = new Properties
    overrides.put(ProducerConfig.BATCH_SIZE_CONFIG, 0)
    val producer = createProducer(keySerializer = new StringSerializer,
      valueSerializer = new StringSerializer, overrides)
    val record = new ProducerRecord[String, String](topicName, null, "hello")
    val future = producer.send(record) // goes into infinite loop here
    future.get(10, TimeUnit.SECONDS)
  }
} {code}
 

[Documentation for producer 
configuration|https://kafka.apache.org/documentation/#producerconfigs_batch.size]
 states {{batch.size=0}} as a valid value:
{quote}Valid Values: [0,...]
{quote}
and recommends its use directly:
{quote}A small batch size will make batching less common and may reduce 
throughput (a batch size of zero will disable batching entirely).
{quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14302) Infinite probing rebalance if a changelog topic got emptied

2022-10-14 Thread Damien Gasparina (Jira)
Damien Gasparina created KAFKA-14302:


 Summary: Infinite probing rebalance if a changelog topic got 
emptied
 Key: KAFKA-14302
 URL: https://issues.apache.org/jira/browse/KAFKA-14302
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.3.1
Reporter: Damien Gasparina
 Attachments: image-2022-10-14-12-04-01-190.png

If a store with a changelog topic has been fully emptied, it could generate 
infinite probing rebalances.

 

The scenario is the following:
 * A Kafka Streams application has a store with a changelog
 * Many entries are pushed into the changelog, so the log end offset is high, 
let's say 20,000
 * Then the store gets emptied, either due to data retention (windowing) or 
tombstones
 * Then an instance of the application is restarted
 * It restores the store from the changelog, but does not write a checkpoint 
file as no data is pushed at all
 * As there are no checkpoint entries, this instance specifies a taskOffsetSums 
with the offset set to 0 in the subscriptionUserData
 * The group leader, during the assignment, then computes a lag of 20,000 (end 
offset - task offset), which is greater than the default acceptable lag, and thus 
decides to schedule a probing rebalance (roughly as sketched below)
 * In the next probing rebalance, nothing has changed, so... a new probing rebalance
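
Roughly, the leader re-evaluates the same comparison on every probing rebalance:

{code:scala}
// Names follow the description above; 10,000 is the default value of the
// acceptable.recovery.lag config, and the real logic lives in the Streams
// high-availability task assignor.
object ProbingRebalanceLoop {
  val changelogEndOffset = 20000L
  val taskOffsetSum = 0L              // no checkpoint file => offset reported as 0
  val acceptableRecoveryLag = 10000L  // default acceptable.recovery.lag
  val lag = changelogEndOffset - taskOffsetSum
  val scheduleProbingRebalance = lag > acceptableRecoveryLag // always true here
}
{code}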

 

I was able to reproduce locally with a simple topology:

 
{code:java}
var table = streamsBuilder.stream("table");
streamsBuilder
    .stream("stream")
    .join(table, (eSt, eTb) -> eSt.toString() + eTb.toString(),
        JoinWindows.of(Duration.ofSeconds(5)))
    .to("output");
{code}

Due to this issue, applications having an empty changelog are experiencing 
frequent rebalances:

!image-2022-10-14-12-04-01-190.png!

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-10-14 Thread Luke Chen
Hi David,

I made a final pass and LGTM now.
+1 from me.

Luke

On Wed, Oct 5, 2022 at 12:32 AM Guozhang Wang  wrote:

> Hello David,
>
> I've made my final pass on the doc and I think it looks good now. +1.
>
>
> Guozhang
>
> On Wed, Sep 14, 2022 at 1:37 PM Guozhang Wang  wrote:
>
> > Thanks David,
> >
> > There are a few minor comments pending in the discussion thread, and one
> > is about whether we should merge PreparePartitionAssignment with HB. But
> I
> > think the KIP itself is in pretty good shape now. Thanks!
> >
> >
> > Guozhang
> >
> > On Fri, Sep 9, 2022 at 1:32 AM David Jacot 
> > wrote:
> >
> >> Hi all,
> >>
> >> Thank you all for the very positive discussion about KIP-848. It looks
> >> like folks are very positive about it overall.
> >>
> >> I would like to start a vote on KIP-848, which introduces a brand new
> >> consumer rebalance protocol.
> >>
> >> The KIP is here: https://cwiki.apache.org/confluence/x/HhD1D.
> >>
> >> Best,
> >> David
> >>
> >
> >
> > --
> > -- Guozhang
> >
>
>
> --
> -- Guozhang
>


Re: [DISCUSS] (continued) KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-10-14 Thread Luke Chen
Thanks for the update.
Yes, I think using a similar approach to KIP-868 to fix this issue makes sense.
Let's consider it in the future.

Luke

On Fri, Oct 14, 2022 at 5:16 PM David Jacot 
wrote:

> Hi Luke,
>

Re: [DISCUSS] (continued) KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-10-14 Thread David Jacot
Hi Luke,

Thanks for your questions.

> 1. We will store the "targetAssignment" into log now. But as we know,
there's max batch size limit (default 1MB), which means, we cannot support
1M partitions in one group (actually, it should be less than 60k partitions
since we'll store {topicID+partition id}) by default now. How will we
handle that? Do we expect users to adjust the max batch size to support
large partitions in groups, which we don't need this change for old
protocol?

That's right. I have a few ideas for removing this limitation, but I
decided to defer them as future improvements. The KIP is already large
enough, and since the current protocol suffers from the exact same
limitation, this is not a regression.
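
A quick back-of-the-envelope check of that estimate (the 20 bytes per
entry is an assumption: a 16-byte topic UUID plus a 4-byte partition
index, ignoring record framing and per-member overhead):

    public class BatchSizeEstimate {
        public static void main(String[] args) {
            long maxBatchBytes = 1_000_000L;  // roughly the default 1 MB batch limit
            long bytesPerPartition = 16 + 4;  // topic id (UUID) + partition index
            // Prints 50000, in line with the "less than 60k partitions" estimate.
            System.out.println(maxBatchBytes / bytesPerPartition);
        }
    }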

For the future, my thinking is to split the assignment and only write
deltas to the log instead of rewriting all of it. We would need to use
transactions for this in the coordinator (similar to KIP-868). The
challenge is ensuring that those deltas are either all written or
completely rolled back; otherwise, compaction could leave the group in
an inconsistent state. This obviously needs more thinking.
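
To make the atomicity concern concrete, here is a toy replay sketch
(not how the coordinator works; all record shapes are made up for
illustration) where deltas are only applied once a completion marker
for their epoch has been replayed, so a partially written set of
deltas is never exposed:

    import java.util.*;

    public class TargetAssignmentReplay {
        // Hypothetical delta: partitions added/removed for one member.
        record Delta(String memberId, Set<Integer> added, Set<Integer> removed, int targetEpoch) {}
        // Hypothetical marker proving all deltas of an epoch were written.
        record EpochMarker(int targetEpoch) {}

        private final Map<String, Set<Integer>> assignment = new HashMap<>();
        private final Map<Integer, List<Delta>> pending = new HashMap<>();

        public void replay(Object record) {
            if (record instanceof Delta d) {
                // Buffer deltas until their epoch is known to be complete.
                pending.computeIfAbsent(d.targetEpoch(), e -> new ArrayList<>()).add(d);
            } else if (record instanceof EpochMarker m) {
                // Apply the buffered deltas of the completed epoch.
                for (Delta d : pending.getOrDefault(m.targetEpoch(), List.of())) {
                    Set<Integer> parts = assignment.computeIfAbsent(d.memberId(), id -> new HashSet<>());
                    parts.addAll(d.added());
                    parts.removeAll(d.removed());
                }
                pending.remove(m.targetEpoch());
            }
        }
    }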

> I'm wondering why we should persist the "targetAssignment" data? If we want
to work for coordinator failover, could the new coordinator try to request
for currently owned partitions from each consumer when failed over? I'm not
sure if the consumer will auto send owned partitions to the new
coordinator. If not, maybe we can return an error to client
ConsumerGroupHeartbeat API with REQUIRE_OWNED_PARTITION error, and ask
client to append the currently owned partitions to new coordinates for new
assignment computation. Does that make sense?

The entire reconciliation process depends on it, so if we lose it
during a failover, members could end up in a weird state. For
instance, they could be in the middle of a transition from their
current assignment to their new target and would be blocked. Relying
on members to reconstruct it does not really work because they don't
have all the information to do so (e.g. the new metadata), so we would
have to recompute a new target. That implies collecting the owned
partitions from all members, which would take a few seconds until all
members come back in the best case, and up to the session timeout in
the worst case. If a member joins or fails during this time, the whole
process would be stuck. I am afraid storing it is the best way here.
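
For illustration only (the field names below are mine, not the KIP's),
this is roughly the per-member reconciliation state that is derived
from the persisted target assignment and would otherwise be lost
mid-transition on a failover:

    import java.util.Set;

    // A member can sit between its current assignment and the target: some
    // partitions still have to be revoked, others are waiting to be assigned.
    record MemberReconciliationState(
        String memberId,
        int memberEpoch,                          // epoch the member has reached
        int targetEpoch,                          // epoch of the persisted target
        Set<Integer> assignedPartitions,          // safe to process right now
        Set<Integer> partitionsPendingRevocation, // owned but not in the target
        Set<Integer> partitionsPendingAssignment  // in the target, not yet free
    ) {}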

Best,
David


On Fri, Oct 14, 2022 at 5:11 AM Luke Chen  wrote:
>
> Hi David,
>
> A few more questions:
> 1. We will store the "targetAssignment" into log now. But as we know,
> there's max batch size limit (default 1MB), which means, we cannot support
> 1M partitions in one group (actually, it should be less than 60k partitions
> since we'll store {topicID+partition id}) by default now. How will we
> handle that? Do we expect users to adjust the max batch size to support
> large partitions in groups, which we don't need this change for old
> protocol?
>
> I'm wondering why we should persist the "targetAssignment" data? If we want
> to work for coordinator failover, could the new coordinator try to request
> for currently owned partitions from each consumer when failed over? I'm not
> sure if the consumer will auto send owned partitions to the new
> coordinator. If not, maybe we can return an error to client
> ConsumerGroupHeartbeat API with REQUIRE_OWNED_PARTITION error, and ask
> client to append the currently owned partitions to new coordinates for new
> assignment computation. Does that make sense?
>
> Luke
>
> On Fri, Oct 14, 2022 at 12:22 AM Jun Rao  wrote:
>
> > Hi, David,
> >
> > Thanks for the reply and the updated KIP. A few more comments on the
> > interfaces and the protocols.
> >
> > 60.  On the consumer side, do we need both PartitionAssignor.onAssignment
> > and ConsumerRebalanceListener.onPartitionsAssigned? My understanding is
> > that the former was added for cooperative rebalance, which is now handled
> > by the coordinator. If we do need both, should we make them more consistent
> > (e.g. topic name vs topic id, list vs set vs collection)?
> >
> > 61. group.local.assignors: Could we make it clear that it's the full class
> > name that implements PartitionAssignor?
> >
> > 62. MemberAssignment: It currently has the following method.
> > public Set topicPartitions()
> > We are adding List ownedPartitions. Should we keep the
> > naming and the return type consistent?
> >
> > 63. MemberAssignment.error: should that be reason?
> >
> > 64. group.remote.assignor: The client may not know what assignors the
> > broker supports. Should we default this to what the broker determines (e.g.
> > first assignor listed in group.consumer.assignors)?
> >
> > 65. After the text "When A heartbeats again and acknowledges the
> > revocation, the group coordinator transitions him to epoch 2 and releases
> > foo-2.", we have the following.
> >   B - epoch=2, partitions=[foo-2], pending-partitions=[]
> > Should 

Re: [DISCUSS] (continued) KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-10-14 Thread David Jacot
Hi Jun,

Thanks for your comments. Please find my answers below.

60. Yes, we need both. PartitionAssignor.onAssignment is there to
inform the custom assignor about the assignment decision: the full set
of assigned partitions, regardless of whether some of them are still
pending revocation, along with the custom metadata. Streams needs this
in order to act on the new assignment. It will be called only once per
epoch. I do agree with the consistency point. I have already removed
topic ids from the PartitionAssignor interface and switched to using
Collection instead of List.
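
For illustration, the rough shape of the two callbacks after these
consistency changes could look like this (a sketch, not the exact KIP
interface):

    import java.util.Collection;
    import org.apache.kafka.common.TopicPartition;

    // Sketch of the client-side assignor callback: called once per epoch with
    // the full target assignment and the custom assignor metadata.
    interface SketchPartitionAssignor {
        void onAssignment(Collection<TopicPartition> targetPartitions, byte[] assignorMetadata);
    }

    // The existing application-facing callback is unchanged:
    // ConsumerRebalanceListener#onPartitionsAssigned(Collection<TopicPartition> partitions)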

61. Done.

62. Makes sense. I have standardized on `Collection
targetPartitions`.

63. `error` is correct here. A member passes a `reason` to the
assignor, where the `reason` indicates what triggered the assignment.
The assignor passes an `error` to the member, where the `error`
indicates why the assignment failed. Note that you seem to have looked
at a slightly outdated version of the KIP, as `MemberAssignment.error`
is no longer there. I have chosen to pass `error` to `onAssignment`
directly because it allows us to better reuse other classes. Let me
know what you think.

64. I like the idea. Let me change `group.remote.assignor` to default
to null; that would signal that the coordinator-side default should be
used, which would be the first assignor in the list.
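
A sketch of what that would look like in the configs (the assignor
class names are placeholders, not real ones):

    # Broker: assignors supported by the group coordinator; the first is the default.
    group.consumer.assignors=org.example.UniformAssignor,org.example.RangeAssignor

    # Client: leaving group.remote.assignor unset (null) lets the coordinator pick
    # its default, i.e. org.example.UniformAssignor above.
    #group.remote.assignor=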

65. I think that both are actually valid here. It mainly depends on
whether we want to move foo-2 when A revokes it or when B heartbeats
next. Let me follow your suggestion as it reads a little clearer in
the example.

66. It is actually correct. It is important to transition C to epoch
23 at this stage because the new epoch must be given in the JoinGroup
response. The SyncGroup will just collect the assignment. I have added
a note to explain this in the example.

67. ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs is defined based
on max.poll.interval.ms. It is used in the revocation process, where
the coordinator gives the member the rebalance timeout to revoke its
partitions. It is also used to put an upper bound on the client-side
assignment computation. I do agree that the name is no longer a great
fit. I wonder if we should just use MaxPollIntervalMs now. What do you
think?

68.
68.1 Yes.
68.2 I don't think that we need it at the moment. The error here is a
client-side assignor implementation detail. It only means something in
that context, so storing it on the server side seems wasteful to me.

69. There is ConsumerGroupPrepareAssignmentResponse.AssignorName. All
the ConsumerGroupPrepareAssignmentResponse.Members.Assignor entries
correspond to this assignor.

70. ConsumerGroupInstallAssignmentRequest.GroupEpoch is supposed to be
set based on ConsumerGroupPrepareAssignmentResponse.GroupEpoch. I have
added this for fencing purposes. It will allow the group coordinator
to reject an installation if the group epoch does not match the
expected one. I do agree that the client should not do anything but
copy the value here.
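
A minimal sketch of the coordinator-side check (the GroupEpoch field
comes from the KIP; the error name and surrounding type are made up):

    public class InstallAssignmentFencing {
        static final short NONE = 0;
        static final short STALE_GROUP_EPOCH = 1; // placeholder error code

        // The client copies GroupEpoch from the PrepareAssignment response into
        // the InstallAssignment request; anything else is rejected as stale.
        static short validate(int requestGroupEpoch, int currentGroupEpoch) {
            return requestGroupEpoch == currentGroupEpoch ? NONE : STALE_GROUP_EPOCH;
        }
    }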

71.
71.1 Hmm, I am not sure about this one. I tend to agree that it would
make the life of the tools simpler. However, there is also
`SubscribedTopicIds`. Should we add the names for all of them? Or
perhaps provide a mapping from ID to name in the response? The mapping
may be the best option here in order to not repeat names everywhere.
What do you think?
71.2 Totally

72. It should be optional. Members are expected to set it, and the
coordinator will use it to fence them if the epoch is not correct.
Tools can omit it. We have similar behavior for the
OffsetCommitRequest, so in a sense we make the OffsetFetchRequest
consistent with its counterpart.

Best,
David

On Thu, Oct 13, 2022 at 6:21 PM Jun Rao  wrote:
>
> Hi, David,
>
> Thanks for the reply and the updated KIP. A few more comments on the
> interfaces and the protocols.
>
> 60.  On the consumer side, do we need both PartitionAssignor.onAssignment
> and ConsumerRebalanceListener.onPartitionsAssigned? My understanding is
> that the former was added for cooperative rebalance, which is now handled
> by the coordinator. If we do need both, should we make them more consistent
> (e.g. topic name vs topic id, list vs set vs collection)?
>
> 61. group.local.assignors: Could we make it clear that it's the full class
> name that implements PartitionAssignor?
>
> 62. MemberAssignment: It currently has the following method.
> public Set topicPartitions()
> We are adding List ownedPartitions. Should we keep the
> naming and the return type consistent?
>
> 63. MemberAssignment.error: should that be reason?
>
> 64. group.remote.assignor: The client may not know what assignors the
> broker supports. Should we default this to what the broker determines (e.g.
> first assignor listed in group.consumer.assignors)?
>
> 65. After the text "When A heartbeats again and acknowledges the
> revocation, the group coordinator transitions him to epoch 2 and releases
> foo-2.", we have the following.
>   B - epoch=2, 

Jenkins build is still unstable: Kafka » Kafka Branch Builder » 3.3 #104

2022-10-14 Thread Apache Jenkins Server
See 




Jenkins build became unstable: Kafka » Kafka Branch Builder » trunk #1296

2022-10-14 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-876: Time based cluster metadata snapshots

2022-10-14 Thread Luke Chen
Hi José,

Thanks for the KIP.
Adding support for generating snapshots based on time makes sense to me.

The only thing I'd like to point out is the compatibility section.
Since this new config defaults to 1 hour, users who explicitly set
`metadata.log.max.record.bytes.between.snapshots` to a very large value
to avoid snapshot creation will, after upgrading, start getting
snapshots every hour. I think this behavior change should be explicitly
called out in the compatibility section. WDYT?
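
To make the scenario concrete (the time-based config name and its
default are taken from the KIP draft, so please double-check them
against the final text):

    # The operator effectively disabled size-based snapshots before the upgrade:
    metadata.log.max.record.bytes.between.snapshots=9223372036854775807

    # New time-based trigger from KIP-876, defaulting to 1 hour, so snapshots
    # start being generated hourly after the upgrade unless this is raised too:
    metadata.log.max.snapshot.interval.ms=3600000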

Otherwise, LGTM.

Luke

On Fri, Oct 14, 2022 at 8:14 AM José Armando García Sancio
 wrote:

> Thanks for your feedback David Jacot, Colin McCabe and Niket Goel.
>
> I started the vote thread at
> https://lists.apache.org/thread/yzzhbvdqxg9shttgbzooc2f42l1cv2sj
>
> --
> -José
>


Re: [VOTE] KIP-876: Time based cluster metadata snapshots

2022-10-14 Thread David Jacot
+1 (binding)

Thanks for the KIP!

On Fri, Oct 14, 2022 at 05:47, deng ziming  wrote:

> Thanks for this KIP,
>
> +1 for this(binding).
>
> --
> Best,
> Ziming
>
> > On Oct 14, 2022, at 8:11 AM, José Armando García Sancio
>  wrote:
> >
> > Hello all,
> >
> > I would like to start voting for "KIP-876: Time based cluster metadata
> > snapshots."
> >
> > KIP: https://cwiki.apache.org/confluence/x/MY3GDQ
> > Discussion thread:
> > https://lists.apache.org/thread/ww67h9d4xvgw1f7jn4zxwydmt8x1mq72
> >
> > Thanks!
> > --
> > -José
>
>