Re: KAFKA-16221

2024-05-02 Thread matthias . kraaz
Hi Sophie,

Thank you very much. Will do a PR. Please allow some extra time, as I'd like to 
test the change first. Not going overboard. I have to test the real thing 
anyway.

Kind regards, Matthias Kraaz

> Hey Matthias -- I'm quite sure you should *not* do what we're doing in
> https://github.com/apache/kafka/pull/15315. That's definitely a bad hack,
> and IIUC the only reason we accepted it was because the choice was between
> implementing a hacky temporary fix and blocking the entire release while we
> figured out the "right" way to fix this. Sadly, it seems like we never
> followed up after the release was cut, and AFAICT everyone forgot about
> this so-called "temporary" hack as predicted
> ...
> 
> Anyways, as a general rule, you should never try to catch and swallow an
> IllegalStateException. You never know if you might be swallowing an actual
> problem and messing up your application. It's better to just make sure it
> doesn't get thrown in the first place.
> 
> In this specific case, it seems like the "illegal state" that both you and
> Kafka Streams are hitting comes from calling #abortTransaction after a
> timeout. So it sounds to me like you should just add a separate catch block
> for TimeoutException and retry the #commitTransaction.
> 
> For the full explanation as to why you need to retry the #commitTxn (as
> opposed to just starting a new transaction), check out this paragraph in
> the #commitTransaction javadocs:
> 
> > Note that this method will raise TimeoutException if the transaction cannot
> > be committed before expiration of max.block.ms, but this does not mean
> > the request did not actually reach the broker. In fact, it only indicates
> > that we cannot get the acknowledgement response in time, so it's up to the
> > application's logic to decide how to handle timeouts. Additionally, it will
> > raise InterruptException if interrupted. It is safe to retry in either
> > case, but it is not possible to attempt a different operation (such as
> > abortTransaction) since the commit may already be in the progress of
> > completing. If not retrying, the only option is to close the producer.
> 
> 
> It does seem like the exception handling example is objectively incorrect,
> since TimeoutException extends KafkaException and will, as you experienced,
> end up in the KafkaException catch block that tries to abort the transaction.
> I get that people might want to handle timeouts differently, and it's
> tricky since they can also be thrown from abortTransaction and have to be
> handled in the same way (ie you can only retry the same API or close the
> producer). But the example is very misleading and should absolutely be
> updated to include explicit TimeoutException handling in my opinion.
> 
> Would you be interested in doing a PR to fix the Producer javadocs and
> improve the exception handling example?
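
The advice above (catch the timeout separately, retry the same commit, and close the producer if retrying is not an option) can be sketched roughly as follows. This is only an illustration under stated assumptions, not the fix that went into the javadocs: `SketchKafkaException`, `SketchTimeoutException`, `SketchTxnProducer`, `FlakyProducer` and the `maxCommitAttempts` parameter are hypothetical stand-ins so the example compiles without the Kafka client on the classpath. With the real client the same shape would presumably use `KafkaProducer` and `org.apache.kafka.common.errors.TimeoutException`.

```java
// Hypothetical stand-ins (NOT Kafka classes). They mirror the one fact the
// thread relies on: the timeout exception is a subclass of the general one.
class SketchKafkaException extends RuntimeException {
    SketchKafkaException(String message) { super(message); }
}

class SketchTimeoutException extends SketchKafkaException {
    SketchTimeoutException(String message) { super(message); }
}

interface SketchTxnProducer {
    void commitTransaction();
    void abortTransaction();
    void close();
}

final class CommitRetrySketch {
    enum Outcome { COMMITTED, ABORTED, CLOSED }

    /** Retries a timed-out commit; closes the producer once the attempts are used up. */
    static Outcome commitOrGiveUp(SketchTxnProducer producer, int maxCommitAttempts) {
        for (int attempt = 1; attempt <= maxCommitAttempts; attempt++) {
            try {
                producer.commitTransaction();
                return Outcome.COMMITTED;
            } catch (SketchTimeoutException e) {
                // The commit may still be in flight, so aborting is not allowed
                // here. Loop around and retry the same call.
            } catch (SketchKafkaException e) {
                // Any other error: abort, as the existing javadoc example does.
                producer.abortTransaction();
                return Outcome.ABORTED;
            }
        }
        // Not retrying any further: the only remaining option is to close.
        producer.close();
        return Outcome.CLOSED;
    }

    public static void main(String[] args) {
        System.out.println(commitOrGiveUp(new FlakyProducer(2), 5));
    }
}

/** Test double: commitTransaction times out a fixed number of times, then succeeds. */
final class FlakyProducer implements SketchTxnProducer {
    private int timeoutsLeft;
    int commitCalls, abortCalls, closeCalls;

    FlakyProducer(int timeouts) { this.timeoutsLeft = timeouts; }

    @Override public void commitTransaction() {
        commitCalls++;
        if (timeoutsLeft-- > 0) throw new SketchTimeoutException("commit timed out");
    }
    @Override public void abortTransaction() { abortCalls++; }
    @Override public void close() { closeCalls++; }
}
```

The timeout catch block has to come before the general one, otherwise the timeout ends up in the abort path again. The sketch leaves out the unrecoverable exceptions from the original example, and it does not cover a timeout thrown by the abort itself, which per the message above needs the same retry-or-close treatment.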
> 
> On Thu, May 2, 2024 at 4:22 AM  wrote:
> 
> > Hi,
> >
> > Thanks for your work and sorry to bother you.
> >
> > My code gets the same IllegalStateException from KafkaProducer as Kafka
> > Streams gets in KAFKA-16221:
> >
> > java.lang.IllegalStateException: Cannot attempt operation
> > `abortTransaction` because the previous call to `commitTransaction` timed
> > out and must be retried
> > at
> > org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1109)
> > at
> > org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:266)
> > at
> > org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:835)
> >
> > I have followed the recipe from
> > https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
> > for error handling for a transactional producer, i.e.
> >
> > try {
> >     producer.beginTransaction();
> >     for (int i = 0; i < 100; i++)
> >         producer.send(new ProducerRecord<>("my-topic",
> >             Integer.toString(i), Integer.toString(i)));
> >     producer.commitTransaction();
> > } catch (ProducerFencedException | OutOfOrderSequenceException |
> >          AuthorizationException e) {
> >     // We can't recover from these exceptions, so our only option is to
> >     // close the producer and exit.
> >     producer.close();
> > } catch (KafkaException e) {
> >     // For all other exceptions, just abort the transaction and try again.
> >     producer.abortTransaction();
> > }
> > producer.close();
> >
> > Kafka Streams has solved KAFKA-16221 by introducing a hack (
> > https://github.com/apache/kafka/pull/15315),
> >  but plans a clean solution.
> >
> > Does that mean that the above recipe is outdated?
> > Is there really no simple, clean solution for the error handling?
> > Should I use the 

Re: [DISCUSS] KIP-1018: Introduce max remote fetch timeout config

2024-05-02 Thread Kamal Chandraprakash
Christo,

We have localTimeMs, remoteTimeMs, and totalTimeMs as part of the
FetchConsumer request metric.

kafka.network:type=RequestMetrics,name={LocalTimeMs|RemoteTimeMs|TotalTimeMs},request={Produce|FetchConsumer|FetchFollower}

RemoteTimeMs refers to the amount of time spent in the purgatory for normal
fetch requests and the amount of time spent reading the remote data for
remote-fetch requests. Do we want to have a separate `TieredStorageTimeMs`
to capture the time spent in remote-read requests?

With per-broker level timer metrics combined with the request level
metrics, the user will have sufficient information.

Metric name =
kafka.log.remote:type=RemoteLogManager,name=RemoteLogReaderFetchRateAndTimeMs

--
Kamal
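
For readers who want to look at these metrics over JMX, the names above map directly to JMX object names. The following is a small sketch that only builds and prints the names. Actually reading values would additionally need an `MBeanServerConnection` to a running broker, which is not shown here. The class and method names (`RequestMetricNames`, `requestMetric`) are made up for illustration.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

final class RequestMetricNames {
    /** Builds the JMX object name for one request-level timer, e.g. RemoteTimeMs. */
    static ObjectName requestMetric(String name, String request) {
        try {
            return new ObjectName(
                "kafka.network:type=RequestMetrics,name=" + name + ",request=" + request);
        } catch (MalformedObjectNameException e) {
            // Wrapped so callers do not have to deal with a checked exception.
            throw new IllegalArgumentException("invalid metric name or request type", e);
        }
    }

    public static void main(String[] args) {
        // The three timers mentioned in this message, for consumer fetches.
        for (String name : new String[] {"LocalTimeMs", "RemoteTimeMs", "TotalTimeMs"}) {
            System.out.println(requestMetric(name, "FetchConsumer"));
        }
    }
}
```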

On Mon, Apr 29, 2024 at 1:38 PM Christo Lolov 
wrote:

> Heya!
>
> Is it difficult to instead add the metric at
> kafka.network:type=RequestMetrics,name=TieredStorageMs (or some other
> name=*)? Alternatively, if it is difficult to add it there, is it possible
> to add 2 metrics, one at the RequestMetrics level (even if it is
> total-time-ms - (all other times)) and one at what you are proposing? As an
> operator I would find it strange to not see the metric in the
> RequestMetrics.
>
> Your thoughts?
>
> Best,
> Christo
>
> On Sun, 28 Apr 2024 at 10:52, Kamal Chandraprakash <
> kamal.chandraprak...@gmail.com> wrote:
>
> > Christo,
> >
> > Updated the KIP with the remote fetch latency metric. Please take another
> > look!
> >
> > --
> > Kamal
> >
> > On Sun, Apr 28, 2024 at 12:23 PM Kamal Chandraprakash <
> > kamal.chandraprak...@gmail.com> wrote:
> >
> > > Hi Federico,
> > >
> > > Thanks for the suggestion! Updated the config name to "
> > > remote.fetch.max.wait.ms".
> > >
> > > Christo,
> > >
> > > > Good point. We don't have the remote-read latency metrics to measure the
> > > > performance of the remote read requests. I'll update the KIP to emit this
> > > > metric.
> > >
> > > --
> > > Kamal
> > >
> > >
> > > On Sat, Apr 27, 2024 at 4:03 PM Federico Valeri 
> > > wrote:
> > >
> > > >> Hi Kamal, it looks like all TS configurations start with "remote."
> > >> prefix, so I was wondering if we should name it
> > >> "remote.fetch.max.wait.ms".
> > >>
> > >> On Fri, Apr 26, 2024 at 7:07 PM Kamal Chandraprakash
> > >>  wrote:
> > >> >
> > >> > Hi all,
> > >> >
> > >> > If there are no more comments, I'll start a vote thread by tomorrow.
> > >> > Please review the KIP.
> > >> >
> > >> > Thanks,
> > >> > Kamal
> > >> >
> > >> > On Sat, Mar 30, 2024 at 11:08 PM Kamal Chandraprakash <
> > >> > kamal.chandraprak...@gmail.com> wrote:
> > >> >
> > >> > > Hi all,
> > >> > >
> > >> > > Bumping the thread. Please review this KIP. Thanks!
> > >> > >
> > >> > > On Thu, Feb 1, 2024 at 9:11 PM Kamal Chandraprakash <
> > >> > > kamal.chandraprak...@gmail.com> wrote:
> > >> > >
> > >> > >> Hi Jorge,
> > >> > >>
> > >> > >> Thanks for the review! Added your suggestions to the KIP. PTAL.
> > >> > >>
> > >> > >> The `fetch.max.wait.ms` config will also be applicable for topics
> > >> > >> enabled with remote storage.
> > >> > >> Updated the description to:
> > >> > >>
> > >> > >> ```
> > >> > >> The maximum amount of time the server will block before answering the
> > >> > >> fetch request when it is reading near to the tail of the partition
> > >> > >> (high-watermark) and there isn't sufficient data to immediately
> > >> > >> satisfy the requirement given by fetch.min.bytes.
> > >> > >> ```
> > >> > >>
> > >> > >> --
> > >> > >> Kamal
> > >> > >>
> > >> > >> On Thu, Feb 1, 2024 at 12:12 AM Jorge Esteban Quilcate Otoya <
> > >> > >> quilcate.jo...@gmail.com> wrote:
> > >> > >>
> > >> > >>> Hi Kamal,
> > >> > >>>
> > >> > >>> Thanks for this KIP! It should help to solve one of the main issues with
> > >> > >>> tiered storage at the moment, which is dealing with individual consumer
> > >> > >>> configurations to avoid flooding logs with interrupted exceptions.
> > >> > >>>
> > >> > >>> One of the topics discussed in [1][2] was the semantics of
> > >> > >>> `fetch.max.wait.ms` and how it's affected by remote storage. Should we
> > >> > >>> consider within this KIP the update of the `fetch.max.wait.ms` docs to
> > >> > >>> clarify that it only applies to local storage?
> > >> > >>>
> > >> > >>> Otherwise, LGTM -- looking forward to seeing this KIP adopted.
> > >> > >>>
> > >> > >>> [1] https://issues.apache.org/jira/browse/KAFKA-15776
> > >> > >>> [2] https://github.com/apache/kafka/pull/14778#issuecomment-1820588080
> > >> > >>>
> > >> > >>> On Tue, 30 Jan 2024 at 01:01, Kamal Chandraprakash <
> > >> > >>> kamal.chandraprak...@gmail.com> wrote:
> > >> > >>>
> > >> > >>> > Hi all,
> > >> > >>> >
> > >> > >>> > I have opened a KIP-1018
> > >> > >>> > <
> > >> > >>> >
> > >> > >>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1018%3A+Introduce+max+remote+fetch+timeout+config+for+DelayedRemoteFetch+requests
> > >> > 

Re: [DISCUSS] KIP-932: Queues for Kafka

2024-05-02 Thread Andrew Schofield
Hi Jun,
Thanks for the response.

147. I am trying to get a correspondence between the concepts and
metrics of consumer groups and share groups. In both cases,
the client doesn’t strictly know when the rebalance starts. All it knows
is when it has work to do in order to perform its part of a rebalance.
I am proposing that share groups and consumer groups use
equivalent logic.

I could remove the rebalance metrics from the client because I do
understand that they are making a judgement about when a rebalance
starts, but it’s their own part of the rebalance they are measuring.

I tend to think these metrics are better than no metrics and
will at least enable administrators to see how much rebalance
activity the members of share groups are experiencing.

150. The simple assignor does not take existing assignments into
consideration. The ShareGroupPartitionAssignor interface would
permit this, but the simple assignor does not currently use it.

The simple assignor assigns partitions in two ways:
a) Distribute the members across the partitions by hashed member ID.
b) If any partitions have no members assigned, distribute the members
across these partitions round-robin.

The (a) partitions will be quite stable. The (b) partitions will be less
stable. By using existing assignment information, it could make (b)
partition assignment more stable, whether the assignments are
persisted or not. Perhaps it would be worth changing the simple
assignor in order to make (b) more stable.
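
To make the two steps concrete, here is a rough sketch of one way to read (a) and (b). It is not the assignor from the KIP: the class name, the use of `String.hashCode()` as the hash, and the choice to map each member to exactly one partition in step (a) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SimpleAssignorSketch {
    /** Returns partition -> member IDs. Illustrative only, not the KIP's code. */
    static Map<Integer, List<String>> assign(List<String> memberIds, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (int p = 0; p < numPartitions; p++) {
            byPartition.put(p, new ArrayList<>());
        }
        // (a) Place every member on a partition chosen by its hashed member ID.
        // This depends only on the ID, so it stays stable across rebalances.
        for (String member : memberIds) {
            byPartition.get(Math.floorMod(member.hashCode(), numPartitions)).add(member);
        }
        // (b) Partitions that ended up without members get members round-robin.
        // This depends on which partitions are empty and on member order, so it
        // is the less stable part.
        if (!memberIds.isEmpty()) {
            int next = 0;
            for (List<String> members : byPartition.values()) {
                if (members.isEmpty()) {
                    members.add(memberIds.get(next % memberIds.size()));
                    next++;
                }
            }
        }
        return byPartition;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("m1", "m2"), 4));
    }
}
```

In this reading, a member joining or leaving only moves step (a) placements for that member, while the step (b) placements can shift for everyone, which matches the stability difference described above.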

I envisage more sophisticated assignors in the future which could use
existing assignments and also other dynamic factors such as lag.

If it transpires that there is significant benefit in persisting assignments
specifically to help smooth assignment in the event of GC change,
it would be quite an easy enhancement. I am not inclined to persist
the assignments in this KIP. 

158. Ah, yes. I see. Of course, I want the names to be consistent and
understandable too. I suggest renaming
ReadShareGroupOffsetsState to ReadShareGroupStateSummary.
I haven’t changed the KIP yet, so let me know if that’s OK.

Thanks,
Andrew

> On 2 May 2024, at 22:18, Jun Rao  wrote:
> 
> Hi, Andrew,
> 
> Thanks for the reply.
> 
> 147. " it makes a judgement about whether an assignment received is equal
> to what it already is using."
> If a client receives an assignment different from what it has, it indicates
> the end of the rebalance. But how does the client know when the rebalance
> starts? In the shareHeartbeat design, the new group epoch is propagated
> together with the new assignment in the response.
> 
> 150. It could be a potential concern if each GC change forces significant
> assignment changes. Does the default assignor take existing assignments
> into consideration?
> 
> 155. Good point. Sounds good.
> 
> 158. My concern with the current naming is that it's not clear what the
> difference is between ReadShareGroupOffsetsState and ReadShareGroupState.
> The state in the latter is also offsets.
> 
> Jun
> 
> On Wed, May 1, 2024 at 9:51 PM Andrew Schofield 
> wrote:
> 
>> Hi Jun,
>> Thanks for your reply.
>> 
>> 147. Perhaps the easiest is to take a look at the code in
>> o.a.k.clients.consumer.internals.MembershipManagerImpl.
>> This class is part of the new consumer group protocol
>> code in the client. It makes state transitions based on
>> the heartbeat requests and responses, and it makes a
>> judgement about whether an assignment received is
>> equal to what it already is using. When a state transition
>> is deemed to be the beginning or end of a rebalance
>> from the point of view of this client, it counts towards the
>> rebalance metrics.
>> 
>> Share groups will follow the same path.
>> 
>> 150. I do not consider it a concern. Rebalancing a share group
>> is less disruptive than rebalancing a consumer group. If the assignor
>> has information about existing assignments, it can use it. It is
>> true that this information cannot be replayed from a topic and will
>> sometimes be unknown as a result.
>> 
>> 151. I don’t want to rename TopicPartitionsMetadata to
>> simply TopicPartitions (it’s information about the partitions of
>> a topic) because we then have an array of plurals.
>> I’ve renamed Metadata to Info. That’s a bit less cumbersome.
>> 
>> 152. Fixed.
>> 
>> 153. It’s the GC. Fixed.
>> 
>> 154. The UNKNOWN “state” is essentially a default for situations where
>> the code cannot understand data it received. For example, let’s say that
>> Kafka 4.0 has groups with states EMPTY, STABLE, DEAD. If Kafka 4.1
>> introduced another state THINKING, a tool built with Kafka 4.0 would not
>> know what THINKING meant. It will use “UNKNOWN” to indicate that the
>> state was something that it could not understand.
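
The UNKNOWN fallback described in 154 could look roughly like this on the tool side. The enum name `SketchGroupState` and its `parse` helper are hypothetical, and THINKING is the made-up future state from the example above.

```java
import java.util.Locale;

enum SketchGroupState {
    EMPTY, STABLE, DEAD, UNKNOWN;

    /** Maps a state name received from the broker to a known state, or UNKNOWN. */
    static SketchGroupState parse(String name) {
        if (name == null) {
            return UNKNOWN;
        }
        try {
            return valueOf(name.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // A newer broker sent a state this older tool has never heard of.
            return UNKNOWN;
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("STABLE"));
        System.out.println(parse("THINKING"));
    }
}
```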
>> 
>> 155. No, it’s at the level of the share-partition. If the offsets for just
>> one share-partition are reset, only the state epoch for that partition is
>> updated.
>> 
>> 156. Strictly speaking, it’s redundant. I think having the StartOffset

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-05-02 Thread Andrew Schofield
Hi,
I’ve changed my mind on this one having read through the comments.

I don’t think the exception handler should be able to mess with the headers
to the detriment of the code that called the handler.

While I like the hygiene of having an ImmutableHeaders interface,
I feel we can use the existing interface to get the effect we desire.

Thanks,
Andrew

> On 3 May 2024, at 03:40, Sophie Blee-Goldman  wrote:
> 
> I tend to agree that we should just return a pure Headers instead of
> introducing a new class/interface to protect overwriting them. I think a
> pretty good case has been made already so I won't add onto it, just wanted
> to voice my support.
> 
> Is that the only remaining question on this KIP? Might be ok to move to a
> vote now?
> 
> On Wed, May 1, 2024 at 8:05 AM Lianet M.  wrote:
> 
>> Hi all, thanks Damien for the KIP!
>> 
>> After looking into the KIP and comments, my only concern is aligned with
>> one of Matthias' comments, around the ImmutableHeaders introduction, with
>> the motivation not being clear enough. The existing handlers already expose
>> the headers (indirectly). Ex.
>> ProductionExceptionHandler.handleSerializationException provides the
>> ProducerRecord as an argument, so they are already exposed in those
>> callbacks through record.headers(). Is there a reason to think that it
>> would be a problem to expose the headers in the
>> new ProcessingExceptionHandler, but that it's not a problem for the
>> existing handler?
>> 
>> If there is no real concern about the KS engine requiring those headers, it
>> feels hard to mentally justify the complexity we transfer to the user by
>> exposing a new concept into the callbacks to represent the headers. In the
>> end, it strays away from the simple/consistent representation of Headers
>> used all over. Even if eventually the KS engine needs to use the headers
>> after the callbacks with certainty that they were not altered, still feels
>> like it's something we could attempt to solve internally, without having to
>> transfer "new concepts" into the user (ex. the deep-copy as it was
>> suggested, seems like the kind of trade-off that would maybe be acceptable
>> here to gain simplicity and consistency among the handlers with a single
>> existing representation of Headers).
>> 
>> Best!
>> 
>> Lianet
>> 
>> 
>> 
>> On Tue, Apr 30, 2024 at 9:36 PM Matthias J. Sax  wrote:
>> 
>>> Thanks for the update.
>>> 
>>> I am wondering if we should use `ReadOnlyHeaders` instead of
>>> `ImmutableHeaders` as interface name?
>>> 
>>> Also, the returned `Header` interface is technically not immutable
>>> either, because `Header#value()` returns a mutable byte-array... Would we
>>> need a `ReadOnlyHeader` interface?
>>> 
>>> If yes, it seems that `ReadOnlyHeaders` should not be a super-interface
>>> of `Headers` but it would rather be a standalone interface, and a
>>> wrapper for a `Headers` instance? And `ReadOnlyHeader` would return some
>>> immutable type instead of `byte[]` for the value()?
>>> 
>>> An alternative would be to deep-copy the value byte-array, which would not
>>> be free, but given that we are talking about exception handling, it
>>> would not be on the hot code path, and thus might be acceptable?
>>> 
>>> 
>>> The above seems to increase the complexity significantly though. Hence,
>>> I have second thoughts on the immutability question:
>>> 
>>> Do we really need to worry about mutability after all, because in the
>>> end, KS runtime won't read the Headers instance after the handler was
>>> called, and if a user modifies the passed in headers, there won't be any
>>> actual damage (ie, no side effects)? For this case, it might even be ok
>>> to also not add `ImmutableHeaders` to begin with?
>>> 
>>> 
>>> 
>>> Sorry for the forth and back (yes, forth and back, because back and
>>> forth does not make sense -- it's not logical -- just trying to fix
>>> English :D) as I did bring up the immutability question in the first
>>> place...
>>> 
>>> 
>>> 
>>> -Matthias
>>> 
>>> On 4/25/24 5:56 AM, Loic Greffier wrote:
>>>> Hi Matthias,
>>>>
>>>> I have updated the KIP regarding points 103 and 108.
>>>>
>>>> 103.
>>>> I have suggested a new `ImmutableHeaders` interface to deal with the
>>>> immutability concern of the headers, which is basically the `Headers`
>>>> interface without the write accesses.
>>>>
>>>> public interface ImmutableHeaders {
>>>>     Header lastHeader(String key);
>>>>     Iterable<Header> headers(String key);
>>>>     Header[] toArray();
>>>> }
>>>>
>>>> The `Headers` interface can be updated accordingly:
>>>>
>>>> public interface Headers extends ImmutableHeaders, Iterable<Header> {
>>>>     //…
>>>> }
>>>>
>>>> Loïc
>>> 
>> 



Re: [DISCUSS] KIP-1041: Drop `offsets.commit.required.acks` config in 4.0 (deprecate in 3.8)

2024-05-02 Thread Andrew Schofield
Hi David,
I think this KIP is a very good idea. It would be good to get rid of this cruft.

Thanks,
Andrew

> On 2 May 2024, at 18:54, David Jacot  wrote:
>
> Hi folks,
>
> I have put together a very small KIP to
> deprecate offsets.commit.required.acks in 3.8 and remove it in 4.0. See the
> motivation for the reason.
>
> KIP: https://cwiki.apache.org/confluence/x/9YobEg
>
> Please let me know what you think.
>
> Best,
> David



Re: [DISCUSS] KIP-950: Tiered Storage Disablement

2024-05-02 Thread Luke Chen
Hi Christo,

Thanks for the update.

Questions:
1. For this
"The possible state transition from DISABLED state is to the ENABLED."
I think it only applies to KRaft mode. In ZK mode, the possible state is
"DISABLING", right?

2. For this:
"If the cluster is using Zookeeper as the control plane, enabling remote
storage for a topic triggers the controller to send this information to
Zookeeper. Each broker listens for changes in Zookeeper, and when a change
is detected, the broker triggers RemoteLogManager#onLeadershipChange()."

I think the way ZK brokers know about the leadership change is by getting the
LeaderAndIsrRequest from the controller, not listening for changes in ZK.

3. In the KRaft handler steps, you said:
"The controller also updates the Topic metadata to increment the
tiered_epoch and update the tiered_state to DISABLING state."

Should it be "DISABLED" state since it's KRaft mode?

4. I was thinking about how we handle the tiered_epoch mismatch error.
For ZK, I think the controller won't write any data into the ZK znode.
For KRaft, neither the configRecord nor the updateTopicMetadata records will
be written.
Is that right? Because the current workflow makes me think there will be
partial data updated in ZK/KRaft when a tiered_epoch error occurs.

5. Since we have now changed to use the stopReplicas (V5) request, the diagram
for the ZK workflow might also need updating.

6. In ZK mode, what will the controller do if the "stopReplicas" responses
are not received from all brokers? Revert the changes?
This won't happen in KRaft mode because it's the broker's responsibility to
fetch metadata updates from the controller.


Thank you.
Luke


On Fri, Apr 19, 2024 at 10:23 PM Christo Lolov 
wrote:

> Heya all!
>
> I have updated KIP-950. A list of what I have updated is:
>
> * Explicitly state that Zookeeper-backed clusters will have ENABLED ->
> DISABLING -> DISABLED while KRaft-backed clusters will only have ENABLED ->
> DISABLED
> * Added two configurations for the new thread pools and explained where
> values will be picked-up mid Kafka version upgrade
> * Explained how leftover remote partitions will be scheduled for deletion
> * Updated the API to use StopReplica V5 rather than a whole new
> controller-to-broker API
> * Explained that the disablement procedure will be triggered by the
> controller listening for an (Incremental)AlterConfig change
> * Explained that we will first move log start offset and then issue a
> deletion
> * Went into more details that changing remote.log.disable.policy after
> disablement won't do anything and that if a customer would like additional
> data deleted they would have to use already existing methods
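
The state paths in the first bullet above can be written down as a small transition table. This is a sketch of how the thread describes the disablement path, not code from the KIP: the type names (`TieredState`, `ControlPlane`, `TieredStateTransitions`) are hypothetical, and re-enabling a topic after DISABLED is deliberately left out.

```java
import java.util.EnumSet;
import java.util.Set;

enum TieredState { ENABLED, DISABLING, DISABLED }

enum ControlPlane { ZOOKEEPER, KRAFT }

final class TieredStateTransitions {
    /** States reachable in one step on the disablement path described above. */
    static Set<TieredState> allowedNext(ControlPlane mode, TieredState from) {
        switch (from) {
            case ENABLED:
                // ZK waits for broker confirmation via DISABLING; KRaft is
                // fire-and-forget and goes straight to DISABLED.
                return mode == ControlPlane.ZOOKEEPER
                    ? EnumSet.of(TieredState.DISABLING)
                    : EnumSet.of(TieredState.DISABLED);
            case DISABLING:
                // Only exists in ZK mode.
                return mode == ControlPlane.ZOOKEEPER
                    ? EnumSet.of(TieredState.DISABLED)
                    : EnumSet.noneOf(TieredState.class);
            default:
                // DISABLED: end of the disablement path in this sketch.
                return EnumSet.noneOf(TieredState.class);
        }
    }

    public static void main(String[] args) {
        for (ControlPlane mode : ControlPlane.values()) {
            for (TieredState state : TieredState.values()) {
                System.out.println(mode + " " + state + " -> " + allowedNext(mode, state));
            }
        }
    }
}
```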
>
> Let me know if there are any new comments or I have missed something!
>
> Best,
> Christo
>
> On Mon, 15 Apr 2024 at 12:40, Christo Lolov 
> wrote:
>
>> Heya Doguscan,
>>
>> I believe that the state of the world after this KIP will be the
>> following:
>>
>> For Zookeeper-backed clusters there will be 3 states: ENABLED, DISABLING
>> and DISABLED. We want this because Zookeeper-backed clusters will await a
>> confirmation from the brokers that they have indeed stopped tiered-related
>> operations on the topic.
>>
>> For KRaft-backed clusters there will be only 2 states: ENABLED and
>> DISABLED. KRaft takes a fire-and-forget approach for topic deletion. I
>> believe the same approach ought to be taken for tiered topics. The
>> mechanism which will ensure that leftover state in remote due to failures
>> is cleaned up to me is the retention mechanism. In today's code, a leader
>> deletes all segments it finds in remote with offsets below the log start
>> offset. I believe this will be good enough for cleaning up leftover state
>> in remote due to failures.
>>
>> I know that quite a few changes have been discussed so I will aim to put
>> them on paper in the upcoming days and let everyone know!
>>
>> Best,
>> Christo
>>
>> On Tue, 9 Apr 2024 at 14:49, Doğuşcan Namal 
>> wrote:
>>
>>> +1 let's not introduce a new api and mark it immediately as deprecated
>>> :)
>>>
>>> On your second comment Luke, one thing we need to clarify is when do we
>>> consider remote storage to be DISABLED for a topic?
>>> Particularly, what is the state when the remote storage is being deleted
>>> in case of disablement.policy=delete? Is it DISABLING or DISABLED?
>>>
>>> If we move directly to the DISABLED state,
>>>
>>> a) in case of failures, the leaders should continue remote storage
>>> deletion even if the topic is moved to the DISABLED state, otherwise we
>>> risk having stray data on remote storage.
>>> b) on each restart, we should initiate the remote storage deletion
>>> because although we replayed a record with a DISABLED state, we can not be
>>> sure if the remote data is deleted or not.
>>>
>>> We could either consider keeping the remote topic in DISABLING state
>>> until all of the remote storage data is deleted, or we need an additional
>>> mechanism to handle the remote stray data.
>>>
>>> The existing topic deletion, for 

Re: [VOTE] KIP-924: customizable task assignment for Streams

2024-05-02 Thread Matthias J. Sax

I left one more nit on the discuss thread. But overall LGTM.

+1 (binding)

Thanks Rohan and Sophie for driving this KIP.


-Matthias

On 4/29/24 2:07 PM, Sophie Blee-Goldman wrote:

+1 (binding)

thanks for driving this KIP!

On Tue, Apr 16, 2024 at 1:46 PM Rohan Desai  wrote:



https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams

As this KIP has been open for a while, and gone through a couple rounds of
review/revision, I'm calling a vote to get it approved.





Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-02 Thread Matthias J. Sax
Thanks Sophie. My bad. You are of course right about `TaskAssignment` 
and the StreamsPartitionAssignor's responsibility to map tasks of an
instance to consumers. When I wrote my reply, I forgot about this detail.


Seems you did not add the `UNKNOWN_TASK_ID` error yet, as proposed by Guozhang?

Otherwise LGTM.


-Matthias

On 5/2/24 4:20 PM, Sophie Blee-Goldman wrote:

Guozhang:

117. All three additions make sense to me. However, while thinking about
how users would actually produce an assignment, I realized that it seems
silly to make it their responsibility to distinguish between a stateless
and stateful task when they return the assignment. The
StreamsPartitionAssignor already knows which tasks are stateful vs
stateless, so there's no need to add this extra step for users to figure it
out themselves, and potentially make a mistake.

117f: So, rather than add a new error type for "inconsistent task types",
I'm proposing to just flatten the AssignedTask.Type enum to only "ACTIVE"
and "STANDBY", and remove the "STATEFUL" and "STATELESS" types altogether.
Any objections?

-

-Thanks, fixed the indentation of headers under "User APIs" and "Read-Only
APIs"

-As for the return type of the TaskAssignmentUtils methods, I don't
personally feel too strongly about this, but the reason for the return type
being a Map rather than a TaskAssignment
is because they are meant to be used iteratively/to create a part of the
full assignment, and not necessarily a full assignment for each. Notice
that they all have an input parameter of the same type: Map. The idea is you can take the output of any of
these and pass it in to another to generate or optimize another piece of
the overall assignment. For example, if you want to perform the rack-aware
optimization on both active and standby tasks, you would need to call
#optimizeRackAwareActiveTasks and then forward the output to
#optimizeRackAwareStandbyTasks to get the final assignment. If we return a
TaskAssignment, it will usually need to be unwrapped right away. Perhaps
more importantly, I worry that returning a TaskAssignment will make it seem
like each of these utility methods returns a "full" and final assignment
that can just be returned as-is from the TaskAssignor's #assign method.
Whereas they are each just a single step in the full assignment process,
and not the final product. Does that make sense?

On Thu, May 2, 2024 at 3:50 PM Sophie Blee-Goldman 
wrote:


Matthias:

Thanks for the naming suggestions for the error codes. I was
definitely not happy with my original naming but couldn't think of anything
better.  I like your proposals though, will update the KIP names. I'll also
add a "NONE" option as well -- much better than just passing in null for no
error.


OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the
same active task


  Would also be an error if assigned to two consumers of the same client...

Needs to be rephrased.



Well the TaskAssignor only assigns tasks to KafkaStreams clients, it's not
responsible for the assignment of tasks to consumers within a KafkaStreams.
It would be a bug in the StreamsPartitionAssignor if it received a valid
assignment from the TaskAssignor with only one copy of a task assigned to a
single KafkaStreams client, and then somehow ended up assigning that task
to multiple consumers on the KafkaStreams client. It wouldn't be the
TaskAssignor's fault so imo it would not make sense to include this case in
the OVERLAPPING_CLIENT error (or as it's now called,
ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES). Not to mention, if there was a bug that caused
the StreamsPartitionAssignor to assign a task to multiple consumers, it
presumably wouldn't even notice since it's a bug -- if it did notice, it
can just fix the issue. The error codes are about communicating unfixable
issues due to the TaskAssignor itself returning an invalid assignment. The
phrasing is intentional, and (imo) correct as it is.

I do see your point about how the StreamsPartitionAssignor should
handle/react to invalid assignments though. I'm fine with just throwing a
StreamsException and crashing after we invoke the #onAssignmentComputed
callback to notify the user of the error.

On Wed, May 1, 2024 at 9:46 AM Guozhang Wang 
wrote:


Jumping back to the party here :)

107: I agree with the rationale behind this, and
`numProcessingThreads` looks good to me as it covers both the current
and future scenarios.

117: I agree with Lucas and Bruno, and would add:
   * 117e: unknown taskID: fail
   * 117f: inconsistent task types (e.g. a known taskID was indicated
stateless from ApplicationState, but the returned AssignedTask states
stateful): fail
   * 117g: some ProcessID was not included in the returned Set: pass,
and interprets it as no tasks assigned to it.

And I'm open for any creative error codes folks would come up with :)
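
The checks discussed in this thread (unknown task ID, an active task handed to more than one client, and a client that is simply missing from the result) could be sketched like this. It is an illustration only: the error names are the ones mentioned in the thread, but the class, the method signature, and the use of plain strings for client and task IDs are assumptions and not the KIP's API.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class AssignmentValidationSketch {
    enum AssignmentError { NONE, ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES, UNKNOWN_TASK_ID }

    /**
     * Validates active-task assignments keyed by client. A client that does
     * not appear in the map is treated as having no tasks (117g), so that
     * case needs no check at all.
     */
    static AssignmentError validate(Set<String> knownTaskIds,
                                    Map<String, Set<String>> activeTasksByClient) {
        Set<String> alreadyAssigned = new HashSet<>();
        for (Set<String> tasks : activeTasksByClient.values()) {
            for (String taskId : tasks) {
                if (!knownTaskIds.contains(taskId)) {
                    return AssignmentError.UNKNOWN_TASK_ID;                     // 117e
                }
                if (!alreadyAssigned.add(taskId)) {
                    return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES; // two clients, same task
                }
            }
        }
        return AssignmentError.NONE;
    }

    public static void main(String[] args) {
        Set<String> known = Set.of("0_0", "0_1");
        System.out.println(validate(known, Map.of("client-a", Set.of("0_0"), "client-b", Set.of("0_1"))));
        System.out.println(validate(known, Map.of("client-a", Set.of("0_0"), "client-b", Set.of("0_0"))));
    }
}
```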


If any of these errors are detected, the StreamsPartitionAssignor will

immediately "fail" the rebalance and retry it by scheduling an 

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-02 Thread Sophie Blee-Goldman
Guozhang:

117. All three additions make sense to me. However, while thinking about
how users would actually produce an assignment, I realized that it seems
silly to make it their responsibility to distinguish between a stateless
and stateful task when they return the assignment. The
StreamsPartitionAssignor already knows which tasks are stateful vs
stateless, so there's no need to add this extra step for users to figure it
out themselves, and potentially make a mistake.

117f: So, rather than add a new error type for "inconsistent task types",
I'm proposing to just flatten the AssignedTask.Type enum to only "ACTIVE"
and "STANDBY", and remove the "STATEFUL" and "STATELESS" types altogether.
Any objections?

-

-Thanks, fixed the indentation of headers under "User APIs" and "Read-Only
APIs"

-As for the return type of the TaskAssignmentUtils methods, I don't
personally feel too strongly about this, but the reason for the return type
being a Map rather than a TaskAssignment
is because they are meant to be used iteratively/to create a part of the
full assignment, and not necessarily a full assignment for each. Notice
that they all have an input parameter of the same type: Map. The idea is
you can take the output of any of these and pass it in to another to
generate or optimize another piece of
the overall assignment. For example, if you want to perform the rack-aware
optimization on both active and standby tasks, you would need to call
#optimizeRackAwareActiveTasks and then forward the output to
#optimizeRackAwareStandbyTasks to get the final assignment. If we return a
TaskAssignment, it will usually need to be unwrapped right away. Perhaps
more importantly, I worry that returning a TaskAssignment will make it seem
like each of these utility methods returns a "full" and final assignment
that can just be returned as-is from the TaskAssignor's #assign method.
Whereas they are each just a single step in the full assignment process,
and not the final product. Does that make sense?
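
To make the "output of one step is the input of the next" idea concrete, here is a minimal sketch. It does not use the real KIP-924 types: the map of client id to task labels, and the method names assignActive and addStandbys, are hypothetical stand-ins chosen only to show why each step takes and returns the same Map type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the utility methods discussed above.
// Keys are client ids, values are task labels such as "active:0_0".
final class AssignmentPipelineSketch {

    // Step 1: place each active task on a client, round-robin. Returns a new map.
    static Map<String, List<String>> assignActive(Map<String, List<String>> partial,
                                                  List<String> taskIds) {
        Map<String, List<String>> out = copy(partial);
        List<String> clients = new ArrayList<>(out.keySet());
        if (clients.isEmpty()) {
            return out; // nothing to assign to
        }
        for (int i = 0; i < taskIds.size(); i++) {
            out.get(clients.get(i % clients.size())).add("active:" + taskIds.get(i));
        }
        return out;
    }

    // Step 2: for every active task, add a standby copy on the next client. Returns a new map.
    static Map<String, List<String>> addStandbys(Map<String, List<String>> partial) {
        Map<String, List<String>> out = copy(partial);
        List<String> clients = new ArrayList<>(partial.keySet());
        for (int i = 0; i < clients.size(); i++) {
            String owner = clients.get(i);
            String next = clients.get((i + 1) % clients.size());
            if (next.equals(owner)) {
                continue; // a single client cannot host its own standby
            }
            for (String task : partial.get(owner)) {
                if (task.startsWith("active:")) {
                    out.get(next).add("standby:" + task.substring("active:".length()));
                }
            }
        }
        return out;
    }

    private static Map<String, List<String>> copy(Map<String, List<String>> in) {
        Map<String, List<String>> out = new TreeMap<>();
        in.forEach((client, tasks) -> out.put(client, new ArrayList<>(tasks)));
        return out;
    }

    // The output of one step is forwarded straight into the next one.
    static Map<String, List<String>> example() {
        Map<String, List<String>> empty = new TreeMap<>();
        empty.put("client-A", new ArrayList<>());
        empty.put("client-B", new ArrayList<>());
        return addStandbys(assignActive(empty, List.of("0_0", "0_1")));
    }
}
```

Only the result of the last step would be wrapped into the final assignment object; the intermediate maps are never returned to the caller of #assign.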

On Thu, May 2, 2024 at 3:50 PM Sophie Blee-Goldman 
wrote:

> Matthias:
>
> Thanks for the naming suggestions for the error codes. I was
> definitely not happy with my original naming but couldn't think of anything
> better.  I like your proposals though, will update the KIP names. I'll also
> add a "NONE" option as well -- much better than just passing in null for no
> error.
>
> > OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the
>> same active task
>
>  Would also be an error if assigned to two consumers of the same client...
>> Needs to be rephrased.
>
>
> Well the TaskAssignor only assigns tasks to KafkaStreams clients, it's not
> responsible for the assignment of tasks to consumers within a KafkaStreams.
> It would be a bug in the StreamsPartitionAssignor if it received a valid
> assignment from the TaskAssignor with only one copy of a task assigned to a
> single KafkaStreams client, and then somehow ended up assigning that task
> to multiple consumers on the KafkaStreams client. It wouldn't be the
> TaskAssignor's fault so imo it would not make sense to include this case in
> the OVERLAPPING_CLIENT error (or as it's now called,
> ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES). Not to mention, if there was a bug that caused
> the StreamsPartitionAssignor to assign a task to multiple consumers, it
> presumably wouldn't even notice since it's a bug -- if it did notice, it
> can just fix the issue. The error codes are about communicating unfixable
> issues due to the TaskAssignor itself returning an invalid assignment. The
> phrasing is intentional, and (imo) correct as it is.
>
> I do see your point about how the StreamsPartitionAssignor should
> handle/react to invalid assignments though. I'm fine with just throwing a
> StreamsException and crashing after we invoke the #onAssignmentComputed
> callback to notify the user of the error.
>
> On Wed, May 1, 2024 at 9:46 AM Guozhang Wang 
> wrote:
>
>> Jumping back to the party here :)
>>
>> 107: I agree with the rationale behind this, and
>> `numProcessingThreads` looks good to me as it covers both the current
>> and future scenarios.
>>
>> 117: I agree with Lucas and Bruno, and would add:
>>   * 117e: unknown taskID: fail
>>   * 117f: inconsistent task types (e.g. a known taskID was indicated
>> stateless from ApplicationState, but the returned AssignedTask states
>> stateful): fail
>>   * 117g: some ProcessID was not included in the returned Set: pass,
>> and interprets it as no tasks assigned to it.
>>
>> And I'm open for any creative error codes folks would come up with :)
>>
>> > If any of these errors are detected, the StreamsPartitionAssignor will
>> immediately "fail" the rebalance and retry it by scheduling an immediate
>> followup rebalance.
>>
>> I'm also a bit concerned here, as such endless retry loops have
>> happened in the past in my memory. Given that we would likely see most
>> of the user implementations be deterministic, I'm also leaning towards
>> 

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-02 Thread Sophie Blee-Goldman
Matthias:

Thanks for the naming suggestions for the error codes. I was definitely not
happy with my original naming but couldn't think of anything better.  I
like your proposals though, will update the KIP names. I'll also add a
"NONE" option as well -- much better than just passing in null for no error.

> OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the same
> active task

 Would also be an error if assigned to two consumers of the same client...
> Needs to be rephrased.


Well the TaskAssignor only assigns tasks to KafkaStreams clients, it's not
responsible for the assignment of tasks to consumers within a KafkaStreams.
It would be a bug in the StreamsPartitionAssignor if it received a valid
assignment from the TaskAssignor with only one copy of a task assigned to a
single KafkaStreams client, and then somehow ended up assigning that task
to multiple consumers on the KafkaStreams client. It wouldn't be the
TaskAssignor's fault so imo it would not make sense to include this case in
the OVERLAPPING_CLIENT error (or as it's now called,
ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES). Not to mention, if there was a bug that caused
the StreamsPartitionAssignor to assign a task to multiple consumers, it
presumably wouldn't even notice since it's a bug -- if it did notice, it
can just fix the issue. The error codes are about communicating unfixable
issues due to the TaskAssignor itself returning an invalid assignment. The
phrasing is intentional, and (imo) correct as it is.

I do see your point about how the StreamsPartitionAssignor should
handle/react to invalid assignments though. I'm fine with just throwing a
StreamsException and crashing after we invoke the #onAssignmentComputed
callback to notify the user of the error.
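
As a rough illustration of the checks and the "notify, then crash" behavior discussed here, the sketch below validates an assignment and reports one error code. The enum constants other than UNKNOWN_TASK_ID are the names used in this thread; UNKNOWN_TASK_ID, the map-based assignment shape, and the use of IllegalStateException in place of StreamsException are assumptions made so the example is self-contained.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

final class AssignmentValidationSketch {

    enum AssignmentError {
        NONE,
        ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
        ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_CLIENT,
        UNKNOWN_PROCESS_ID,
        UNKNOWN_TASK_ID // hypothetical name for the "unknown taskID" case (117e)
    }

    // active/standby map a client id to the task ids assigned to it.
    // A known client that is missing from both maps simply gets no tasks (117g).
    static AssignmentError validate(Map<String, Set<String>> active,
                                    Map<String, Set<String>> standby,
                                    Set<String> knownClients,
                                    Set<String> knownTasks) {
        Set<String> seenActive = new HashSet<>();
        for (Map.Entry<String, Set<String>> entry : active.entrySet()) {
            if (!knownClients.contains(entry.getKey())) {
                return AssignmentError.UNKNOWN_PROCESS_ID;
            }
            for (String task : entry.getValue()) {
                if (!knownTasks.contains(task)) {
                    return AssignmentError.UNKNOWN_TASK_ID;
                }
                if (!seenActive.add(task)) {
                    return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
                }
            }
        }
        for (Map.Entry<String, Set<String>> entry : standby.entrySet()) {
            if (!knownClients.contains(entry.getKey())) {
                return AssignmentError.UNKNOWN_PROCESS_ID;
            }
            Set<String> clientActive = active.getOrDefault(entry.getKey(), Set.of());
            for (String task : entry.getValue()) {
                if (!knownTasks.contains(task)) {
                    return AssignmentError.UNKNOWN_TASK_ID;
                }
                if (clientActive.contains(task)) {
                    return AssignmentError.ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_CLIENT;
                }
            }
        }
        return AssignmentError.NONE;
    }

    // Invoke the user callback first, then fail fast rather than retrying
    // a (most likely deterministic) assignor in an endless loop.
    static void finish(AssignmentError error, Consumer<AssignmentError> onAssignmentComputed) {
        onAssignmentComputed.accept(error);
        if (error != AssignmentError.NONE) {
            // Stand-in for throwing a StreamsException.
            throw new IllegalStateException("Invalid task assignment: " + error);
        }
    }
}
```

Note that there is no "inconsistent task type" check: once the type enum is flattened to ACTIVE and STANDBY, the user never states whether a task is stateful, so that error cannot occur.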

On Wed, May 1, 2024 at 9:46 AM Guozhang Wang 
wrote:

> Jumping back to the party here :)
>
> 107: I agree with the rationale behind this, and
> `numProcessingThreads` looks good to me as it covers both the current
> and future scenarios.
>
> 117: I agree with Lucas and Bruno, and would add:
>   * 117e: unknown taskID: fail
>   * 117f: inconsistent task types (e.g. a known taskID was indicated
> stateless from ApplicationState, but the returned AssignedTask states
> stateful): fail
>   * 117g: some ProcessID was not included in the returned Set: pass,
> and interprets it as no tasks assigned to it.
>
> And I'm open for any creative error codes folks would come up with :)
>
> > If any of these errors are detected, the StreamsPartitionAssignor will
> immediately "fail" the rebalance and retry it by scheduling an immediate
> followup rebalance.
>
> I'm also a bit concerned here, as such endless retry loops have
> happened in the past in my memory. Given that we would likely see most
> of the user implementations be deterministic, I'm also leaning towards
> failing the app immediately and let the crowd educate us if there are
> some very interesting scenarios out there that are not on our radar to
> re-consider this, rather than getting hard to debug cases in the dark.
>
> -
>
> And here are just some nits about the KIP writings itself:
>
> * I think some bullet points under `User APIs` and `Read-only APIs`
> should have a lower level indentation? It caught me for a sec until I
> realized there are just two categories.
>
> * In TaskAssignmentUtils , why not let those util functions return
> `TaskAssignment` (to me it feels more consistent with the user APIs),
> but instead return a Map?
>
>
> Guozhang
>
> On Tue, Apr 30, 2024 at 5:28 PM Matthias J. Sax  wrote:
> >
> > I like the idea of error codes. Not sure if the names are ideal?
> > UNKNOWN_PROCESS_ID makes sense, but the other two seem a little bit
> > difficult to understand?
> >
> > Should we be very descriptive (and also try to avoid coupling it to the
> > threading model -- important for the first error code):
> >   - ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
> >   - ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_CLIENT (or _INSTANCE)
> >
> > I think we also need to add NONE as option or make the error parameter
> > an `Optional`?
> >
> >
> > > OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the
> same active task
> >
> > Would also be an error if assigned to two consumers of the same
> > client... Needs to be rephrased.
> >
> >
> >
> > > If any of these errors are detected, the StreamsPartitionAssignor will
> immediately "fail" the rebalance and retry it by scheduling an immediate
> followup rebalance.
> >
> > Does this make sense? If we assume that the task-assignment is
> > deterministic, we would end up with an infinite retry loop? Also,
> > assuming that a client leaves the group, we cannot assign some tasks any
> > longer... I would rather throw a StreamsException and let the client
> crash.
> >
> >
> >
> > -Matthias
> >
> > On 4/30/24 12:22 PM, Sophie Blee-Goldman wrote:
> > > One last thing: I added an error code enum to be returned from the
> > > #onAssignmentComputed method in case of an invalid assignment. I
> 

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-05-02 Thread Sophie Blee-Goldman
I tend to agree that we should just return a pure Headers instead of
introducing a new class/interface to protect overwriting them. I think a
pretty good case has been made already so I won't add onto it, just wanted
to voice my support.

Is that the only remaining question on this KIP? Might be ok to move to a
vote now?
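
For reference, the deep-copy alternative raised in the quoted discussion below would look roughly like this. SimpleHeader is a hypothetical stand-in for a record header (a String key plus a mutable byte[] value), not the Kafka Header interface, so this is only a sketch of the trade-off being weighed.

```java
import java.util.ArrayList;
import java.util.List;

final class HeaderCopySketch {

    // Stand-in for a record header: immutable String key, mutable byte[] value.
    static final class SimpleHeader {
        final String key;
        final byte[] value;

        SimpleHeader(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Copies every value array so that changes made by a handler cannot leak back.
    static List<SimpleHeader> deepCopy(List<SimpleHeader> headers) {
        List<SimpleHeader> copy = new ArrayList<>(headers.size());
        for (SimpleHeader header : headers) {
            byte[] value = header.value == null ? null : header.value.clone();
            copy.add(new SimpleHeader(header.key, value));
        }
        return copy;
    }
}
```

The copy costs one allocation per header, which is the overhead the thread considers acceptable off the hot path; passing plain Headers avoids even that if the runtime never reads them again after the handler returns.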

On Wed, May 1, 2024 at 8:05 AM Lianet M.  wrote:

> Hi all, thanks Damien for the KIP!
>
> After looking into the KIP and comments, my only concern is aligned with
> one of Matthias's comments, around the ImmutableHeaders introduction, with
> the motivation not being clear enough. The existing handlers already expose
> the headers (indirectly). Ex.
> ProductionExceptionHandler.handleSerializationException provides the
> ProducerRecord as an argument, so they are already exposed in those
> callbacks through record.headers(). Is there a reason to think that it
> would be a problem to expose the headers in the
> new ProcessingExceptionHandler, but that it's not a problem for the
> existing handler?
>
> If there is no real concern about the KS engine requiring those headers, it
> feels hard to mentally justify the complexity we transfer to the user by
> exposing a new concept into the callbacks to represent the headers. In the
> end, it strays away from the simple/consistent representation of Headers
> used all over. Even if eventually the KS engine needs to use the headers
> after the callbacks with certainty that they were not altered, still feels
> like it's something we could attempt to solve internally, without having to
> transfer "new concepts" into the user (ex. the deep-copy as it was
> suggested, seems like the kind of trade-off that would maybe be acceptable
> here to gain simplicity and consistency among the handlers with a single
> existing representation of Headers).
>
> Best!
>
> Lianet
>
>
>
> On Tue, Apr 30, 2024 at 9:36 PM Matthias J. Sax  wrote:
>
> > Thanks for the update.
> >
> > I am wondering if we should use `ReadOnlyHeaders` instead of
> > `ImmutableHeaders` as interface name?
> >
> > Also, the returned `Header` interface is technically not immutable
> > either, because `Header#value()` returns a mutable byte-array... Would we
> > need a `ReadOnlyHeader` interface?
> >
> > If yes, it seems that `ReadOnlyHeaders` should not be a super-interface
> > of `Headers` but it would rather be a standalone interface, and a
> > wrapper for a `Headers` instance? And `ReadOnlyHeader` would return some
> > immutable type instead of `byte[]` for the value()?
> >
> > An alternative would be to deep-copy the value byte-array which would not
> > be free, but given that we are talking about exception handling, it
> > would not be on the hot code path, and thus might be acceptable?
> >
> >
> > The above seems to increase the complexity significantly though. Hence,
> > I have second thoughts on the immutability question:
> >
> > Do we really need to worry about mutability after all, because in the
> > end, KS runtime won't read the Headers instance after the handler was
> > called, and if a user modifies the passed in headers, there won't be any
> > actual damage (ie, no side effects)? For this case, it might even be ok
> > to also not add `ImmutableHeaders` to begin with?
> >
> >
> >
> > Sorry for the forth and back (yes, forth and back, because back and
> > forth does not make sense -- it's not logical -- just trying to fix
> > English :D) as I did bring up the immutability question in the first
> > place...
> >
> >
> >
> > -Matthias
> >
> > On 4/25/24 5:56 AM, Loic Greffier wrote:
> > > Hi Matthias,
> > >
> > > I have updated the KIP regarding points 103 and 108.
> > >
> > > 103.
> > > I have suggested a new `ImmutableHeaders` interface to deal with the
> > > immutability concern of the headers, which is basically the `Headers`
> > > interface without the write accesses.
> > >
> > > public interface ImmutableHeaders {
> > >  Header lastHeader(String key);
> > >  Iterable<Header> headers(String key);
> > >  Header[] toArray();
> > > }
> > >
> > > The `Headers` interface can be updated accordingly:
> > >
> > > public interface Headers extends ImmutableHeaders, Iterable<Header> {
> > >  //…
> > > }
> > >
> > > Loïc
> >
>


Re: KAFKA-16221

2024-05-02 Thread Sophie Blee-Goldman
Hey Matthias -- I'm quite sure you should *not* do what we're doing in
https://github.com/apache/kafka/pull/15315. That's definitely a bad hack,
and IIUC the only reason we accepted it was because the choice was between
implementing a hacky temporary fix and blocking the entire release while we
figured out the "right" way to fix this. Sadly, it seems like we never
followed up after the release was cut, and AFAICT everyone forgot about
this so-called "temporary" hack as predicted
...

Anyways, as a general rule, you should never try to catch and swallow an
IllegalStateException. You never know if you might be swallowing an actual
problem and messing up your application. It's better to just make sure it
doesn't get thrown in the first place.

In this specific case, it seems like the "illegal state" that both you and
Kafka Streams are hitting comes from calling #abortTransaction after a
timeout. So it sounds to me like you should just add a separate catch block
for TimeoutException and retry the #commitTransaction.

For the full explanation as to why you need to retry the #commitTxn (as
opposed to just starting a new transaction), check out this paragraph in
the #commitTransaction javadocs:

> Note that this method will raise TimeoutException if the transaction cannot
> be committed before expiration of max.block.ms, but this does not mean
> the request did not actually reach the broker. In fact, it only indicates
> that we cannot get the acknowledgement response in time, so it's up to the
> application's logic to decide how to handle timeouts. Additionally, it will
> raise InterruptException if interrupted. It is safe to retry in either
> case, but it is not possible to attempt a different operation (such as
> abortTransaction) since the commit may already be in the progress of
> completing. If not retrying, the only option is to close the producer.


It does seem like the exception handling example is objectively incorrect,
since TimeoutException extends KafkaException and will, as you experienced,
end up in the KafkaException catch block that tries to abort the transaction.
I get that people might want to handle timeouts differently, and it's
tricky since they can also be thrown from abortTransaction and have to be
handled in the same way (ie you can only retry the same API or close the
producer). But the example is very misleading and should absolutely be
updated to include explicit TimeoutException handling in my opinion.

Would you be interested in doing a PR to fix the Producer javadocs and
improve the exception handling example?
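
To sketch what explicit TimeoutException handling could look like: the example below uses small stand-in types instead of the real client classes so that it runs on its own. KafkaException and TimeoutException here only mirror the subclass relationship described above, and TxnProducer, commitOrAbort and maxCommitAttempts are hypothetical names. Fatal exceptions (fencing, authorization) and a timeout thrown by abortTransaction itself are deliberately left out.

```java
final class CommitRetrySketch {

    // Stand-ins mirroring the relationship described above:
    // TimeoutException extends KafkaException.
    static class KafkaException extends RuntimeException {
        KafkaException(String message) {
            super(message);
        }
    }

    static class TimeoutException extends KafkaException {
        TimeoutException(String message) {
            super(message);
        }
    }

    // Hypothetical minimal view of a transactional producer.
    interface TxnProducer {
        void commitTransaction();

        void abortTransaction();

        void close();
    }

    /** Returns true if the commit succeeded, false if it was aborted or the producer was closed. */
    static boolean commitOrAbort(TxnProducer producer, int maxCommitAttempts) {
        for (int attempt = 1; attempt <= maxCommitAttempts; attempt++) {
            try {
                producer.commitTransaction();
                return true;
            } catch (TimeoutException e) {
                // The commit may still be in flight on the broker, so the only
                // legal follow-up is to retry the same call (next loop iteration).
            } catch (KafkaException e) {
                // Any other error: aborting is a valid next operation.
                producer.abortTransaction();
                return false;
            }
        }
        // Retries exhausted: per the javadoc, the only remaining option is to close.
        producer.close();
        return false;
    }
}
```

The important detail is the order of the catch blocks: the TimeoutException block must come before the KafkaException block, otherwise the timeout falls through to the abort path and triggers the IllegalStateException from KAFKA-16221.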

On Thu, May 2, 2024 at 4:22 AM  wrote:

> Hi,
>
> Thanks for your work and sorry to bother you.
>
> My code gets the same IllegalStateException from KafkaProducer as Kafka
> Streams gets in KAFKA-16221:
>
> java.lang.IllegalStateException: Cannot attempt operation
> `abortTransaction` because the previous call to `commitTransaction` timed
> out and must be retried
> at
> org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1109)
> at
> org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:266)
> at
> org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:835)
>
> I have followed the recipe from
> https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
> for error handling for a transactional producer, i.e.
>
>  try {
>      producer.beginTransaction();
>      for (int i = 0; i < 100; i++)
>          producer.send(new ProducerRecord<>("my-topic",
>              Integer.toString(i), Integer.toString(i)));
>      producer.commitTransaction();
>  } catch (ProducerFencedException | OutOfOrderSequenceException |
>           AuthorizationException e) {
>      // We can't recover from these exceptions, so our only option is to
>      // close the producer and exit.
>      producer.close();
>  } catch (KafkaException e) {
>      // For all other exceptions, just abort the transaction and try again.
>      producer.abortTransaction();
>  }
>  producer.close();
>
> Kafka Streams has solved KAFKA-16221 by introducing a hack (
> https://github.com/apache/kafka/pull/15315), but plans a clean solution.
>
> Does that mean that above recipe is outdated?
> Is there really no simple, clean solution how to do the error handling?
> Should I use the solution from https://github.com/apache/kafka/pull/15315
> and wait for what Kafka Streams comes up next for the clean solution?
>
> Kind regards, Matthias Kraaz
>
>


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2866

2024-05-02 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-16659) KafkaConsumer#position() does not respect wakup when group protocol is CONSUMER

2024-05-02 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16659:
--

 Summary: KafkaConsumer#position() does not respect wakup when 
group protocol is CONSUMER
 Key: KAFKA-16659
 URL: https://issues.apache.org/jira/browse/KAFKA-16659
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


see following test


{code:scala}
  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  @Timeout(15)
  def testPositionRespectsWakeup(quorum: String, groupProtocol: String): Unit = {
    val topicPartition = new TopicPartition(topic, 15)
    val consumer = createConsumer()
    consumer.assign(List(topicPartition).asJava)

    CompletableFuture.runAsync { () =>
      TimeUnit.SECONDS.sleep(1)
      consumer.wakeup()
    }

    assertThrows(classOf[WakeupException], () =>
      consumer.position(topicPartition, Duration.ofSeconds(3)))
  }
{code}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-05-02 Thread Sophie Blee-Goldman
Sounds good -- exceptionOrigin makes sense to me.

Let us know when you've updated the KIP. I'll cast a vote once these last
small changes have been made
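
A rough sketch of how the single-accessor shape could look from the caller's side. The class below is a stand-in written for this example, not the real RecordDeserializationException, and the enum name DeserializationExceptionOrigin and the accessor name exceptionOrigin() are assumptions based only on the field name suggested in this thread.

```java
final class DeserializationOriginSketch {

    // Hypothetical enum name; the thread only settles on "exceptionOrigin" for the field.
    enum DeserializationExceptionOrigin { KEY, VALUE }

    // Stand-in for the real exception; only the proposed accessor is modeled.
    static class RecordDeserializationException extends RuntimeException {
        private final DeserializationExceptionOrigin origin;

        RecordDeserializationException(DeserializationExceptionOrigin origin,
                                       String message,
                                       Throwable cause) {
            super(message, cause);
            this.origin = origin;
        }

        DeserializationExceptionOrigin exceptionOrigin() {
            return origin;
        }
    }

    // One catch site, one switch: no "instance of" checks on subclasses.
    static String describe(RecordDeserializationException e) {
        switch (e.exceptionOrigin()) {
            case KEY:
                return "bad key: " + e.getMessage();
            case VALUE:
                return "bad value: " + e.getMessage();
            default:
                throw new AssertionError("unexpected origin " + e.exceptionOrigin());
        }
    }
}
```

Callers that do not care about the distinction can ignore the accessor entirely and handle the exception as before.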

On Thu, May 2, 2024 at 10:16 AM Frédérik Rouleau
 wrote:

> Hi Sophie,
>
> I agree that the subclasses have limited value and I am not a fan of
> "instance of" usage either.
> I do not see any problem with adding a field but I would rather name it
> something like exceptionOrigin. Any thoughts?
>
> About ByteBuffer vs byte[], ByteBuffer is more generic and with proper
> doc/example, I do not think it's an issue. I will then remove the byte[]
> returning methods.
>
> Thanks,
>
>
> Frederik Rouleau
> Sr Customer Success Technical Architect
>
>
> On Tue, Apr 30, 2024 at 10:54 PM Sophie Blee-Goldman <
> sop...@responsive.dev>
> wrote:
>
> > Actually one more thing, after thinking a bit about how this would be
> used
> > in practice, I'm inclined to agree that maybe distinguishing between key
> vs
> > value errors via subclasses is not the cleanest way to present the API.
> > Users would still essentially want to catch the general
> > RecordDeserializationException error since in practice, much of the
> > handling is likely to be the same between key and value errors. So then
> to
> > distinguish between these, they would end up doing an "instance of" check
> > on the exception type. Which feels like an awkward way to answer a
> question
> > that could have just been a simple API on the
> > RecordDeserializationException itself. What do you think about getting
> rid
> > of the subclasses and instead adding one more API to the
> > RecordDeserializationException that indicates whether it was a key or
> value
> > error?
> >
> > This could return a simple boolean and be called #isKeyError or
> something,
> > but that feels kind of awkward too. Maybe a better alternative would be
> > something like this:
> >
> > class RecordDeserializationException {
> > enum DeserializationExceptionType {
> > KEY,
> > VALUE
> > }
> >
> >   public DeserializationExceptionType exceptionType();
> > }
> >
> > I also worry that people don't always check for exception subtypes and
> > would easily miss the existence of the KeyDeserializationException and
> > ValueDeserializationException. Simply adding an API to the
> > RecordDeserializationException will make it much easier for users to
> notice
> > and react accordingly, if they care to do something different based on
> > whether the error happened during key or value deserialization.
> >
> > Thoughts?
> >
> > On Tue, Apr 30, 2024 at 1:45 PM Sophie Blee-Goldman <
> sop...@responsive.dev
> > >
> > wrote:
> >
> > > Hey Fred, I think this is looking good but I just want to follow up on
> > > what Kirk asked earlier about having both the ByteBuffer and byte[]
> > forms.
> > > Can't users just use the ByteBuffer versions and convert them into a
> > byte[]
> > > themselves? In some cases it maybe makes sense to offer some additional
> > > APIs if there is complex processing involved in converting between
> > returned
> > > types, but ByteBuffer --> byte[] seems pretty straightforward to me :)
> > >
> > > Generally speaking we try to keep the APIs as tight as possible and
> offer
> > > only what is necessary, and I'd rather leave off "syntactic sugar" type
> > > APIs unless there is a clear need. Put another way: it's easy to add
> > > additional methods if someone wants them, but it's much harder to
> remove
> > > methods since we have to go through a deprecation cycle. So I'd prefer
> to
> > > just keep only the ByteBuffer versions (or only the byte[] -- don't
> > > personally care which of the two)
> > >
> > > One more small nit: since we're deprecating the old exception
> > constructor,
> > > can you list that in the "Compatibility, Deprecation, and Migration
> Plan"
> > > section?
> > >
> > >
> > >
> > > On Wed, Apr 24, 2024 at 5:35 AM Frédérik Rouleau
> > >  wrote:
> > >
> > >> Hi,
> > >>
> > >> I have updated the KIP now and the latest version of PR is available.
> > >>
> > >> About Kirk's questions:
> > >>
> > >> K11: Yes, both can have a deserialization exception but we deserialize
> > the
> > >> key first, so if an error occurs then, we do not try to deserialize
> the
> > >> value. So the exception raised is always for key or value.
> > >>
> > >> K12: Not sure of concrete usage for now, just a sugar feature. I
> suppose
> > >> we
> > >> 

Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-05-02 Thread Frédérik Rouleau
Hi Sophie,

I agree that the subclasses have limited value and I am not a fan of
"instance of" usage either.
I do not see any problem with adding a field but I would rather name it
something like exceptionOrigin. Any thoughts?

About ByteBuffer vs byte[], ByteBuffer is more generic and with proper
doc/example, I do not think it's an issue. I will then remove the byte[]
returning methods.
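
As an example of the kind of doc snippet that could accompany the ByteBuffer-only API, here is a small sketch of the conversion users would do themselves. The helper names toByteArray and firstBytes are hypothetical; only standard java.nio calls are used.

```java
import java.nio.ByteBuffer;

final class ByteBufferSketch {

    // Copies the remaining bytes into a new array without moving the caller's position.
    static byte[] toByteArray(ByteBuffer buffer) {
        if (buffer == null) {
            return null;
        }
        ByteBuffer view = buffer.duplicate(); // independent position and limit, shared content
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    // Reads at most n leading bytes (for example a small prefix of the payload)
    // without allocating an array for the whole record.
    static byte[] firstBytes(ByteBuffer buffer, int n) {
        ByteBuffer view = buffer.duplicate();
        byte[] out = new byte[Math.min(n, view.remaining())];
        view.get(out);
        return out;
    }
}
```

Working on a duplicate() keeps the original buffer's position untouched, so the same exception data can be inspected more than once.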

Thanks,


Frederik Rouleau
Sr Customer Success Technical Architect

On Tue, Apr 30, 2024 at 10:54 PM Sophie Blee-Goldman 
wrote:

> Actually one more thing, after thinking a bit about how this would be used
> in practice, I'm inclined to agree that maybe distinguishing between key vs
> value errors via subclasses is not the cleanest way to present the API.
> Users would still essentially want to catch the general
> RecordDeserializationException error since in practice, much of the
> handling is likely to be the same between key and value errors. So then to
> distinguish between these, they would end up doing an "instance of" check
> on the exception type. Which feels like an awkward way to answer a question
> that could have just been a simple API on the
> RecordDeserializationException itself. What do you think about getting rid
> of the subclasses and instead adding one more API to the
> RecordDeserializationException that indicates whether it was a key or value
> error?
>
> This could return a simple boolean and be called #isKeyError or something,
> but that feels kind of awkward too. Maybe a better alternative would be
> something like this:
>
> class RecordDeserializationException {
> enum DeserializationExceptionType {
> KEY,
> VALUE
> }
>
>   public DeserializationExceptionType exceptionType();
> }
>
> I also worry that people don't always check for exception subtypes and
> would easily miss the existence of the KeyDeserializationException and
> ValueDeserializationException. Simply adding an API to the
> RecordDeserializationException will make it much easier for users to notice
> and react accordingly, if they care to do something different based on
> whether the error happened during key or value deserialization.
>
> Thoughts?
>
> On Tue, Apr 30, 2024 at 1:45 PM Sophie Blee-Goldman  >
> wrote:
>
> > Hey Fred, I think this is looking good but I just want to follow up on
> > what Kirk asked earlier about having both the ByteBuffer and byte[]
> forms.
> > Can't users just use the ByteBuffer versions and convert them into a
> byte[]
> > themselves? In some cases it maybe makes sense to offer some additional
> > APIs if there is complex processing involved in converting between
> returned
> > types, but ByteBuffer --> byte[] seems pretty straightforward to me :)
> >
> > Generally speaking we try to keep the APIs as tight as possible and offer
> > only what is necessary, and I'd rather leave off "syntactic sugar" type
> > APIs unless there is a clear need. Put another way: it's easy to add
> > additional methods if someone wants them, but it's much harder to remove
> > methods since we have to go through a deprecation cycle. So I'd prefer to
> > just keep only the ByteBuffer versions (or only the byte[] -- don't
> > personally care which of the two)
> >
> > One more small nit: since we're deprecating the old exception
> constructor,
> > can you list that in the "Compatibility, Deprecation, and Migration Plan"
> > section?
> >
> >
> >
> > On Wed, Apr 24, 2024 at 5:35 AM Frédérik Rouleau
> >  wrote:
> >
> >> Hi,
> >>
> >> I have updated the KIP now and the latest version of PR is available.
> >>
> >> About Kirk's questions:
> >>
> >> K11: Yes, both can have a deserialization exception but we deserialize
> the
> >> key first, so if an error occurs then, we do not try to deserialize the
> >> value. So the exception raised is always for key or value.
> >>
> >> K12: Not sure of concrete usage for now, just a sugar feature. I suppose
> >> we
> >> can imagine some use case where you need/want only the first bytes and
> do
> >> not want to waste memory allocating the whole payload (SchemaRegistry's
> >> schema Id or something similar).
> >>
> >> K13: The old constructor is not needed anymore. It is just for
> >> compatibility until removed in a major version. As public we might have
> >> some users using it even if I cannot see any valid reason for this.
> >>
> >> Thanks,
> >> Fred
> >>
> >
>


Re: [DISCUSS] KIP-932: Queues for Kafka

2024-05-02 Thread Jun Rao
Hi, Andrew,

Thanks for the reply.

147. " it makes a judgement about whether an assignment received is equal
to what it already is using."
If a client receives an assignment different from what it has, it indicates
the end of the rebalance. But how does the client know when the rebalance
starts? In the shareHeartbeat design, the new group epoch is propagated
together with the new assignment in the response.

150. It could be a potential concern if each GC change forces significant
assignment changes. Does the default assignor take existing assignments
into consideration?

155. Good point. Sounds good.

158. My concern with the current naming is that it's not clear what the
difference is between ReadShareGroupOffsetsState and ReadShareGroupState.
The state in the latter is also offsets.

Jun

On Wed, May 1, 2024 at 9:51 PM Andrew Schofield 
wrote:

> Hi Jun,
> Thanks for your reply.
>
> 147. Perhaps the easiest is to take a look at the code in
> o.a.k.clients.consumer.internals.MembershipManagerImpl.
> This class is part of the new consumer group protocol
> code in the client. It makes state transitions based on
> the heartbeat requests and responses, and it makes a
> judgement about whether an assignment received is
> equal to what it already is using. When a state transition
> is deemed to be the beginning or end of a rebalance
> from the point of view of this client, it counts towards the
> rebalance metrics.
>
> Share groups will follow the same path.
>
> 150. I do not consider it a concern. Rebalancing a share group
> is less disruptive than rebalancing a consumer group. If the assignor
> has information about existing assignments, it can use it. It is
> true that this information cannot be replayed from a topic and will
> sometimes be unknown as a result.
>
> 151. I don’t want to rename TopicPartitionsMetadata to
> simply TopicPartitions (it’s information about the partitions of
> a topic) because we then have an array of plurals.
> I’ve renamed Metadata to Info. That’s a bit less cumbersome.
>
> 152. Fixed.
>
> 153. It’s the GC. Fixed.
>
> 154. The UNKNOWN “state” is essentially a default for situations where
> the code cannot understand data it received. For example, let’s say that
> Kafka 4.0 has groups with states EMPTY, STABLE, DEAD. If Kafka 4.1
> introduced another state THINKING, a tool built with Kafka 4.0 would not
> know what THINKING meant. It will use “UNKNOWN” to indicate that the
> state was something that it could not understand.
>
> 155. No, it’s at the level of the share-partition. If the offsets for just
> one share-partition are reset, only the state epoch for that partition is
> updated.
>
> 156. Strictly speaking, it’s redundant. I think having the StartOffset
> separate gives helpful clarity and I prefer to retain it.
>
> 157. Yes, you are right. There’s no reason why a leader change needs
> to force a ShareSnapshot. I’ve added leaderEpoch to the ShareUpdate.
>
> 158. Although ReadShareGroupOffsetsState is a bit of a mouthful,
> having “State” in the name makes it clear that this is one of the family of
> inter-broker RPCs served by the share coordinator. The admin RPCs
> such as DescribeShareGroupOffsets do not include “State”.
>
> 159. Fixed.
>
> 160. Fixed.
>
> Thanks,
> Andrew
>
> > On 2 May 2024, at 00:29, Jun Rao  wrote:
> >
> > Hi, Andrew,
> >
> > Thanks for the reply.
> >
> > 147. "The measurement is certainly from the point of view of the client,
> > but it’s driven by sending and receiving heartbeats rather than whether
> > the client triggered the rebalance itself."
> > Hmm, how does a client know which heartbeat response starts a rebalance?
> >
> > 150. PartitionAssignor takes existing assignments into consideration.
> > Since GC doesn't persist the assignment for share groups, it means that
> > ShareGroupPartitionAssignor can't reliably depend on existing
> > assignments. Is that a concern?
> >
> > 151. ShareGroupPartitionMetadataValue: Should we rename
> > TopicPartitionsMetadata and TopicMetadata since there is no metadata?
> >
> > 152. ShareGroupMetadataKey: "versions": "3"
> > The versions should be 11.
> >
> > 153. ShareGroupDescription.coordinator(): The description says "The share
> > group coordinator". Is that the GC or SC?
> >
> > 154. "A share group has only three states - EMPTY , STABLE and DEAD".
> >  What about UNKNOWN?
> >
> > 155. WriteShareGroupState: StateEpoch is at the group level, not
> > partition level, right?
> >
> > 156. ShareSnapshotValue: Is StartOffset redundant since it's the same as
> > the smallest FirstOffset in StateBatches?
> >
> > 157. Every leader change forces a ShareSnapshotValue write to persist the
> > new leader epoch. Is that a concern? An alternative is to include
> > leaderEpoch in ShareUpdateValue.
> >
> > 158. ReadShareGroupOffsetsState: The state is the offsets. Should we
> > rename it to something like ReadShareGroupStartOffset?
> >
> > 159. members are assigned members round-robin => members are assigned
> > 

[PR] MINOR: Add code signing key for Igor [kafka-site]

2024-05-02 Thread via GitHub


soarez opened a new pull request, #600:
URL: https://github.com/apache/kafka-site/pull/600

   For the first PMC reviewing this, could you please update the KEYS file in 
https://dist.apache.org/repos/dist/release/kafka/KEYS and leave a comment in 
this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #2865

2024-05-02 Thread Apache Jenkins Server
See 




[DISCUSS] Proposal about support for wildcard when creating new acls

2024-05-02 Thread Murali Basani
Hello,

I'd like to propose an extension to the resource patterns in Kafka ACLs.

Currently, when adding new ACLs in Kafka, we have two types of resource
patterns for topics:

   - LITERAL
   - PREFIXED

However, when it comes to listing or removing ACLs, we have a couple more
options:

   - MATCH
   - ANY (will match any pattern type)


If we could also allow the 'MATCH' pattern type when creating ACLs, it would
be very beneficial. Even though this kind of ACL should be created with
utmost care, it would help organizations streamline their ACL management
processes.

Example scenarios :

Let's say we need to create ACLs for the following six topics:
nl-accounts-localtopic, nl-accounts-remotetopic, de-accounts-localtopic,
de-accounts-remotetopic, cz-accounts-localtopic, cz-accounts-remotetopic

Currently, we achieve this using existing functionality by creating three
prefixed ACLs as shown below:

kafka-acls --bootstrap-server localhost:9092 \
  --add \
  --allow-principal User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown \
  --producer \
  --topic nl-accounts- \
  --resource-pattern-type prefixed


kafka-acls --bootstrap-server localhost:9092 \
  --add \
  --allow-principal User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown \
  --producer \
  --topic de-accounts- \
  --resource-pattern-type prefixed


kafka-acls --bootstrap-server localhost:9092 \
  --add \
  --allow-principal User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown \
  --producer \
  --topic cz-accounts- \
  --resource-pattern-type prefixed


However, if we had the 'MATCH' pattern type available, we could accomplish
this with a single ACL, as illustrated here:

kafka-acls --bootstrap-server localhost:9092 \
  --add \
  --allow-principal User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown \
  --producer \
  --topic '*-accounts-*' \
  --resource-pattern-type match



This pattern closely resembles PREFIXED but offers broader allow/deny rules.

Implementing this change could significantly reduce the effort in several
acl management processes.
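
To make the intended semantics concrete, here is a minimal sketch of how such
a pattern could be evaluated, assuming that 'MATCH' would treat `*` as "any
run of characters" and everything else as literal text. That reading is an
assumption about this proposal, not existing Kafka behaviour, and
`WildcardTopicMatch`, `toRegex` and `matches` are hypothetical names that do
not exist in Kafka.

```java
import java.util.regex.Pattern;

// Hypothetical illustration only: not part of Kafka's ACL implementation.
class WildcardTopicMatch {

    // Turn a pattern such as "*-accounts-*" into a regex. Each '*' becomes
    // ".*"; every other character is quoted so that '.' or '-' stay literal.
    static Pattern toRegex(String wildcard) {
        String[] parts = wildcard.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(Pattern.quote(parts[i]));
        }
        return Pattern.compile(regex.toString());
    }

    // True if the whole topic name matches the wildcard pattern.
    static boolean matches(String wildcard, String topic) {
        return toRegex(wildcard).matcher(topic).matches();
    }

    public static void main(String[] args) {
        String[] topics = {
            "nl-accounts-localtopic", "nl-accounts-remotetopic",
            "de-accounts-localtopic", "de-accounts-remotetopic",
            "cz-accounts-localtopic", "cz-accounts-remotetopic",
            "nl-payments-localtopic" // not an accounts topic
        };
        for (String topic : topics) {
            System.out.println(topic + " -> " + matches("*-accounts-*", topic));
        }
    }
}
```

Under that assumption the six account topics above match and an unrelated
topic such as nl-payments-localtopic does not, which also shows why such an
ACL needs care: any future topic containing "-accounts-" would be covered too.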

I welcome your thoughts and any concerns you may have regarding this
proposal.

Thanks,
Murali


Re: [DISCUSS] KIP-989: RocksDB Iterator Metrics

2024-05-02 Thread Lucas Brutschy
Hi Nick!

I agree, the age variant is a bit nicer since the semantics are very
clear from the name. If you'd rather go for the simple implementation,
how about calling it `oldest-iterator-open-since-ms`? I believe this
could be understood without docs. Either way, I think we should be
able to open the vote for this KIP because nobody raised any major /
blocking concerns.

Looking forward to getting this voted on soon!

Cheers
Lucas

On Sun, Mar 31, 2024 at 5:23 PM Nick Telford  wrote:
>
> Hi Matthias,
>
> > For the oldest iterator metric, I would propose something simple like
> > `iterator-opened-ms` and it would just be the actual timestamp when the
> > iterator was opened. I don't think we need to compute the actual age,
> > but users can do this computation themselves?
>
> That works for me; it's easier to implement like that :-D I'm a little
> concerned that the name "iterator-opened-ms" may not be obvious enough
> without reading the docs.
>
> > If we think reporting the age instead of just the timestamp is better, I
> > would propose `iterator-max-age-ms`. It should be sufficient to call out
> > (as it's kinda "obvious" anyway) that the metric applies to open
> > iterator only.
>
> While I think it's preferable to record the timestamp, rather than the age,
> this does have the benefit of a more obvious metric name.
>
> > Nit: the KIP says it's a store-level metric, but I think it would be
> > good to say explicitly that it's recorded with DEBUG level only?
>
> Yes, I've already updated the KIP with this information in the table.
>
> Regards,
>
> Nick
>
> On Sun, 31 Mar 2024 at 10:53, Matthias J. Sax  wrote:
>
> > The time window thing was just an idea. Happy to drop it.
> >
> > For the oldest iterator metric, I would propose something simple like
> > `iterator-opened-ms` and it would just be the actual timestamp when the
> > iterator was opened. I don't think we need to compute the actual age,
> > but users can do this computation themselves?
> >
> > If we think reporting the age instead of just the timestamp is better, I
> > would propose `iterator-max-age-ms`. It should be sufficient to call out
> > (as it's kinda "obvious" anyway) that the metric applies to open
> > iterator only.
> >
> > And yes, I was hoping that the code inside MeteredXxxStore might already
> > be setup in a way that custom stores would inherit the iterator metrics
> > automatically -- I am just not sure, and left it as an exercise for
> > somebody to confirm :)
> >
> >
> > Nit: the KIP says it's a store-level metric, but I think it would be
> > good to say explicitly that it's recorded with DEBUG level only?
> >
> >
> >
> > -Matthias
> >
> >
> > On 3/28/24 2:52 PM, Nick Telford wrote:
> > > Quick addendum:
> > >
> > > My suggested metric "oldest-open-iterator-age-seconds" should be
> > > "oldest-open-iterator-age-ms". Milliseconds is obviously a better
> > > granularity for such a metric.
> > >
> > > Still accepting suggestions for a better name.
> > >
> > > On Thu, 28 Mar 2024 at 13:41, Nick Telford 
> > wrote:
> > >
> > >> Hi everyone,
> > >>
> > >> Sorry for leaving this for so long. So much for "3 weeks until KIP
> > >> freeze"!
> > >>
> > >> On Sophie's comments:
> > >> 1. Would Matthias's suggestion of a separate metric tracking the age of
> > >> the oldest open iterator (within the tag set) satisfy this? That way we
> > >> can keep iterator-duration-(avg|max) for closed iterators, which can be
> > >> useful for performance debugging for iterators that don't leak. I'm not
> > >> sure what we'd call this metric, maybe:
> > >> "oldest-open-iterator-age-seconds"? Seems like a mouthful.
> > >>
> > >> 2. You're right, it makes more sense to provide
> > >> iterator-duration-(avg|max). Honestly, I can't remember why I had
> > >> "total" before, or why I was computing a rate-of-change over it.
> > >>
> > >> 3, 4, 5, 6. Agreed, I'll make all those changes as suggested.
> > >>
> > >> 7. Combined with Matthias's point about RocksDB, I'm convinced that this
> > >> is the wrong KIP for these. I'll introduce the additional Rocks metrics
> > >> in another KIP.
> > >>
> > >> On Matthias's comments:
> > >> A. Not sure about the time window. I'm pretty sure all existing avg/max
> > >> metrics are since the application was started? Any other suggestions
> > >> here would be appreciated.
> > >>
> > >> B. Agreed. See point 1 above.
> > >>
> > >> C. Good point. My focus was very much on Rocks memory leaks when I wrote
> > >> the first draft. I can generalise it. My only concern is that it might
> > >> make it more difficult to detect Rocks iterator leaks caused *within* our
> > >> high-level iterator, e.g. RocksJNI, RocksDB, RocksDBStore, etc. But we
> > >> could always provide a RocksDB-specific metric for this, as you
> > >> suggested.
> > >>
> > >> D. Hmm, we do already have MeteredKeyValueIterator, which automatically
> > >> wraps the iterator from inner-stores of MeteredKeyValueStore. If we
> > >> implemented these 

Suggestion about support for wildcard when creating new acls

2024-05-02 Thread Murali Basani
Hello,

I'd like to propose an extension to the resource patterns in Kafka ACLs.

Currently, when adding new ACLs in Kafka, we have two types of resource
patterns for topics:

   - LITERAL
   - PREFIXED

However, when it comes to listing or removing ACLs, we have a couple more
options:

   - MATCH
   - ANY (will match any pattern type)


If we could also allow the 'MATCH' pattern type when creating ACLs, it would
be very beneficial. Even though this kind of ACL should be created with
utmost care, it would help organizations streamline their ACL management
processes.

Example scenarios :

Let's say we need to create ACLs for the following six topics:
nl-accounts-localtopic, nl-accounts-remotetopic, de-accounts-localtopic,
de-accounts-remotetopic, cz-accounts-localtopic, cz-accounts-remotetopic

Currently, we achieve this using existing functionality by creating three
prefixed ACLs as shown below:

kafka-acls --bootstrap-server localhost:9092 \
  --add \
  --allow-principal User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown \
  --producer \
  --topic nl-accounts- \
  --resource-pattern-type prefixed


kafka-acls --bootstrap-server localhost:9092 \
  --add \
  --allow-principal User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown \
  --producer \
  --topic de-accounts- \
  --resource-pattern-type prefixed


kafka-acls --bootstrap-server localhost:9092 \
  --add \
  --allow-principal User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown \
  --producer \
  --topic cz-accounts- \
  --resource-pattern-type prefixed


However, if we had the 'MATCH' pattern type available, we could accomplish
this with a single ACL, as illustrated here:

kafka-acls --bootstrap-server localhost:9092 \
  --add \
  --allow-principal User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown \
  --producer \
  --topic '*-accounts-*' \
  --resource-pattern-type match



This pattern closely resembles PREFIXED but offers broader allow/deny rules.

Implementing this change could significantly reduce the effort in several
acl management processes.

I welcome your thoughts and any concerns you may have regarding this
proposal.

Thanks,
Murali


[DISCUSS] KIP-1041: Drop `offsets.commit.required.acks` config in 4.0 (deprecate in 3.8)

2024-05-02 Thread David Jacot
Hi folks,

I have put together a very small KIP to
deprecate offsets.commit.required.acks in 3.8 and remove it in 4.0. See the
motivation for the reason.

KIP: https://cwiki.apache.org/confluence/x/9YobEg

Please let me know what you think.

Best,
David


[jira] [Created] (KAFKA-16658) Drop `offsets.commit.required.acks` config in 4.0 (deprecate in 3.8)

2024-05-02 Thread David Jacot (Jira)
David Jacot created KAFKA-16658:
---

 Summary: Drop `offsets.commit.required.acks` config in 4.0 
(deprecate in 3.8)
 Key: KAFKA-16658
 URL: https://issues.apache.org/jira/browse/KAFKA-16658
 Project: Kafka
  Issue Type: New Feature
Reporter: David Jacot
Assignee: David Jacot






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-477: Add PATCH method for connector config in Connect REST API

2024-05-02 Thread Ivan Yurchenko
Hello all,

Thank you for your votes!

I think we have reached a decision here to accept the KIP with the following 
votes:

- 3x binding +1s from Chris Egerton, Yash Mayya, and Greg Harris.
- 1x non-binding +1 from Knowles Atchison Jr.
- No -1s.

I will proceed with the implementation. Feel free to track the jira [1] and the 
(now still in draft) PR [2].

Best,
Ivan

[1] https://issues.apache.org/jira/browse/KAFKA-16445
[2] https://github.com/apache/kafka/pull/6934

On Wed, May 1, 2024, at 20:24, Greg Harris wrote:
> Hi Ivan,
> 
> Thank you for the KIP!
> I think PATCH using the same return type as PUT benefits the clients,
> even though the "created" field will always be false.
> 
> +1 (binding)
> 
> Thanks,
> Greg
> 
> On Fri, Apr 12, 2024 at 4:31 AM Yash Mayya  wrote:
> >
> > Hi Ivan,
> >
> > Thanks for reviving this KIP, I think it will be a useful addition to
> > Connect!
> >
> > +1 (binding)
> >
> > Cheers,
> > Yash
> >
> > On Tue, Apr 9, 2024 at 4:23 AM Knowles Atchison Jr 
> > wrote:
> >
> > > +1 (non binding)
> > >
> > > On Mon, Apr 8, 2024, 3:30 PM Chris Egerton 
> > > wrote:
> > >
> > > > Thanks Ivan! +1 (binding) from me.
> > > >
> > > > On Mon, Apr 8, 2024, 06:59 Ivan Yurchenko  wrote:
> > > >
> > > > > Hello!
> > > > >
> > > > > I'd like to put the subj KIP[1] to a vote. Thank you.
> > > > >
> > > > > Best regards,
> > > > > Ivan
> > > > >
> > > > > [1]
> > > > >
> > > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-477%3A+Add+PATCH+method+for+connector+config+in+Connect+REST+API
> > > > >
> > > >
> > >
> 


[jira] [Created] (KAFKA-16657) KIP-848 does not work well on Zookeeper Mode

2024-05-02 Thread sanghyeok An (Jira)
sanghyeok An created KAFKA-16657:


 Summary: KIP-848 does not work well on Zookeeper Mode
 Key: KAFKA-16657
 URL: https://issues.apache.org/jira/browse/KAFKA-16657
 Project: Kafka
  Issue Type: Bug
Reporter: sanghyeok An


Hi, Kafka Team.

I am testing the new rebalance protocol of KIP-848. It seems that the KIP-848 
protocol works well in KRaft mode. However, KIP-848 protocol does not work well 
in `Zookeeper` mode. 

 

I have created two versions of docker-compose files for Zookeeper Mode and 
KRaft Mode. And I tested KIP-848 using the same consumer code and settings.

 

In KRaft Mode, the consumer received the assignment correctly. However, an 
error occurred in Zookeeper Mode.

 

*Is KIP-848 supported in Zookeeper mode, or is only KRaft mode supported?*

 

FYI, This is the code I used.
 * ZK docker-compose: 
https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose2/docker-compose.yaml
 * ZK Result: 
https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose2/README.md
 * KRaft docker-compose:  
https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose3/docker-compose.yaml
 * KRaft Result: 
https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose3/README.md
 * Consumer code: 
https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java

 

 

 





KAFKA-16221

2024-05-02 Thread matthias . kraaz
Hi,

Thanks for your work and sorry to bother you.

My code gets the same IllegalStateException from KafkaProducer as Kafka Streams
gets in KAFKA-16221:

java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` because the previous call to `commitTransaction` timed out and must be retried
    at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1109)
    at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:266)
    at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:835)

I have followed the recipe from 
https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
for error handling for a transactional producer, i.e.

    try {
        producer.beginTransaction();
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // We can't recover from these exceptions, so our only option is to close the producer and exit.
        producer.close();
    } catch (KafkaException e) {
        // For all other exceptions, just abort the transaction and try again.
        producer.abortTransaction();
    }
    producer.close();

Kafka Streams has solved KAFKA-16221 by introducing a hack 
(https://github.com/apache/kafka/pull/15315), but plans a clean solution.

Does that mean that the above recipe is outdated?
Is there really no simple, clean way to do the error handling?
Should I use the solution from https://github.com/apache/kafka/pull/15315 and
wait to see what Kafka Streams comes up with as the clean solution?

Kind regards, Matthias Kraaz
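
For reference, the reply to this message on the list (Re: KAFKA-16221)
suggests adding a separate catch block for TimeoutException that retries
commitTransaction instead of falling through to abortTransaction. The sketch
below only illustrates that control flow. To stay self-contained it uses two
hypothetical stand-ins, `TxProducer` for the producer and
`CommitTimeoutException` for org.apache.kafka.common.errors.TimeoutException,
and the retry limit is an arbitrary assumption; it is not the Kafka Streams
fix.

```java
// Hypothetical illustration of "retry the commit on timeout, never abort".
class CommitRetrySketch {

    /** Stand-in for the single KafkaProducer call used below (assumption). */
    interface TxProducer {
        void commitTransaction();
    }

    /** Stand-in for Kafka's TimeoutException (assumption). */
    static class CommitTimeoutException extends RuntimeException {
        CommitTimeoutException(String message) {
            super(message);
        }
    }

    /**
     * Calls commitTransaction() until it succeeds or maxAttempts is reached
     * and returns the number of attempts used. After a timeout the outcome of
     * the commit is unknown, so the only operation attempted is the same
     * commit again; abortTransaction() is never called on this path.
     */
    static int commitWithRetry(TxProducer producer, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        CommitTimeoutException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                producer.commitTransaction();
                return attempt;
            } catch (CommitTimeoutException e) {
                last = e; // outcome unknown: retry the same call
            }
        }
        // Retries exhausted. What happens next (for example closing the
        // producer) is left to the application.
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fake producer that times out twice and then succeeds.
        TxProducer flaky = () -> {
            if (++calls[0] < 3) {
                throw new CommitTimeoutException("simulated timeout");
            }
        };
        System.out.println("committed after " + commitWithRetry(flaky, 5) + " attempts");
    }
}
```

In real code the timeout branch would sit next to the existing catch blocks of
the recipe above, ahead of the generic KafkaException handler, so that a
timed-out commit no longer reaches abortTransaction().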



[jira] [Resolved] (KAFKA-10804) Tune travis system tests to avoid timeouts

2024-05-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-10804.

Resolution: Won't Fix

> Tune travis system tests to avoid timeouts
> --
>
> Key: KAFKA-10804
> URL: https://issues.apache.org/jira/browse/KAFKA-10804
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> Thanks to https://github.com/apache/kafka/pull/9652, we are now running 
> system tests for PRs. However, it looks like we need some tuning because many 
> of the subsets are timing out. For example: 
> https://travis-ci.com/github/apache/kafka/jobs/453241933. This might just be 
> a matter of adding more subsets or changing the timeout, but we should 
> probably also consider whether we want to run all system tests or if there is 
> a more useful subset of them.





[jira] [Resolved] (KAFKA-10809) Make all system tests stably run on either Travis CI or local

2024-05-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-10809.

Resolution: Fixed

> Make all system tests stably run on either Travis CI or local
> -
>
> Key: KAFKA-10809
> URL: https://issues.apache.org/jira/browse/KAFKA-10809
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> It can encourage us to write more system tests if Travis CI can run a subset 
> of system tests automatically.
> This initial work is https://github.com/apache/kafka/pull/9652





[jira] [Created] (KAFKA-16656) Using a custom replication.policy.separator with DefaultReplicationPolicy

2024-05-02 Thread Lenin Joseph (Jira)
Lenin Joseph created KAFKA-16656:


 Summary: Using a custom replication.policy.separator with 
DefaultReplicationPolicy
 Key: KAFKA-16656
 URL: https://issues.apache.org/jira/browse/KAFKA-16656
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 3.5.1
Reporter: Lenin Joseph


Hi,

In the case of bidirectional replication using mm2, when we tried using a 
custom replication.policy.separator( ex: "-") with DefaultReplicationPolicy , 
we see cyclic replication of topics. Could you confirm whether it's mandatory 
to use a CustomReplicationPolicy whenever we want to use a separator other than 
a "." ?

Regards, 
Lenin





[jira] [Resolved] (KAFKA-10290) fix flaky core/compatibility_test_new_broker_test.py

2024-05-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-10290.

Resolution: Won't Fix

> fix flaky core/compatibility_test_new_broker_test.py
> 
>
> Key: KAFKA-10290
> URL: https://issues.apache.org/jira/browse/KAFKA-10290
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, system tests
>Reporter: Chia-Ping Tsai
>Priority: Major
>
> {quote}
> Module: kafkatest.tests.core.compatibility_test_new_broker_test
> Class:  ClientCompatibilityTestNewBroker
> Method: test_compatibility
> Arguments:
> {
>   "compression_types": [
> "none"
>   ],
>   "consumer_version": "1.0.2",
>   "producer_version": "1.0.2",
>   "timestamp_type": "CreateTime"
> }
> {quote}





[jira] [Resolved] (KAFKA-14874) Unable to create > 5000 topics for once when using Kraft

2024-05-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-14874.

Resolution: Won't Fix

> Unable to create > 5000 topics for once when using Kraft
> 
>
> Key: KAFKA-14874
> URL: https://issues.apache.org/jira/browse/KAFKA-14874
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> the error happens due to 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java#L779]
> I encountered this error when creating >5000 topics for mirroring the cluster 
> from zk to Kraft. The operation of creating a bunch of topics is allowed by 
> zk-based kafka.
> It seems to me there are two improvements for this issue.
> 1) add more precise error message for such case.
> 2) make `maxRecordsPerBatch` configurable (there is already a setter 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java#L272])





[jira] [Resolved] (KAFKA-10805) More useful reporting from travis system tests

2024-05-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-10805.

Resolution: Won't Fix

> More useful reporting from travis system tests
> --
>
> Key: KAFKA-10805
> URL: https://issues.apache.org/jira/browse/KAFKA-10805
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Priority: Major
>
> Inspecting system test output from travis is a bit painful at the moment 
> because you have to check the build logs to find the tests that failed. 
> Additionally, there is no logging available from the workers which is often 
> essential to debug a failure. We should look into how we can improve the 
> build so that the output is more convenient and useful.





[jira] [Resolved] (KAFKA-10288) fix flaky client/client_compatibility_features_test.py

2024-05-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-10288.

Resolution: Won't Fix

> fix flaky client/client_compatibility_features_test.py
> --
>
> Key: KAFKA-10288
> URL: https://issues.apache.org/jira/browse/KAFKA-10288
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, system tests
>Reporter: Chia-Ping Tsai
>Priority: Major
>
> {quote}
> Module: kafkatest.tests.client.client_compatibility_features_test
> Class:  ClientCompatibilityFeaturesTest
> Method: run_compatibility_test
> Arguments:
> {
>   "broker_version": "0.10.0.1"
> }
> {quote}





[jira] [Resolved] (KAFKA-10291) fix flaky tools/log4j_appender_test.py

2024-05-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-10291.

Resolution: Won't Fix

> fix flaky tools/log4j_appender_test.py
> --
>
> Key: KAFKA-10291
> URL: https://issues.apache.org/jira/browse/KAFKA-10291
> Project: Kafka
>  Issue Type: Sub-task
>  Components: system tests, tools
>Reporter: Chia-Ping Tsai
>Priority: Major
>
> {quote}
> Module: kafkatest.tests.tools.log4j_appender_test
> Class:  Log4jAppenderTest
> Method: test_log4j_appender
> Arguments:
> {
>   "security_protocol": "SSL"
> }
> {quote}





[jira] [Resolved] (KAFKA-10822) Force some stdout from system tests for Travis

2024-05-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-10822.

Resolution: Won't Fix

> Force some stdout from system tests for Travis
> --
>
> Key: KAFKA-10822
> URL: https://issues.apache.org/jira/browse/KAFKA-10822
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> downgrade_test.py/upgrade_test.py does upgrade/downgrade for each tests. the 
> upgrade/downgrade tasks take 10+ mins in Travis env so we ought to print 
> something in order to avoid timeout caused by Travis.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-10696) Replace ProduceResponse.PartitionResponse by auto-generated PartitionProduceResponse

2024-05-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-10696.

Resolution: Won't Fix

> Replace ProduceResponse.PartitionResponse by auto-generated 
> PartitionProduceResponse
> 
>
> Key: KAFKA-10696
> URL: https://issues.apache.org/jira/browse/KAFKA-10696
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> This is a follow-up of KAFKA-9628.
> related discussion: 
> https://github.com/apache/kafka/pull/9401#discussion_r518976605





[jira] [Resolved] (KAFKA-7014) Guarantee the byte buffer returned by Serializer is reusable

2024-05-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-7014.
---
Resolution: Won't Fix

> Guarantee the byte buffer returned by Serializer is reusable
> 
>
> Key: KAFKA-7014
> URL: https://issues.apache.org/jira/browse/KAFKA-7014
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Priority: Minor
>
> The byte array returned by Serializer is copied to another ByteBuffer by 
> KafkaProducer so the byte array is reusable actually. If kafka can guarantee 
> this behavior (perhaps doc it on the Serializer), user can design the 
> size-fixed message and then reuse the byte array.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-6992) Configuring the batch.size to zero won't disable the batching entirely

2024-05-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-6992.
---
Resolution: Won't Fix

> Configuring the batch.size to zero won't disable the batching entirely
> --
>
> Key: KAFKA-6992
> URL: https://issues.apache.org/jira/browse/KAFKA-6992
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: Chia-Ping Tsai
>Priority: Minor
>
> In MAGIC_VALUE_V2, the varint is used and the estimated size won't be equal 
> with the actual size. So it has chance to accept more records in a single 
> produce batch.
> Perhaps we should revise the docs of batch.size.
> {code:java}
> public static final String BATCH_SIZE_CONFIG = "batch.size";
> private static final String BATCH_SIZE_DOC = "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent"
>         + " to the same partition. This helps performance on both the client and the server. This configuration controls the "
>         + "default batch size in bytes. "
>         + ""
>         + "No attempt will be made to batch records larger than this size. "
>         + ""
>         + "Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. "
>         + ""
>         + "A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable "
>         + "batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a "
>         + "buffer of the specified batch size in anticipation of additional records.";{code}
>  
>  





Re: [DISCUSS] KIP-936 Throttle number of active PIDs

2024-05-02 Thread Claude Warren, Jr
There is some question about whether or not we need the configuration
options.  My take on them is as follows:

   - producer.id.quota.window.num: No opinion.  I don't know what this is
     used for, but I suspect that there is a good reason to have it.  It is
     not used within the Bloom filter caching mechanism.
   - producer.id.quota.window.size.seconds: Leave it, as it is one of the
     most effective ways to tune the filter and determines how long a PID is
     recognized.
   - producer.id.quota.cache.cleanup.scheduler.interval.ms: Remove it unless
     there is another use for it.  We can get a better calculation for
     internals.
   - producer.id.quota.cache.layer.count: Leave it, as it is one of the most
     effective ways to tune the filter.
   - producer.id.quota.cache.false.positive.rate: Replace it with a constant.
     I don't think any other Bloom filter solution provides access to this
     knob for end users.
   - producer_ids_rate: Leave this one; it is critical for reasonable
     operation.
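
As a rough illustration of how the window size and the layer count interact,
the sketch below divides the window evenly across the layers, which is how
the fragments are described elsewhere in this thread (4 layers giving
15-minute fragments). The one-hour window is an assumption inferred from
those figures, and `LayerFragmentSketch` is a hypothetical name, not part of
the KIP code.

```java
import java.time.Duration;

// Hypothetical illustration: how long each layer ("fragment") of the window lasts.
class LayerFragmentSketch {

    // Time span covered by one layer when the window is split evenly.
    static Duration fragment(Duration window, int layers) {
        if (layers < 1) {
            throw new IllegalArgumentException("layers must be >= 1");
        }
        return window.dividedBy(layers);
    }

    public static void main(String[] args) {
        // Assumed value for producer.id.quota.window.size.seconds: one hour.
        Duration window = Duration.ofSeconds(3600);
        for (int layers : new int[] {4, 10, 60}) {
            System.out.println(layers + " layers -> "
                    + fragment(window, layers).toMinutes() + " minute fragments");
        }
    }
}
```

With that assumed window, 4, 10 and 60 layers give fragments of 15, 6 and 1
minutes, so raising the layer count shrinks the slice of history that is
forgotten at once.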


Re: [DISCUSS] KIP-936 Throttle number of active PIDs

2024-05-02 Thread Claude Warren, Jr
Quick note:  I renamed the example code.  It is now at
https://github.com/Claudenw/kafka/blob/KIP-936/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManagerCache.java


Re: [DISCUSS] KIP-936 Throttle number of active PIDs

2024-05-02 Thread Claude Warren, Jr
Igor,  thanks for taking the time to look and to review the code.  I regret
that I have not pushed the latest code, but I will do so and will see what
I can do about answering your Bloom filter related questions here.

> How would an operator know or decide to change the configuration
> for the number of layers – producer.id.quota.cache.layer.count –
> e.g. increasing from 4 to 5; and why?
> Do we need a new metric to indicate that change could be useful?


In our usage the layered Bloom filter [6] retains the record of a PID for
producer.id.quota.window.size.seconds.  It breaks that window down into
multiple fragments, so with a one-hour window 4 layers = 15 minute
fragments.  It "forgets" a fragment's worth of data when the fragment has
been around for window.size.seconds.  The layer count determines how big a
chunk of time is deleted at once.  Changing the layers to 10 will yield 6
minute fragments, 60 will yield 1 minute fragments, and so on.  There are
other idiosyncrasies that I will get into later.  I would not set the
value lower than 3.  If you find that there are multiple reports of new
PIDs because on average they only ping every 50 minutes, it might make
sense to use more layers.  If you use too many layers then there will only
be one PID in each layer, and at that point a simple list of filters would
be faster to search, but in practice that does not make sense.  If you
have two layers then recurring PIDs will be recorded in both layers.
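
To make the fragment arithmetic concrete, here is a minimal sketch.  It is
not code from the KIP branch; the class and method names are made up for
illustration, and the one-hour window is an assumption taken from the
"4 layers = 15 minute fragments" example.

```java
import java.time.Duration;

// Hypothetical helper, for illustration only (not part of the KIP code).
final class LayerFragments {

    /** Length of one layer's time fragment: the window split evenly across the layers. */
    static Duration fragmentLength(Duration window, int layerCount) {
        if (layerCount <= 0) {
            throw new IllegalArgumentException("layerCount must be positive");
        }
        return window.dividedBy(layerCount);
    }

    public static void main(String[] args) {
        Duration window = Duration.ofHours(1); // assumed producer.id.quota.window.size.seconds = 3600
        for (int layers : new int[] {4, 10, 60}) {
            System.out.println(layers + " layers -> "
                    + fragmentLength(window, layers).toMinutes() + " minute fragments");
        }
    }
}
```

With the assumed one-hour window this gives 15, 6 and 1 minute fragments
for 4, 10 and 60 layers respectively.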

> Is producer.id.quota.cache.cleanup.scheduler.interval.ms a
> guaranteed interval, or rather simply a delay between cleanups?
> How did you decide on the default value of 10ms?


In the code this setting is not used.  Cleanups are amortized across
inserts to keep the layers balanced.  There is a thread that does a cleanup
every producer.id.quota.window.size.seconds /
producer.id.quota.cache.layer.count seconds to detect principals that are
no longer sending data.  This is a reasonable frequency as it aligns well
with when the layers actually expire.
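
As a rough illustration of that cleanup cadence, the sketch below derives
the period from the two settings and schedules a task with the standard
java.util.concurrent scheduler.  The class name, thread name and the
Runnable are hypothetical; the actual thread in the branch may be set up
quite differently.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not the KIP implementation.
final class CleanupScheduling {

    /** Cleanup period in seconds: window size divided by layer count, at least 1. */
    static long cleanupPeriodSeconds(long windowSizeSeconds, int layerCount) {
        if (windowSizeSeconds <= 0 || layerCount <= 0) {
            throw new IllegalArgumentException("window size and layer count must be positive");
        }
        return Math.max(1L, windowSizeSeconds / layerCount);
    }

    /** Starts a daemon thread that runs the given cleanup once per period. */
    static ScheduledExecutorService start(Runnable cleanup, long windowSizeSeconds, int layerCount) {
        long period = cleanupPeriodSeconds(windowSizeSeconds, layerCount);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "pid-quota-cleanup"); // assumed thread name
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleAtFixedRate(cleanup, period, period, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

For a 3600 second window and 4 layers the task would run every 900
seconds, which matches the point at which a layer becomes eligible to
expire.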

> Under "New ProducerIdQuotaManagerCache", the documentation for
> the constructor params for ProducerIDQuotaManagerCache does not
> match the constructor signature.


Sorry, this is because I did not push the changes.  The constructor is
ProducerIDQuotaManagerCache(Double falsePositiveRate, long ttl, int
layerCount), where falsePositiveRate is the Bloom filter false positive
rate, ttl is producer.id.quota.window.size.seconds in milliseconds, and
layerCount is the desired number of layers.

> Under "New ProducerIdQuotaManagerCache":
>   public boolean track(KafkaPrincipal principal, int producerIdRate, long
> pid)
> How is producerIdRate used? The reference implementation Claude shared
> does not use it.
>
> https://github.com/Claudenw/kafka/blob/49b6eb0fb5cfaf19b072fd87986072a683ab976c/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java


Again, sorry for not updating the code.  The producer rate is used to build
a Bloom filter of the proper size.  The producer rate is the number of PIDs
per hour expected to be created by the principal.  The Bloom filter shape
[1] is determined by the expected number of PIDs per layer (producerRate *
seconds_per_hour / producer.id.quota.window.size.seconds /
producer.id.quota.cache.layer.count) and the falsePositiveRate from the
constructor.  These values are used to call the Shape.fromNP() method.
This is the Shape of the Bloom filters in the layer.  It is only used when
the principal is not found in the cache.  Thomas Hurst has provided a web
page [5] where you can explore the interaction between number of items and
false positive rate.
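
For readers who want to see how the number of items and the false positive
rate interact, here is a minimal sketch using the textbook Bloom filter
sizing formulas.  It deliberately avoids the Commons Collections
dependency, so it is only an approximation of what Shape.fromNP() computes
and the rounding there may differ; the class and method names are
hypothetical, and the expected number of PIDs per layer is taken as an
input.

```java
// Hypothetical sketch of textbook Bloom filter sizing; not the library code.
final class BloomShapeSketch {

    /** Number of bits m for n expected items and false positive rate p: m = ceil(-n ln p / (ln 2)^2). */
    static long numberOfBits(int n, double p) {
        if (n <= 0 || p <= 0.0 || p >= 1.0) {
            throw new IllegalArgumentException("need n > 0 and 0 < p < 1");
        }
        double ln2 = Math.log(2);
        return (long) Math.ceil(-n * Math.log(p) / (ln2 * ln2));
    }

    /** Number of hash functions k for n items in m bits: k = round(m / n * ln 2), at least 1. */
    static int numberOfHashFunctions(int n, long m) {
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }

    public static void main(String[] args) {
        int expectedPidsPerLayer = 1000;  // assumed value, for illustration
        double falsePositiveRate = 0.01;  // assumed value, for illustration
        long bits = numberOfBits(expectedPidsPerLayer, falsePositiveRate);
        int hashes = numberOfHashFunctions(expectedPidsPerLayer, bits);
        System.out.println("bits=" + bits + " hashFunctions=" + hashes);
    }
}
```

With the assumed 1000 items per layer and a 1% false positive rate this
works out to 9586 bits (about 1.2 KB) and 7 hash functions per layer.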

> I could not find a description or definition for
> TimestampedBloomFilter, could we add that to the KIP?


I will add it.  It is simply an implementation of WrappedBloomFilter [2]
that adds the timestamp for when the filter was created.

> LayeredBloomFilter will have a fixed size (right?), but some
> users (KafkaPrincipal) might only use a small number of PIDs.
> Is it worth having a dual strategy, where we simply keep a Set of
> PIDs until we reach certain size where it pays off to use
> the LayeredBloomFilter?


Each principal has its own Layered Bloom filter.

Here come the idiosyncrasies and benefits of the layered Bloom filter.  The
layered Bloom filter can be thought of as a collection of Bloom filters
that are queried to see if the item being searched for (target) has been
seen.  There are a bunch of ways that the layered filter could be used.
You could have a layer for each storage location in a multi-location
storage engine, for example.  But in our case the layer signifies a
starting time fragment.  That fragment will be at most
producer.id.quota.window.size.seconds / producer.id.quota.cache.layer.count
seconds long.  The earliest layers are at the lower indices, the latest one
at the highest.
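
The time-fragment idea can be sketched as below.  This is one plausible
reading of the description above, not the code in the branch: each layer
is a plain HashSet standing in for a Bloom filter, all names are
hypothetical, and the caller supplies the current time so the behaviour is
easy to follow.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; a HashSet stands in for each layer's Bloom filter.
final class LayeredPidFilterSketch {

    private static final class Layer {
        final long createdMs;
        final Set<Long> pids = new HashSet<>();
        Layer(long createdMs) { this.createdMs = createdMs; }
    }

    private final Deque<Layer> layers = new ArrayDeque<>(); // oldest first, newest last
    private final long windowMs;
    private final long fragmentMs;

    LayeredPidFilterSketch(long windowMs, int layerCount) {
        if (windowMs <= 0 || layerCount <= 0) {
            throw new IllegalArgumentException("window and layer count must be positive");
        }
        this.windowMs = windowMs;
        this.fragmentMs = Math.max(1L, windowMs / layerCount);
    }

    /** Records the PID at time nowMs; returns true if no live layer had seen it. */
    boolean track(long pid, long nowMs) {
        // Forget whole layers once they have been around for the full window.
        while (!layers.isEmpty() && nowMs - layers.peekFirst().createdMs >= windowMs) {
            layers.removeFirst();
        }
        // Start a new layer when there is none or the newest has covered its fragment.
        if (layers.isEmpty() || nowMs - layers.peekLast().createdMs >= fragmentMs) {
            layers.addLast(new Layer(nowMs));
        }
        boolean seen = false;
        for (Layer layer : layers) {
            if (layer.pids.contains(pid)) {
                seen = true;
                break;
            }
        }
        // Always record in the newest layer so a recurring PID stays remembered.
        layers.peekLast().pids.add(pid);
        return !seen;
    }

    int liveLayers() {
        return layers.size();
    }
}
```

In this sketch a PID that keeps recurring is re-recorded in the newest
layer, so it only counts as new again after it has been silent long enough
for every layer that holds it to expire.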

In general, when an item is added to the Layered Bloom filter the following
processes take place:

   - old layers filters are removed using 

Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #2864

2024-05-02 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 351225 lines...]
[2024-05-02T07:37:14.138Z] > Task :connect:json:publishMavenJavaPublicationToMavenLocal
[2024-05-02T07:37:14.138Z] > Task :connect:json:publishToMavenLocal
[2024-05-02T07:37:14.138Z] > Task :server:compileTestJava
[2024-05-02T07:37:14.138Z] > Task :server:testClasses
[2024-05-02T07:37:15.241Z] > Task :server-common:compileTestJava
[2024-05-02T07:37:15.241Z] > Task :server-common:testClasses
[2024-05-02T07:37:17.328Z] > Task :raft:compileTestJava
[2024-05-02T07:37:17.328Z] > Task :raft:testClasses
[2024-05-02T07:37:20.456Z] 
[2024-05-02T07:37:20.456Z] > Task :clients:javadoc
[2024-05-02T07:37:20.456Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk/clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java:32: warning - Tag @see: missing final '>': "https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API;>KIP-554: Add Broker-side SCRAM Config API
[2024-05-02T07:37:20.456Z] 
[2024-05-02T07:37:20.456Z]  This code is duplicated in org.apache.kafka.common.security.scram.internals.ScramMechanism.
[2024-05-02T07:37:20.456Z]  The type field in both files must match and must not change. The type field
[2024-05-02T07:37:20.456Z]  is used both for passing ScramCredentialUpsertion and for the internal
[2024-05-02T07:37:20.456Z]  UserScramCredentialRecord. Do not change the type field."
[2024-05-02T07:37:20.456Z] 
[2024-05-02T07:37:20.456Z] > Task :core:compileScala
[2024-05-02T07:37:21.558Z] > Task :group-coordinator:compileTestJava
[2024-05-02T07:37:21.558Z] > Task :group-coordinator:testClasses
[2024-05-02T07:37:22.829Z] > Task :streams:generateMetadataFileForMavenJavaPublication
[2024-05-02T07:37:22.829Z] 
[2024-05-02T07:37:22.829Z] > Task :clients:javadoc
[2024-05-02T07:37:22.829Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/package-info.java:21: warning - Tag @link: reference not found: org.apache.kafka.common.security.oauthbearer
[2024-05-02T07:37:22.829Z] 2 warnings
[2024-05-02T07:37:22.829Z] 
[2024-05-02T07:37:22.829Z] > Task :clients:javadocJar
[2024-05-02T07:37:23.930Z] > Task :clients:srcJar
[2024-05-02T07:37:23.930Z] > Task :clients:testJar
[2024-05-02T07:37:23.930Z] > Task :metadata:compileTestJava
[2024-05-02T07:37:23.930Z] > Task :metadata:testClasses
[2024-05-02T07:37:25.030Z] > Task :clients:testSrcJar
[2024-05-02T07:37:25.030Z] > Task :clients:publishMavenJavaPublicationToMavenLocal
[2024-05-02T07:37:25.030Z] > Task :clients:publishToMavenLocal
[2024-05-02T07:37:25.030Z] > Task :connect:api:generateMetadataFileForMavenJavaPublication
[2024-05-02T07:37:25.030Z] > Task :connect:api:compileTestJava UP-TO-DATE
[2024-05-02T07:37:25.030Z] > Task :connect:api:testClasses UP-TO-DATE
[2024-05-02T07:37:25.030Z] > Task :connect:api:testJar
[2024-05-02T07:37:25.030Z] > Task :connect:api:testSrcJar
[2024-05-02T07:37:25.030Z] > Task :connect:api:publishMavenJavaPublicationToMavenLocal
[2024-05-02T07:37:25.030Z] > Task :connect:api:publishToMavenLocal
[2024-05-02T07:37:30.523Z] > Task :streams:javadoc
[2024-05-02T07:37:31.637Z] > Task :streams:javadocJar
[2024-05-02T07:37:31.637Z] > Task :streams:srcJar
[2024-05-02T07:37:31.637Z] > Task :streams:processTestResources UP-TO-DATE
[2024-05-02T07:38:16.430Z] > Task :core:classes
[2024-05-02T07:38:16.430Z] > Task :core:compileTestJava NO-SOURCE
[2024-05-02T07:38:45.091Z] > Task :core:compileTestScala
[2024-05-02T07:39:46.661Z] > Task :core:testClasses
[2024-05-02T07:40:11.333Z] > Task :streams:compileTestJava
[2024-05-02T07:41:34.398Z] > Task :streams:testClasses
[2024-05-02T07:41:34.398Z] > Task :streams:testJar
[2024-05-02T07:41:34.398Z] > Task :streams:testSrcJar
[2024-05-02T07:41:34.398Z] > Task :streams:publishMavenJavaPublicationToMavenLocal
[2024-05-02T07:41:34.398Z] > Task :streams:publishToMavenLocal
[2024-05-02T07:41:34.398Z] 
[2024-05-02T07:41:34.398Z] Deprecated Gradle features were used in this build, making it incompatible with Gradle 9.0.
[2024-05-02T07:41:34.398Z] 
[2024-05-02T07:41:34.398Z] You can use '--warning-mode all' to show the individual deprecation warnings and determine if they come from your own scripts or plugins.
[2024-05-02T07:41:34.398Z] 
[2024-05-02T07:41:34.398Z] For more on this, please refer to https://docs.gradle.org/8.7/userguide/command_line_interface.html#sec:command_line_warnings in the Gradle documentation.
[2024-05-02T07:41:34.398Z] 
[2024-05-02T07:41:34.398Z] BUILD SUCCESSFUL in 5m
[2024-05-02T07:41:34.398Z] 96 actionable tasks: 41 executed, 55 up-to-date
[2024-05-02T07:41:34.398Z] 
[2024-05-02T07:41:34.398Z] Publishing build scan...
[2024-05-02T07:41:34.398Z] https://ge.apache.org/s/ggn6d3f7ju4pc
[2024-05-02T07:41:34.398Z] 
[Pipeline] sh
[2024-05-02T07:41:37.279Z] + + grep ^version=