Re: [VOTE] KIP-932: Queues for Kafka

2024-05-07 Thread Satish Duggana
Hi Andrew,
Thanks for the nice KIP, it will allow other messaging use cases to be
onboarded to Kafka.

+1 from me.

Satish.

On Tue, 7 May 2024 at 03:41, Jun Rao  wrote:
>
> Hi, Andrew,
>
> Thanks for the KIP. +1
>
> Jun
>
> On Mon, Mar 18, 2024 at 11:00 AM Edoardo Comar 
> wrote:
>
> > Thanks Andrew,
> >
> > +1 (binding)
> >
> > Edo
> >
> > On Mon, 18 Mar 2024 at 16:32, Kenneth Eversole
> >  wrote:
> > >
> > > Hi Andrew
> > >
> > > + 1 (Non-Binding)
> > >
> > > This will be great addition to Kafka
> > >
> > > On Mon, Mar 18, 2024 at 8:27 AM Apoorv Mittal 
> > > wrote:
> > >
> > > > Hi Andrew,
> > > > Thanks for writing the KIP. This is indeed going to be a valuable
> > addition
> > > > to the Kafka, excited to see the KIP.
> > > >
> > > > + 1 (Non-Binding)
> > > >
> > > > Regards,
> > > > Apoorv Mittal
> > > > +44 7721681581
> > > >
> > > >
> > > > On Sun, Mar 17, 2024 at 11:16 PM Andrew Schofield <
> > > > andrew_schofield_j...@outlook.com> wrote:
> > > >
> > > > > Hi,
> > > > > I’ve been working to complete KIP-932 over the past few months and
> > > > > discussions have quietened down.
> > > > >
> > > > > I’d like to open the voting for KIP-932:
> > > > >
> > > > >
> > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
> > > > >
> > > > > Thanks,
> > > > > Andrew
> > > >
> >


Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-05-07 Thread Matthias J. Sax

Very interesting discussion.

Seems a central point is the question about "how generic" we approach 
this, and some people think we need to be conservative and others think 
we should try to be as generic as possible.


Personally, I think following a limited scope for this KIP (by 
explicitly saying we only cover RecordTooLarge and 
UnknownTopicOrPartition) might be better. We have concrete tickets that 
we address, while for other exception (like authorization) we don't know 
if people want to handle it to begin with. Boiling the ocean might not 
get us too far, and being somewhat pragmatic might help to move this KIP 
forward. -- I also agree with Justin and Artem, that we want to be 
careful anyway to not allow users to break stuff too easily.


As the same time, I agree that we should setup this change in a forward 
looking way, and thus having a single generic handler allows us to later 
extend the handler more easily. This should also simplify follow up KIP 
that might add new error cases (I actually mentioned one more to Alieh 
already, but we both agreed that it might be best to exclude it from the 
KIP right now, to make the 3.8 deadline. Doing a follow up KIP is not 
the end of the world.)




@Chris:

(2) This sounds fair to me, but not sure how "bad" it actually would be? 
If the contract is clearly defined, it seems to be fine what the KIP 
proposes, and given that such a handler is an expert API, and we can 
provide "best practices" (cf my other comment below in [AL1]), being a 
little bit pragmatic sound fine to me.


Not sure if I understand Justin's argument on this question?


(3) About having a default impl or not. I am fine with adding one, even 
if I am not sure why Connect could not just add its own one and plug it 
in (and we would add corresponding configs for Connect, but not for the 
producer itself)? For this case, we could also do this as a follow up 
KIP, but happy to include it in this KIP to provide value to Connect 
right away (even if the value might not come right away if we miss the 
3.8 deadline due to expanded KIP scope...) --  For KS, we would for sure 
plugin our own impl, and lock down the config such that users cannot set 
their own handler on the internal producer to begin with. Might be good 
to elaborate why the producer should have a default? We might actually 
want to add this to the KIP right away?


The key for a default impl would be, to not change the current behavior, 
and having no default seems to achieve this. For the two cases you 
mentioned, it's unclear to me what default value on "upper bound on 
retires" for UnkownTopicOrPartitionException we should set? Seems it 
would need to be the same as `delivery.timeout.ms`? However, if users 
have `delivery.timeout.ms` actually overwritten we would need to set 
this config somewhat "dynamic"? Is this feasible? If we hard-code 2 
minutes, it might not be backward compatible. I have the impression we 
might introduce some undesired coupling? -- For the "record too large" 
case, the config seems to be boolean and setting it to `false` by 
default seems to provide backward compatibility.




@Artem:

[AL1] While I see the point, I would think having a different callback 
for every exception might not really be elegant? In the end, the handler 
is an very advanced feature anyway, and if it's implemented in a bad 
way, well, it's a user error -- we cannot protect users from everything. 
To me, a handler like this, is to some extend "business logic" and if a 
user gets business logic wrong, it's hard to protect them. -- We would 
of course provide best practice guidance in the JaveDocs, and explain 
that a handler should have explicit `if` statements for stuff it want to 
handle, and only a single default which return FAIL.



[AL2] Yes, but for KS we would retry at the application layer. Ie, the 
TX is not completed, we clean up and setup out task from scratch, to 
ensure the pending transaction is completed before we resume. If the TX 
was indeed aborted, we would retry from older offset and thus just hit 
the same error again and the loop begins again.



[AL2 cont.] Similar to AL1, I see such a handler to some extend as 
business logic. If a user puts a bad filter condition in their KS app, 
and drops messages, it nothing we can do about it, and this handler 
IMHO, has a similar purpose. This is also the line of thinking I apply 
to EOS, to address Justin's concern about "should we allow to drop for 
EOS", and my answer is "yes", because it's more business logic than 
actual error handling IMHO. And by default, we fail... So users opt-in 
to add business logic to drop records. It's an application level 
decision how to write the code.



[AL3] Maybe I misunderstand what you are saying, but to me, checking the 
size of the record upfront is exactly what the KIP proposes? No?




@Justin:


I saw the sample
code -- is it just an if statement checking for the error before the
handler is invoked? That seems a bi

[jira] [Resolved] (KAFKA-16678) Remove unimplementedquorum from EndToEndAuthorizationTest

2024-05-07 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16678.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Remove unimplementedquorum from EndToEndAuthorizationTest
> -
>
> Key: KAFKA-16678
> URL: https://issues.apache.org/jira/browse/KAFKA-16678
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Minor
> Fix For: 3.8.0
>
>
> `unimplementedquorum`[0] is used to skip test cases if they don't support to 
> run by kraft. However, KAFKA-15219 , KAFKA-14765 and KAFKA-14776 make related 
> tests support to run by kraft.
> In short, it is time to remove the unused variable :)
> [0] 
> [https://github.com/apache/kafka/blob/d76352e2151178521dc447e3406dabb8fcd4c57c/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala#L146]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-05-07 Thread Matthias J. Sax

@Loic, yes, what you describe is exactly what I had in mind.



@Sophie, can you elaborate a little bit?


First of all, I agree that it makes sense to maintain the two separate
callbacks for the ProductionExceptionHandler, since one of them is
specifically for serialization exceptions while the other is used for
everything/anything else.


What makes a serialization exception special compare to other errors 
that it's valuable to treat it differently? Why can we put "everything 
else" into a single bucket? By your train of though, should we not split 
out the "everything else" bucket into a different callback method for 
every different error? If no, why not, but only for serialization errors?


From what I believe to remember, historically, we added the 
ProductionExceptionHandler, and kinda just missed the serialization 
error case. And later, when we extended the handler we just could not 
re-use the existing callback as it was typed with `` and 
it would have been an incompatible change; so it was rather a workaround 
to add the second method to then handler, but not really intended design?



It's of course only my personal opinion that I believe a single callback 
method is simpler/cleaner compared to sticking with two, and adding the 
new exception type to make it backward compatible seems worth it. It 
also kinda introduces the same patter we use elsewhere (cf KIP-1036) 
what I actually think is an argument for introducing 
`RercordSerializationExcetpion`, to unify user experience across the board.


Would be great to hear from others about this point. It's not that I 
strongly object to having two methods, and I would not block this KIP on 
this question.




-Matthias


On 5/7/24 3:40 PM, Sophie Blee-Goldman wrote:

First of all, I agree that it makes sense to maintain the two separate
callbacks for the ProductionExceptionHandler, since one of them is
specifically for serialization exceptions while the other is used for
everything/anything else. I also think we can take advantage of this fact
to simplify things a bit and cut down on the amount of new stuff added to
the API by just adding a parameter to the #handleSerializationException
callback and use that to pass in the SerializationExceptionOrigin enum to
distinguish between key vs value. This way we wouldn't need to introduce
yet another exception type (the RecordSerializationException) just to pass
in this information.

Thoughts?

On Tue, May 7, 2024 at 8:33 AM Loic Greffier 
wrote:


Hi Matthias,

To sum up with the ProductionExceptionHandler callback methods (106)
proposed changes.

A new method ProductionExceptionHandler#handle is added with the following
signature:


ProductionExceptionHandlerResponse handle(final ErrorHandlerContext

context, final ProducerRecord record, final Exception exception);

The ProducerRecord parameter has changed to accept a serialized or
non-serialized record.
Thus, the new ProductionExceptionHandler#handle method can handle both
production exception and serialization exception.

Both old ProductionExceptionHandler#handle and
ProductionExceptionHandler#handleSerializationException methods are now
deprecated.
The old ProductionExceptionHandler#handle method gets a default
implementation, so users do not have to implement a deprecated method.

To handle backward compatibility, the new
ProductionExceptionHandler#handle method gets a default implementation.


default ProductionExceptionHandlerResponse handle(final

ErrorHandlerContext context, final ProducerRecord record, final
Exception exception) {

   if (exception instanceof RecordSerializationException) {
   this.handleSerializationException(record, exception.getCause());
   }

   return handle((ProducerRecord) record, exception);
}


The default implementation either invokes #handleSerializationException or
#handle depending on the type of the exception, thus users still relying on
deprecated ProductionExceptionHandler#handle
or ProductionExceptionHandler#handleSerializationException custom
implementations won't break.

The new ProductionExceptionHandler#handle method is now invoked in case of
serialization exception:


public  void send(final String topic, final K key, final V value,

...) {

 try {
 keyBytes = keySerializer.serialize(topic, headers, key);
 ...
 } catch (final ClassCastException exception) {
   ...
 } catch (final Exception exception) {

 try {
 response = productionExceptionHandler.handle(context,

record, new RecordSerializationException(SerializationExceptionOrigin.KEY,
exception));

 } catch (final Exception e) {
 ...
 }
 }
}


To wrap the origin serialization exception and determine whether it comes
from the key or the value, a new exception class is created:


public class RecordSerializationException extends SerializationException

{

 public enum SerializationExceptionOrigin {
 KEY,
 VALUE
 }

 public RecordSerializatio

[jira] [Created] (KAFKA-16691) Support for nested structures: TimestampConverter

2024-05-07 Thread Jorge Esteban Quilcate Otoya (Jira)
Jorge Esteban Quilcate Otoya created KAFKA-16691:


 Summary: Support for nested structures: TimestampConverter
 Key: KAFKA-16691
 URL: https://issues.apache.org/jira/browse/KAFKA-16691
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jorge Esteban Quilcate Otoya






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16690) Support for nested structures: HeaderFrom

2024-05-07 Thread Jorge Esteban Quilcate Otoya (Jira)
Jorge Esteban Quilcate Otoya created KAFKA-16690:


 Summary: Support for nested structures: HeaderFrom
 Key: KAFKA-16690
 URL: https://issues.apache.org/jira/browse/KAFKA-16690
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jorge Esteban Quilcate Otoya






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14226) Introduce support for nested structures

2024-05-07 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya resolved KAFKA-14226.
--
Resolution: Fixed

Merged: https://github.com/apache/kafka/pull/15379

> Introduce support for nested structures
> ---
>
> Key: KAFKA-14226
> URL: https://issues.apache.org/jira/browse/KAFKA-14226
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>
> Abstraction for FieldPath and initial SMTs:
>  * ExtractField
>  * HeaderFrom
>  * TimestampConverter



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16689) Move LogValidatorTest to storage module

2024-05-07 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16689:
--

 Summary: Move LogValidatorTest to storage module
 Key: KAFKA-16689
 URL: https://issues.apache.org/jira/browse/KAFKA-16689
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


`LogValidator` is moved to storage module already but its unit test is still in 
core module. That is a bit weird. We ought to rewrite it by java and then move 
it to storage module.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-05-07 Thread Artem Livshits
Hi Alieh,

Thanks for the KIP.  The motivation talks about very specific cases, but
the interface is generic.

[AL1]
If the interface evolves in the future I think we could have the following
confusion:

1. A user implemented SWALLOW action for both RecordTooLargeException and
UnknownTopicOrPartitionException.  For simpicity they just return SWALLOW
from the function, because it elegantly handles all known cases.
2. The interface has evolved to support a new exception.
3. The user has upgraded their Kafka client.

Now a new kind of error gets dropped on the floor without user's intention
and it would be super hard to detect and debug.

To avoid the confusion, I think we should use handlers for specific
exceptions.  Then if a new exception is added it won't get silently swalled
because the user would need to add new functionality to handle it.

I also have some higher level comments:

[AL2]
> it throws a TimeoutException, and the user can only blindly retry, which
may result in an infinite retry loop

If the TimeoutException happens during transactional processing (exactly
once is the desired sematnics), then the client should not retry when it
gets TimeoutException because without knowing the reason for
TimeoutExceptions, the client cannot know whether the message got actually
produced or not and retrying the message may result in duplicatees.

> The thrown TimeoutException "cuts" the connection to the underlying root
cause of missing metadata

Maybe we should fix the error handling and return the proper underlying
message?  Then the application can properly handle the message based on
preferences.

>From the product perspective, it's not clear how safe it is to blindly
ignore UnknownTopicOrPartitionException.  This could lead to situations
when a simple typo could lead to massive data loss (part of the data would
effectively be produced to a "black hole" and the user may not notice it
for a while).

In which situations would you recommend the user to "black hole" messages
in case of misconfiguration?

[AL3]

> If the custom handler decides on SWALLOW for RecordTooLargeException,

Is it my understanding that this KIP proposes that functionality that would
only be able to SWALLOW RecordTooLargeException that happen because the
producer cannot produce the record (if the broker rejects the batch, the
error won't get to the handler, because we cannot know which other records
get ignored).  In this case, why not just check the locally configured max
record size upfront and not produce the recrord in the first place?  Maybe
we can expose a validation function from the producer that could validate
the records locally, so we don't need to produce the record in order to
know that it's invalid.

-Artem

On Tue, May 7, 2024 at 2:07 PM Justine Olshan 
wrote:

> Alieh and Chris,
>
> Thanks for clarifying 1) but I saw the motivation. I guess I just didn't
> understand how that would be ensured on the producer side. I saw the sample
> code -- is it just an if statement checking for the error before the
> handler is invoked? That seems a bit fragile.
>
> Can you clarify what you mean by `since the code does not reach the KS
> interface and breaks somewhere in producer.` If we surfaced this error to
> the application in a better way would that also be a solution to the issue?
>
> Justine
>
> On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi 
> wrote:
>
> > Hi,
> >
> >
> > Thank you, Chris and Justine, for the feedback.
> >
> >
> > @Chris
> >
> > 1) Flexibility: it has two meanings. The first meaning is the one you
> > mentioned. We are going to cover more exceptions in the future, but as
> > Justine mentioned, we must be very conservative about adding more
> > exceptions. Additionally, flexibility mainly means that the user is able
> to
> > develop their own code. As mentioned in the motivation section and the
> > examples, sometimes the user decides on dropping a record based on the
> > topic, for example.
> >
> >
> > 2) Defining two separate methods for retriable and non-retriable
> > exceptions: although the idea is brilliant, the user may still make a
> > mistake by implementing the wrong method and see a non-expecting
> behaviour.
> > For example, he may implement handleRetriable() for
> RecordTooLargeException
> > and define SWALLOW for the exception, but in practice, he sees no change
> in
> > default behaviour since he implemented the wrong method. I think we can
> > never reduce the user’s mistakes to 0.
> >
> >
> >
> > 3) Default implementation for Handler: the default behaviour is already
> > preserved with NO need of implementing any handler or setting the
> > corresponding config parameter `custom.exception.handler`. What you mean
> is
> > actually having a second default, which requires having both interface
> and
> > config parameters. About UnknownTopicOrPartitionException: the producer
> > already offers the config parameter `max.block.ms` which determines the
> > duration of retrying. The main purpose of the us

RE: DISCUSS KIP-984 Add pluggable compression interface to Kafka

2024-05-07 Thread Diop, Assane
Hi Greg, 

Thank you for your thoughtful response.

Our motivation here is to accelerate compression with the use of hardware 
accelerators. 

If the community prefers, we would be happy to contribute code to support 
compression accelerators, but we believe that introducing a pluggable 
compression framework   is more scalable than enabling new compression 
algorithms in an ad hoc manner.

A pluggable compression interface would enable hardware accelerators without 
requiring vendor-specific code in Kafka code base.

We aim to ensure robustness by supporting all possible language-clients. In 
this latest iteration, this design provides a path to support other languages 
where each client has its own topic holding the plugin information for that 
language.

The pluggable interface does not replace the built-in functionally, rather, it 
is an optional compression path seamlessly added for Kafka users who would like 
to use custom compression algorithms or simply accelerate current algorithms. 
In this latter case, a vendor providing acceleration for compression will need 
to support their plugins.

As far as your concerns, I appreciate you taking the time to respond. Let me 
address them the best I can: 
1) When an operator adds a plugin to a cluster, they must ensure that the 
compression algorithms for all the supported language-clients of that plugin 
are compatible . For the plugin to be installed, the language must support 
dynamic loading or linking of libraries and these mechanisms exist in at least 
Java, C, Go and Python. Clients written in a language that does not support 
dynamic loading or linking can still use built-in codecs and coexist in a 
cluster where plugins were registered. This coexistence highlights that the use 
of plugins is an optional feature. 

2) Plugins source should come from a reputable developer. This is true of any 
dependencies. Should an operator register a plugin, the plugin should have a 
path for support including deprecation of such plugin. If the community finds 
it useful, there could be an official Kafka repository and we are open to 
discussing ways to provide governance of the plugin ecosystem. 

3) We do not see this as a fork of the binary protocol, but rather an evolution 
of the protocol to provide additional flexibility for compression. 
Once a plugin is registered, it is compatible with all the “flavors” of the 
plugins which here means different minor versions of a codec. Compression 
algorithms typically follow semantic versioning where v1.x is compatible with 
v1.y and where v2.x is not necessarily compatible with v1.x. 
If a plugin version breaks compatibility with an older version, then it should 
be treated as a new plugin with a new plugin alias. 
In parallel to the plugin topic holding plugin information during registration, 
additional topics holding the plugin binaries can be published by the plugin 
admin tool during installation to ensure compatibility. We view this as 
improving performance at the cost of extra operator work.

4) We only require the operator to register and then install the plugins. 
During the registration  process,  the plugin admin tool  takes in a plugin 
information (plugin alias and classname/library) and then  internally assigns 
the pluginID.  The operator is only responsible for providing the plugin alias 
and the className/library. The plugin admin tool is a new Java class in Tools 
that interacts with the operator to setup the plugins in the cluster. At this 
stage of the KIP, we have assumed a manual installation of the plugin. 
Installing here means the deployment of the plugin binary making it ready to be 
dynamically loaded/linked when needed. 
We are looking at an option for dynamic installation of the plugin which would 
require the operator to install the binary using the plugin admin tool. Using 
the same concept as plugin registration, the operator can install the plugin 
binary by publishing it to a topic using the plugin admin tool. Clients that 
register a plugin by consuming the plugin list would also consume the necessary 
binaries from the cluster. 

5) When a plugin is used, the set of built-in codecs is augmented by the set of 
plugins described in the plugin topic. The additional set of codecs is 
cluster-dependent, so, while a given batch of records stays within a cluster, 
they remain self-contained. If these batches are produced into another cluster, 
then the operator needs to either recompress data using builtins/available 
plugins or install plugins in the dependent cluster. In this scenario a 
consumer would decompress the data, and, if the mirrored data needs the same 
compression plugin, then the operator is required to register and install the 
plugins in the secondary cluster.  
Our assertion is that the additional work required by an operator could be 
justified by improved performance.

6) There is a finite number of pluginID available based on the number of bits 
used in the attrib

Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2881

2024-05-07 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-16688) SystemTimer leaks resources on close

2024-05-07 Thread Gaurav Narula (Jira)
Gaurav Narula created KAFKA-16688:
-

 Summary: SystemTimer leaks resources on close
 Key: KAFKA-16688
 URL: https://issues.apache.org/jira/browse/KAFKA-16688
 Project: Kafka
  Issue Type: Test
Affects Versions: 3.8.0
Reporter: Gaurav Narula
Assignee: Gaurav Narula


We observe some thread leaks with thread name {{executor-client-metrics}}.

This may happen because {{SystemTimer}} doesn't attempt to shutdown its 
executor service properly.

Refer: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15885/1/tests
 and tests with {{initializationError}} in them for stacktrace



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-924: customizable task assignment for Streams

2024-05-07 Thread Rohan Desai
Thanks all! KIP-924 is accepted with 4 +1 (binding) votes from Sophia
Blee-Goldman, Matthias Sax, Bruno Cadonna, and Lucas Brutschy

On Tue, Apr 16, 2024 at 1:46 PM Rohan Desai  wrote:

>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams
>
> As this KIP has been open for a while, and gone through a couple rounds of
> review/revision, I'm calling a vote to get it approved.
>


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-05-07 Thread Sophie Blee-Goldman
First of all, I agree that it makes sense to maintain the two separate
callbacks for the ProductionExceptionHandler, since one of them is
specifically for serialization exceptions while the other is used for
everything/anything else. I also think we can take advantage of this fact
to simplify things a bit and cut down on the amount of new stuff added to
the API by just adding a parameter to the #handleSerializationException
callback and use that to pass in the SerializationExceptionOrigin enum to
distinguish between key vs value. This way we wouldn't need to introduce
yet another exception type (the RecordSerializationException) just to pass
in this information.

Thoughts?

On Tue, May 7, 2024 at 8:33 AM Loic Greffier 
wrote:

> Hi Matthias,
>
> To sum up with the ProductionExceptionHandler callback methods (106)
> proposed changes.
>
> A new method ProductionExceptionHandler#handle is added with the following
> signature:
>
> > ProductionExceptionHandlerResponse handle(final ErrorHandlerContext
> context, final ProducerRecord record, final Exception exception);
>
> The ProducerRecord parameter has changed to accept a serialized or
> non-serialized record.
> Thus, the new ProductionExceptionHandler#handle method can handle both
> production exception and serialization exception.
>
> Both old ProductionExceptionHandler#handle and
> ProductionExceptionHandler#handleSerializationException methods are now
> deprecated.
> The old ProductionExceptionHandler#handle method gets a default
> implementation, so users do not have to implement a deprecated method.
>
> To handle backward compatibility, the new
> ProductionExceptionHandler#handle method gets a default implementation.
>
> > default ProductionExceptionHandlerResponse handle(final
> ErrorHandlerContext context, final ProducerRecord record, final
> Exception exception) {
> >   if (exception instanceof RecordSerializationException) {
> >   this.handleSerializationException(record, exception.getCause());
> >   }
> >
> >   return handle((ProducerRecord) record, exception);
> > }
>
> The default implementation either invokes #handleSerializationException or
> #handle depending on the type of the exception, thus users still relying on
> deprecated ProductionExceptionHandler#handle
> or ProductionExceptionHandler#handleSerializationException custom
> implementations won't break.
>
> The new ProductionExceptionHandler#handle method is now invoked in case of
> serialization exception:
>
> > public  void send(final String topic, final K key, final V value,
> ...) {
> > try {
> > keyBytes = keySerializer.serialize(topic, headers, key);
> > ...
> > } catch (final ClassCastException exception) {
> >   ...
> > } catch (final Exception exception) {
> >
> > try {
> > response = productionExceptionHandler.handle(context,
> record, new RecordSerializationException(SerializationExceptionOrigin.KEY,
> exception));
> > } catch (final Exception e) {
> > ...
> > }
> > }
> > }
>
> To wrap the origin serialization exception and determine whether it comes
> from the key or the value, a new exception class is created:
>
> > public class RecordSerializationException extends SerializationException
> {
> > public enum SerializationExceptionOrigin {
> > KEY,
> > VALUE
> > }
> >
> > public RecordSerializationException(SerializationExceptionOrigin
> origin, final Throwable cause);
> > }
>
> Hope it all makes sense,
> Loïc
>


Re: Request version not enabled errors upgrading from Kafka 3.5 -> 3.6

2024-05-07 Thread Justine Olshan
Hi Johnson,

Thanks for bringing this issue to the mailing list.

I'm familiar with the change you are referring to. However, during the
upgrade you should be hitting this code path and we should not sending
requests to older version brokers.
https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130
https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195

Even if we did, we shouldn't return network exception errors.

Do you mind opening a JIRA ticket with some more details so I can take a
closer look?

Justine

On Tue, May 7, 2024 at 11:38 AM Johnson Okorie  wrote:

> Hi folks,
>
> Awesome work you have been doing on this project!
>
> I was hoping I could get some help on an issue we are having in one of our
> Kafka clusters. Most of the clients on this cluster use
> exactly-once-semantics. The Kafka cluster currently runs version 3.5.2 and
> we were attempting an upgrade to 3.6.2. After replacing one of the brokers
> with the new version we saw a bunch of the following errors on the older
> brokers:
>
> ```
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not
> enabled
> ```
>
> This manifested as 'NETWORK_EXCEPTION' errors on the clients and downtime
> for those clients. On the new broker we saw:
>
> ```
> [AddPartitionsToTxnSenderThread-1063]: AddPartitionsToTxnRequest failed for
> node 1069 with a network exception.
> ```
>
> Digging through the changes in 3.6, we came across some changes introduced
> as part of KAFKA-14402 
> that
> we thought might lead to this behaviour and wanted to confirm.
>
> First we could see that  transaction.partition.verification.enable
> is enabled by default and enables a new code path that culminates in we
> sending version 4 ADD_PARTITIONS_TO_TXN requests to other brokers here
> <
> https://github.com/apache/kafka/blob/cb35ddc5ca233d5cca6f51c1c41b952a7e9fe1a0/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269
> >
> .
>
> However, we do not support  version 4 of ADD_PARTITIONS_TO_TXN requests as
> of Kafka 3.5.2? If these assumptions happen to be correct, does this mean
> that the upgrade to versions 3.6+ require
> transaction.partition.verification.enable
> to be set to false to allow upgrades?
>
> Regard,
> Johnson
>


Re: [VOTE] KIP-1036: Extend RecordDeserializationException exception

2024-05-07 Thread Sophie Blee-Goldman
+1 (binding)

thanks for the KIP!

On Fri, May 3, 2024 at 9:13 AM Matthias J. Sax  wrote:

> +1 (binding)
>
> On 5/3/24 8:52 AM, Federico Valeri wrote:
> > Hi Fred, this is a useful addition.
> >
> > +1 non binding
> >
> > Thanks
> >
> > On Fri, May 3, 2024 at 4:11 PM Andrew Schofield
> >  wrote:
> >>
> >> Hi Fred,
> >> Thanks for the KIP. It’s turned out nice and elegant I think.
> Definitely a worthwhile improvement.
> >>
> >> +1 (non-binding)
> >>
> >> Thanks,
> >> Andrew
> >>
> >>> On 30 Apr 2024, at 14:02, Frédérik Rouleau
>  wrote:
> >>>
> >>> Hi all,
> >>>
> >>> As there is no more activity for a while on the discuss thread, I
> think we
> >>> can start a vote.
> >>> The KIP is available on
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception
> >>>
> >>>
> >>> If you have some feedback or suggestions, please participate to the
> >>> discussion thread:
> >>> https://lists.apache.org/thread/or85okygtfywvnsfd37kwykkq5jq7fy5
> >>>
> >>> Best regards,
> >>> Fred
> >>
>


Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-05-07 Thread Justine Olshan
Alieh and Chris,

Thanks for clarifying 1) but I saw the motivation. I guess I just didn't
understand how that would be ensured on the producer side. I saw the sample
code -- is it just an if statement checking for the error before the
handler is invoked? That seems a bit fragile.

Can you clarify what you mean by `since the code does not reach the KS
interface and breaks somewhere in producer.` If we surfaced this error to
the application in a better way would that also be a solution to the issue?

Justine

On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi 
wrote:

> Hi,
>
>
> Thank you, Chris and Justine, for the feedback.
>
>
> @Chris
>
> 1) Flexibility: it has two meanings. The first meaning is the one you
> mentioned. We are going to cover more exceptions in the future, but as
> Justine mentioned, we must be very conservative about adding more
> exceptions. Additionally, flexibility mainly means that the user is able to
> develop their own code. As mentioned in the motivation section and the
> examples, sometimes the user decides on dropping a record based on the
> topic, for example.
>
>
> 2) Defining two separate methods for retriable and non-retriable
> exceptions: although the idea is brilliant, the user may still make a
> mistake by implementing the wrong method and see a non-expecting behaviour.
> For example, he may implement handleRetriable() for RecordTooLargeException
> and define SWALLOW for the exception, but in practice, he sees no change in
> default behaviour since he implemented the wrong method. I think we can
> never reduce the user’s mistakes to 0.
>
>
>
> 3) Default implementation for Handler: the default behaviour is already
> preserved with NO need of implementing any handler or setting the
> corresponding config parameter `custom.exception.handler`. What you mean is
> actually having a second default, which requires having both interface and
> config parameters. About UnknownTopicOrPartitionException: the producer
> already offers the config parameter `max.block.ms` which determines the
> duration of retrying. The main purpose of the user who needs the handler is
> to get the root cause of TimeoutException and handle it in the way he
> intends. The KIP explains the necessity of it for KS users.
>
>
> 4) Naming issue: By SWALLOW, we meant actually swallow the error, while
> SKIP means skip the record, I think. If it makes sense for more ppl, I can
> change it to SKIP
>
>
> @Justine
>
> 1) was addressed by Chris.
>
> 2 and 3) The problem is exactly what you mentioned. Currently, there is no
> way to handle these issues application-side. Even KS users who implement KS
> ProductionExceptionHandler are not able to handle the exceptions as they
> intend since the code does not reach the KS interface and breaks somewhere
> in producer.
>
> Cheers,
> Alieh
>
> On Tue, May 7, 2024 at 8:43 PM Chris Egerton 
> wrote:
>
> > Hi Justine,
> >
> > The method signatures for the interface are indeed open-ended, but the
> KIP
> > states that its uses will be limited. See the motivation section:
> >
> > > We believe that the user should be able to develop custom exception
> > handlers for managing producer exceptions. On the other hand, this will
> be
> > an expert-level API, and using that may result in strange behaviour in
> the
> > system, making it hard to find the root cause. Therefore, the custom
> > handler is currently limited to handling RecordTooLargeException and
> > UnknownTopicOrPartitionException.
> >
> > Cheers,
> >
> > Chris
> >
> >
> > On Tue, May 7, 2024, 14:37 Justine Olshan 
> > wrote:
> >
> > > Hi Alieh,
> > >
> > > I was out for KSB and then was also sick. :(
> > >
> > > To your point 1) Chris, I don't think it is limited to two specific
> > > scenarios, since the interface accepts a generic Exception e and can be
> > > implemented to check if that e is an instanceof any exception. I didn't
> > see
> > > anywhere that specific errors are enforced. I'm a bit concerned about
> > this
> > > actually. I'm concerned about the opened-endedness and the contract we
> > have
> > > with transactions. We are allowing the client to make decisions that
> are
> > > somewhat invisible to the server. As an aside, can we build in log
> > messages
> > > when the handler decides to skip etc a message. I'm really concerned
> > about
> > > messages being silently dropped.
> > >
> > > I do think Chris's point 2) about retriable vs non retriable errors is
> > > fair. I'm a bit concerned about skipping a unknown topic or partition
> > > exception too early, as there are cases where it can be transient.
> > >
> > > I'm still a little bit wary of allowing dropping records as part of EOS
> > > generally as in many cases, these errors signify an issue with the
> > original
> > > data. I understand that streams and connect/mirror maker may have
> reasons
> > > they want to progress past these messages, but wondering if there is a
> > way
> > > that can be done application-side. I'm willing to accept this sort o

Re: [DISCUSS] KIP-936 Throttle number of active PIDs

2024-05-07 Thread Justine Olshan
Hi Omnia,

Thanks for the detailed response.
I agree that the client ID solution can be tricky (and could even run into
the same problem if the client ID is not unique).

As for the waiting one day -- that was not meant to be an exact value, but
my point was that there will be some time where nothing changes as we wait
for the old IDs to expire. But I think I was misunderstanding how the rate
worked. (I was thinking more of a hard limit) I see we will restart the
rate counter on each window. I guess then the concern is to make sure that
given our window size and rate, we set a default that ensures we don't OOM.
👍

Justine


On Tue, May 7, 2024 at 1:25 PM Omnia Ibrahim 
wrote:

> Hi Justine Thanks for the feedback
>
> > So consider a case where there is a storm for a given principal. We could
> > have a large mass of short lived producers in addition to some
> > "well-behaved" ones. My understanding is that if the "well-behaved" one
> > doesn't produce as frequently ie less than once per hour, it will also
> get
> > throttled when a storm of short-lived producers leads the principal to
> hit
> > the given rate necessary for throttling. The idea of the filter is that
> we
> > don't throttle existing producers, but in this case, we will.
> I believe this would be part of the limitation of the KIP (which is
> similar to some extend with the rest of Kafka Quotas) If KafkaPrincipal of
> ClientId is shared between different use-cases where we have few well
> behaving and some misbehaving we will punish all as these are the
> identifiers we have about them.
>
> If I understand the scenario you described correctly I think I can break
> it to the following uses cases
> 1. KafkaPrincipal is shared between multiple applications with different
> ClientIds then It is not fair to throttle one application because the other
> one that have been configured with same KafkaPrincipal is misbehaving. This
> can be solved by breaking the throttling to be based on the combination of
> KafkaPrincipal-ClientId, it will increase the number of entries in the
> cache but will at least isolate applications. And I still believe it might
> be tricky for whoever is manning the cluster to list and throttle most
> client ids.
> 2. A new version of the application has been misconfigured and during
> rolling the upgrades only half the instances of this app has the
> misconfigured version and it keep creating short-lived PIDs while the other
> half has the well behaving but it produce on slower base so it will produce
> every two hours. This one will be impacted and it bit tricky to track these.
>
> In both cases I don’t believe we need to wait for the 24hr expiration of
> PID to hit however, I believe we need to follow one of these solutions
> 1. Break these uses cases to use different KafkaPrincipal (and or ClientId
> if we opt-in for this)
> 2. If both cases are using the same KafkaPrincipal then they are most
> likely the same owner of both apps and they will need to shutdown or fix
> the misbehaving/misconfigured application that create all these instances
> and we need to wait for the throttle time to pass before the well behaved
> client proceed with the unseen PID.
>
> > Note -- one thing that wasn't totally clear from the KIP was whether we
> > throttle all new produce requests from the client or just the ones with
> > unseen IDs. If we throttle them all, perhaps this point isn't a huge
> deal.
> I’ll clarify this. But the KIP is aiming to throttle all unseen PID for X
> amount of time which is the throttle time.
>
> > The other concern that I brought up is that when we throttle, we will
> > likely continue to throttle until the storm stops. This is because we
> will
> > have to wait 1 day or so for IDs to expire, and we will likely replace
> them
> > at a pretty fast rate. This can be acceptable if we believe that it is
> > helpful to getting the behavior to stop, but I just wanted to call out
> that
> > the user will likely not be able to start clients in the meantime.
> Am not sure the producer need to wait for 1 day (unless the PID quota is
> set too high) as we are throttling unseen PID per user any PID above the
> quota will not be registered at the leader side and we don’t store anything
> for idempotent at initialising so am not sure I see need to wait for 1 day
> unless am missing something.
> If User-A has `producer_ids_rate` 100 and the broker can store 2,000,000
> before hit out of memory. Then the leader will only store a 100 PIDs in 1
> hour and throttle any unseen PID as these will be considered new PIDs. If
> we ht the scenario that you described within  1 hr window then this client
> should be alerted that it didn’t produce and got throttled. Then we can
> apply one of the solutions I mentioned in first point. They either split
> the use cases to different KafkaPrincipals or shutdown the misconfigured
> app and wait the throttle time pass. The throttle doesn’t get controlled by
> the 24hr expiration of PID and we don’

Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-05-07 Thread Alieh Saeedi
Hi,


Thank you, Chris and Justine, for the feedback.


@Chris

1) Flexibility: it has two meanings. The first meaning is the one you
mentioned. We are going to cover more exceptions in the future, but as
Justine mentioned, we must be very conservative about adding more
exceptions. Additionally, flexibility mainly means that the user is able to
develop their own code. As mentioned in the motivation section and the
examples, sometimes the user decides on dropping a record based on the
topic, for example.


2) Defining two separate methods for retriable and non-retriable
exceptions: although the idea is brilliant, the user may still make a
mistake by implementing the wrong method and see a non-expecting behaviour.
For example, he may implement handleRetriable() for RecordTooLargeException
and define SWALLOW for the exception, but in practice, he sees no change in
default behaviour since he implemented the wrong method. I think we can
never reduce the user’s mistakes to 0.



3) Default implementation for Handler: the default behaviour is already
preserved with NO need of implementing any handler or setting the
corresponding config parameter `custom.exception.handler`. What you mean is
actually having a second default, which requires having both interface and
config parameters. About UnknownTopicOrPartitionException: the producer
already offers the config parameter `max.block.ms` which determines the
duration of retrying. The main purpose of the user who needs the handler is
to get the root cause of TimeoutException and handle it in the way he
intends. The KIP explains the necessity of it for KS users.


4) Naming issue: By SWALLOW, we meant actually swallow the error, while
SKIP means skip the record, I think. If it makes sense for more ppl, I can
change it to SKIP


@Justine

1) was addressed by Chris.

2 and 3) The problem is exactly what you mentioned. Currently, there is no
way to handle these issues application-side. Even KS users who implement KS
ProductionExceptionHandler are not able to handle the exceptions as they
intend since the code does not reach the KS interface and breaks somewhere
in producer.

Cheers,
Alieh

On Tue, May 7, 2024 at 8:43 PM Chris Egerton 
wrote:

> Hi Justine,
>
> The method signatures for the interface are indeed open-ended, but the KIP
> states that its uses will be limited. See the motivation section:
>
> > We believe that the user should be able to develop custom exception
> handlers for managing producer exceptions. On the other hand, this will be
> an expert-level API, and using that may result in strange behaviour in the
> system, making it hard to find the root cause. Therefore, the custom
> handler is currently limited to handling RecordTooLargeException and
> UnknownTopicOrPartitionException.
>
> Cheers,
>
> Chris
>
>
> On Tue, May 7, 2024, 14:37 Justine Olshan 
> wrote:
>
> > Hi Alieh,
> >
> > I was out for KSB and then was also sick. :(
> >
> > To your point 1) Chris, I don't think it is limited to two specific
> > scenarios, since the interface accepts a generic Exception e and can be
> > implemented to check if that e is an instanceof any exception. I didn't
> see
> > anywhere that specific errors are enforced. I'm a bit concerned about
> this
> > actually. I'm concerned about the opened-endedness and the contract we
> have
> > with transactions. We are allowing the client to make decisions that are
> > somewhat invisible to the server. As an aside, can we build in log
> messages
> > when the handler decides to skip etc a message. I'm really concerned
> about
> > messages being silently dropped.
> >
> > I do think Chris's point 2) about retriable vs non retriable errors is
> > fair. I'm a bit concerned about skipping a unknown topic or partition
> > exception too early, as there are cases where it can be transient.
> >
> > I'm still a little bit wary of allowing dropping records as part of EOS
> > generally as in many cases, these errors signify an issue with the
> original
> > data. I understand that streams and connect/mirror maker may have reasons
> > they want to progress past these messages, but wondering if there is a
> way
> > that can be done application-side. I'm willing to accept this sort of
> > proposal if we can make it clear that this sort of thing is happening and
> > we limit the blast radius for what we can do.
> >
> > Justine
> >
> > On Tue, May 7, 2024 at 9:55 AM Chris Egerton 
> > wrote:
> >
> > > Hi Alieh,
> > >
> > > Sorry for the delay, I've been out sick. I still have some thoughts
> that
> > > I'd like to see addressed before voting.
> > >
> > > 1) If flexibility is the motivation for a pluggable interface, why are
> we
> > > only limiting the uses for this interface to two very specific
> scenarios?
> > > Why not also allow, e.g., authorization errors to be handled as well
> > > (allowing users to drop records destined for some off-limits topics, or
> > > retry for a limited duration in case there's a delay in the propagation
> > of
> >

Re: [DISCUSS] KIP-936 Throttle number of active PIDs

2024-05-07 Thread Omnia Ibrahim
Hi Justine Thanks for the feedback 

> So consider a case where there is a storm for a given principal. We could
> have a large mass of short lived producers in addition to some
> "well-behaved" ones. My understanding is that if the "well-behaved" one
> doesn't produce as frequently ie less than once per hour, it will also get
> throttled when a storm of short-lived producers leads the principal to hit
> the given rate necessary for throttling. The idea of the filter is that we
> don't throttle existing producers, but in this case, we will.
I believe this would be part of the limitation of the KIP (which is similar to 
some extend with the rest of Kafka Quotas) If KafkaPrincipal of ClientId is 
shared between different use-cases where we have few well behaving and some 
misbehaving we will punish all as these are the identifiers we have about them. 

If I understand the scenario you described correctly I think I can break it to 
the following uses cases
1. KafkaPrincipal is shared between multiple applications with different 
ClientIds then It is not fair to throttle one application because the other one 
that have been configured with same KafkaPrincipal is misbehaving. This can be 
solved by breaking the throttling to be based on the combination of 
KafkaPrincipal-ClientId, it will increase the number of entries in the cache 
but will at least isolate applications. And I still believe it might be tricky 
for whoever is manning the cluster to list and throttle most client ids.
2. A new version of the application has been misconfigured and during rolling 
the upgrades only half the instances of this app has the misconfigured version 
and it keep creating short-lived PIDs while the other half has the well 
behaving but it produce on slower base so it will produce every two hours. This 
one will be impacted and it bit tricky to track these.

In both cases I don’t believe we need to wait for the 24hr expiration of PID to 
hit however, I believe we need to follow one of these solutions 
1. Break these uses cases to use different KafkaPrincipal (and or ClientId if 
we opt-in for this) 
2. If both cases are using the same KafkaPrincipal then they are most likely 
the same owner of both apps and they will need to shutdown or fix the 
misbehaving/misconfigured application that create all these instances and we 
need to wait for the throttle time to pass before the well behaved client 
proceed with the unseen PID. 

> Note -- one thing that wasn't totally clear from the KIP was whether we
> throttle all new produce requests from the client or just the ones with
> unseen IDs. If we throttle them all, perhaps this point isn't a huge deal.
I’ll clarify this. But the KIP is aiming to throttle all unseen PID for X 
amount of time which is the throttle time. 

> The other concern that I brought up is that when we throttle, we will
> likely continue to throttle until the storm stops. This is because we will
> have to wait 1 day or so for IDs to expire, and we will likely replace them
> at a pretty fast rate. This can be acceptable if we believe that it is
> helpful to getting the behavior to stop, but I just wanted to call out that
> the user will likely not be able to start clients in the meantime.
Am not sure the producer need to wait for 1 day (unless the PID quota is set 
too high) as we are throttling unseen PID per user any PID above the quota will 
not be registered at the leader side and we don’t store anything for idempotent 
at initialising so am not sure I see need to wait for 1 day unless am missing 
something. 
If User-A has `producer_ids_rate` 100 and the broker can store 2,000,000 before 
hit out of memory. Then the leader will only store a 100 PIDs in 1 hour and 
throttle any unseen PID as these will be considered new PIDs. If we ht the 
scenario that you described within  1 hr window then this client should be 
alerted that it didn’t produce and got throttled. Then we can apply one of the 
solutions I mentioned in first point. They either split the use cases to 
different KafkaPrincipals or shutdown the misconfigured app and wait the 
throttle time pass. The throttle doesn’t get controlled by the 24hr expiration 
of PID and we don’t even check if PID has expired or not. 

I think I might be missing something regarding the need to wait for 24hr to 
resume! 

Omnia


> On 6 May 2024, at 20:46, Justine Olshan  wrote:
> 
> Hi Claude,
> 
> I can clarify my comments.
> 
> Just to clarify -- my understanding is that we don't intend to throttle any
> new producer IDs at the beginning. I believe this amount is specified by
> `producer_ids_rate`, but you can see this as a number of producer IDs per
> hour.
> 
> So consider a case where there is a storm for a given principal. We could
> have a large mass of short lived producers in addition to some
> "well-behaved" ones. My understanding is that if the "well-behaved" one
> doesn't produce as frequently ie less than once per hour, it will also get
> throttled when a 

Re: [DISCUSS] KIP-936 Throttle number of active PIDs

2024-05-07 Thread Omnia Ibrahim
Hi Igor, thanks for the feedback and sorry for the late response. 

> 10 Given the goal is to prevent OOMs, do we also need to
> limit the number of KafkaPrincipals in use?

None of the Kafka quotas ever limited number of KafkaPrincipals and I don’t 
really think this is the issue as you just need one misconfigured application 
to hit the OOM issue. 
Also limiting number of KafkaPrincipals will cause problems for on-boarding new 
cases to use the cluster. What is the question behind this question? Is there a 
use case you have in mind that need this?

> 11. How would an operator know or decide to change the configuration
> for the number layers – producer.id.quota.cache.layer.count –
> e.g. increasing from 4 to 5; and why?
> Do we need a new metric to indicate that change could be useful?
> 
> 12. Is producer.id.quota.cache.cleanup.scheduler.interval.ms a
> guaranteed interval, or rather simply a delay between cleanups?
> How did you decide on the default value of 10ms?

I am leaning toward dropping all the caching layer configs and keeping them as 
constant and we can later make them a config if there is a request for this. 
The 10min doesn’t really have a reason behind. It was a random number. 

> 16. LayeredBloomFilter will have a fixed size (right?), but some
> users (KafkaPrincipal) might only use a small number of PIDs.
> It it worth having a dual strategy, where we simply keep a Set of
> PIDs until we reach certain size where it pays off to use
> the LayeredBloomFilter?
The size is driven from the `producer_id_rate` for each KafkaPrincipal. We 
might hit this issue for sure. I believe Claude covered this point. 
> 
>  a. INIT_PRODUCER_ID for idempotent producer request PIDs from
>  random controller every time so if a client got throttled on
>  one controller doesn't guarantee it will not go through on next
>  controller causing OOM at the leader later.
> 
> Is the INIT_PRODUCER_ID request really sent to a "random controller"?
> From a quick look at Sender.maybeSendAndPollTransactionalRequest,
> for an idempotent producer, targetNode is set to the broker with
> fewest outstanding requests. Am I looking at the wrong place?
No you are not looking at the wrong place however this still a random comaring 
with TXN_ID for example or GROUP_ID and not dedicated by any mean as it can 
change at any point depend on which one has less outstanding requests. 

However if we want to focus on where the problem happened this will be the 
leader. We can throttle INIT_PID but this will not limit how many PID can 
produce to one 1 broker 
Let’s image this scenario where we have a cluster where each broker can max 
have 200 PID without hitting OOM. And each KafkaPrincipal can INIT_PID for X 
number (let say 10) and we have like 20 active KafkaPrincipal. Which in total 
we can have 200 INIT_PID request. Now on the leadership side of producing we 
have 
Broker-1 has 100 PID in memory 
Broker-2 has 150 PID in memory 
Broker-3 has 100 PID in memory 
Broker-4 has 120 PID in memory 
Broker-5 has 100 PID in memory 
Broker-6 has 90 PID in memory 
Now Broker-1 is down and for the sake of this example not all 100 PID will move 
to one broker but 60 PID will connect to Broker-2 bringing its total PID to 
210. Now these PIDs aren’t going to be throttle by INIT_PID as they already has 
been initialised which will cause OOM now Broker-2 is down. If Broker-1/2 down 
now the problem is propagated and the cluster will be down as each broker try 
to take leadership will get the initialised PIDs and hit OOM until someone 
manually increase the memory of the brokers or if we hit the PID cleanup at 
some point of all of this mess. 

Hope this explain my concerns with throttling `INIT_PRODUCER_ID`. 

Thanks
Omnia

> On 1 May 2024, at 15:42, Igor Soarez  wrote:
> 
> Hi Omnia, Hi Claude,
> 
> Thanks for putting this KIP together.
> This is an important unresolved issue in Kafka,
> which I have witnessed several times in production.
> 
> Please see my questions below:
> 
> 10 Given the goal is to prevent OOMs, do we also need to
> limit the number of KafkaPrincipals in use?
> 
> 11. How would an operator know or decide to change the configuration
> for the number layers – producer.id.quota.cache.layer.count –
> e.g. increasing from 4 to 5; and why?
> Do we need a new metric to indicate that change could be useful?
> 
> 12. Is producer.id.quota.cache.cleanup.scheduler.interval.ms a
> guaranteed interval, or rather simply a delay between cleanups?
> How did you decide on the default value of 10ms?
> 
> 13. Under "New ProducerIdQuotaManagerCache", the documentation for
> the constructor params for ProducerIDQuotaManagerCache does not
> match the constructor signature.
> 
> 14. Under "New ProducerIdQuotaManagerCache":
>  public boolean track(KafkaPrincipal principal, int producerIdRate, long pid)
> How is producerIdRate used? The reference implementation Claude shared
> does not use it.
> https://github.com/Claudenw/kafka/blob/49b6eb0fb5cfaf19b072

Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2880

2024-05-07 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-05-07 Thread Chris Egerton
Hi Justine,

The method signatures for the interface are indeed open-ended, but the KIP
states that its uses will be limited. See the motivation section:

> We believe that the user should be able to develop custom exception
handlers for managing producer exceptions. On the other hand, this will be
an expert-level API, and using that may result in strange behaviour in the
system, making it hard to find the root cause. Therefore, the custom
handler is currently limited to handling RecordTooLargeException and
UnknownTopicOrPartitionException.

Cheers,

Chris


On Tue, May 7, 2024, 14:37 Justine Olshan 
wrote:

> Hi Alieh,
>
> I was out for KSB and then was also sick. :(
>
> To your point 1) Chris, I don't think it is limited to two specific
> scenarios, since the interface accepts a generic Exception e and can be
> implemented to check if that e is an instanceof any exception. I didn't see
> anywhere that specific errors are enforced. I'm a bit concerned about this
> actually. I'm concerned about the opened-endedness and the contract we have
> with transactions. We are allowing the client to make decisions that are
> somewhat invisible to the server. As an aside, can we build in log messages
> when the handler decides to skip etc a message. I'm really concerned about
> messages being silently dropped.
>
> I do think Chris's point 2) about retriable vs non retriable errors is
> fair. I'm a bit concerned about skipping a unknown topic or partition
> exception too early, as there are cases where it can be transient.
>
> I'm still a little bit wary of allowing dropping records as part of EOS
> generally as in many cases, these errors signify an issue with the original
> data. I understand that streams and connect/mirror maker may have reasons
> they want to progress past these messages, but wondering if there is a way
> that can be done application-side. I'm willing to accept this sort of
> proposal if we can make it clear that this sort of thing is happening and
> we limit the blast radius for what we can do.
>
> Justine
>
> On Tue, May 7, 2024 at 9:55 AM Chris Egerton 
> wrote:
>
> > Hi Alieh,
> >
> > Sorry for the delay, I've been out sick. I still have some thoughts that
> > I'd like to see addressed before voting.
> >
> > 1) If flexibility is the motivation for a pluggable interface, why are we
> > only limiting the uses for this interface to two very specific scenarios?
> > Why not also allow, e.g., authorization errors to be handled as well
> > (allowing users to drop records destined for some off-limits topics, or
> > retry for a limited duration in case there's a delay in the propagation
> of
> > ACL updates)? It'd be nice to see some analysis of other errors that
> could
> > be handled with this new API, both to avoid the follow-up work of another
> > KIP to address them in the future, and to make sure that we're not
> painting
> > ourselves into a corner with the current API in a way that would make
> > future modifications difficult.
> >
> > 2) Something feels a bit off with how retriable vs. non-retriable errors
> > are handled with the interface. Why not introduce two separate methods to
> > handle each case separately? That way there's no ambiguity or implicit
> > behavior when, e.g., attempting to retry on a RecordTooLargeException.
> This
> > could be something like `NonRetriableResponse handle(ProducerRecord,
> > Exception)` and `RetriableResponse handleRetriable(ProducerRecord,
> > Exception)`, though the exact names and shape can obviously be toyed
> with a
> > bit.
> >
> > 3) Although the flexibility of a pluggable interface may benefit some
> > users' custom producer applications and Kafka Streams applications, it
> > comes at significant deployment cost for other low-/no-code environments,
> > including but not limited to Kafka Connect and MirrorMaker 2. Can we add
> a
> > default implementation of the exception handler that allows for some
> simple
> > behavior to be tweaked via configuration property? Two things that would
> be
> > nice to have would be A) an upper bound on the retry time for
> > unknown-topic-partition exceptions and B) an option to drop records that
> > are large enough to trigger a record-too-large exception.
> >
> > 4) I'd still prefer to see "SKIP" or "DROP" instead of the proposed
> > "SWALLOW" option, which IMO is opaque and non-obvious, especially when
> > trying to guess the behavior for retriable errors.
> >
> > Cheers,
> >
> > Chris
> >
> > On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi
>  > >
> > wrote:
> >
> > > Hi all,
> > >
> > >
> > > A summary of the KIP and the discussions:
> > >
> > >
> > > The KIP introduces a handler interface for Producer in order to handle
> > two
> > > exceptions: RecordTooLargeException and
> UnknownTopicOrPartitionException.
> > > The handler handles the exceptions per-record.
> > >
> > >
> > > - Do we need this handler?  [Motivation and Examples sections]
> > >
> > >
> > > RecordTooLargeException: 1) In transactions, the producer

Request version not enabled errors upgrading from Kafka 3.5 -> 3.6

2024-05-07 Thread Johnson Okorie
Hi folks,

Awesome work you have been doing on this project!

I was hoping I could get some help on an issue we are having in one of our
Kafka clusters. Most of the clients on this cluster use
exactly-once-semantics. The Kafka cluster currently runs version 3.5.2 and
we were attempting an upgrade to 3.6.2. After replacing one of the brokers
with the new version we saw a bunch of the following errors on the older
brokers:

```
Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not
enabled
```

This manifested as 'NETWORK_EXCEPTION' errors on the clients and downtime
for those clients. On the new broker we saw:

```
[AddPartitionsToTxnSenderThread-1063]: AddPartitionsToTxnRequest failed for
node 1069 with a network exception.
```

Digging through the changes in 3.6, we came across some changes introduced
as part of KAFKA-14402  that
we thought might lead to this behaviour and wanted to confirm.

First we could see that  transaction.partition.verification.enable
is enabled by default and enables a new code path that culminates in we
sending version 4 ADD_PARTITIONS_TO_TXN requests to other brokers here

.

However, we do not support  version 4 of ADD_PARTITIONS_TO_TXN requests as
of Kafka 3.5.2? If these assumptions happen to be correct, does this mean
that the upgrade to versions 3.6+ require
transaction.partition.verification.enable
to be set to false to allow upgrades?

Regard,
Johnson


Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-05-07 Thread Justine Olshan
Hi Alieh,

I was out for KSB and then was also sick. :(

To your point 1) Chris, I don't think it is limited to two specific
scenarios, since the interface accepts a generic Exception e and can be
implemented to check if that e is an instanceof any exception. I didn't see
anywhere that specific errors are enforced. I'm a bit concerned about this
actually. I'm concerned about the opened-endedness and the contract we have
with transactions. We are allowing the client to make decisions that are
somewhat invisible to the server. As an aside, can we build in log messages
when the handler decides to skip etc a message. I'm really concerned about
messages being silently dropped.

I do think Chris's point 2) about retriable vs non retriable errors is
fair. I'm a bit concerned about skipping a unknown topic or partition
exception too early, as there are cases where it can be transient.

I'm still a little bit wary of allowing dropping records as part of EOS
generally as in many cases, these errors signify an issue with the original
data. I understand that streams and connect/mirror maker may have reasons
they want to progress past these messages, but wondering if there is a way
that can be done application-side. I'm willing to accept this sort of
proposal if we can make it clear that this sort of thing is happening and
we limit the blast radius for what we can do.

Justine

On Tue, May 7, 2024 at 9:55 AM Chris Egerton 
wrote:

> Hi Alieh,
>
> Sorry for the delay, I've been out sick. I still have some thoughts that
> I'd like to see addressed before voting.
>
> 1) If flexibility is the motivation for a pluggable interface, why are we
> only limiting the uses for this interface to two very specific scenarios?
> Why not also allow, e.g., authorization errors to be handled as well
> (allowing users to drop records destined for some off-limits topics, or
> retry for a limited duration in case there's a delay in the propagation of
> ACL updates)? It'd be nice to see some analysis of other errors that could
> be handled with this new API, both to avoid the follow-up work of another
> KIP to address them in the future, and to make sure that we're not painting
> ourselves into a corner with the current API in a way that would make
> future modifications difficult.
>
> 2) Something feels a bit off with how retriable vs. non-retriable errors
> are handled with the interface. Why not introduce two separate methods to
> handle each case separately? That way there's no ambiguity or implicit
> behavior when, e.g., attempting to retry on a RecordTooLargeException. This
> could be something like `NonRetriableResponse handle(ProducerRecord,
> Exception)` and `RetriableResponse handleRetriable(ProducerRecord,
> Exception)`, though the exact names and shape can obviously be toyed with a
> bit.
>
> 3) Although the flexibility of a pluggable interface may benefit some
> users' custom producer applications and Kafka Streams applications, it
> comes at significant deployment cost for other low-/no-code environments,
> including but not limited to Kafka Connect and MirrorMaker 2. Can we add a
> default implementation of the exception handler that allows for some simple
> behavior to be tweaked via configuration property? Two things that would be
> nice to have would be A) an upper bound on the retry time for
> unknown-topic-partition exceptions and B) an option to drop records that
> are large enough to trigger a record-too-large exception.
>
> 4) I'd still prefer to see "SKIP" or "DROP" instead of the proposed
> "SWALLOW" option, which IMO is opaque and non-obvious, especially when
> trying to guess the behavior for retriable errors.
>
> Cheers,
>
> Chris
>
> On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi  >
> wrote:
>
> > Hi all,
> >
> >
> > A summary of the KIP and the discussions:
> >
> >
> > The KIP introduces a handler interface for Producer in order to handle
> two
> > exceptions: RecordTooLargeException and UnknownTopicOrPartitionException.
> > The handler handles the exceptions per-record.
> >
> >
> > - Do we need this handler?  [Motivation and Examples sections]
> >
> >
> > RecordTooLargeException: 1) In transactions, the producer collects
> multiple
> > records in batches. Then a RecordTooLargeException related to a single
> > record leads to failing the entire batch. A custom exception handler in
> > this case may decide on dropping the record and continuing the
> processing.
> > See Example 1, please. 2) More over, in Kafka Streams, a record that is
> too
> > large is a poison pill record, and there is no way to skip over it. A
> > handler would allow us to react to this error inside the producer, i.e.,
> > local to where the error happens, and thus simplify the overall code
> > significantly. Please read the Motivation section for more explanation.
> >
> >
> > UnknownTopicOrPartitionException: For this case, the producer handles
> this
> > exception internally and only issues a WARN log about missing metadata
> and
> > retries 

[jira] [Resolved] (KAFKA-15018) Potential tombstone offsets corruption for exactly-once source connectors

2024-05-07 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15018.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> Potential tombstone offsets corruption for exactly-once source connectors
> -
>
> Key: KAFKA-15018
> URL: https://issues.apache.org/jira/browse/KAFKA-15018
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1
>Reporter: Chris Egerton
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.8.0
>
>
> When exactly-once support is enabled for source connectors, source offsets 
> can potentially be written to two different offsets topics: a topic specific 
> to the connector, and the global offsets topic (which was used for all 
> connectors prior to KIP-618 / version 3.3.0).
> Precedence is given to offsets in the per-connector offsets topic, but if 
> none are found for a given partition, then the global offsets topic is used 
> as a fallback.
> When committing offsets, a transaction is used to ensure that source records 
> and source offsets are written to the Kafka cluster targeted by the source 
> connector. This transaction only includes the connector-specific offsets 
> topic. Writes to the global offsets topic take place after writes to the 
> connector-specific offsets topic have completed successfully, and if they 
> fail, a warning message is logged, but no other action is taken.
> Normally, this ensures that, for offsets committed by exactly-once-supported 
> source connectors, the per-connector offsets topic is at least as up-to-date 
> as the global offsets topic, and sometimes even ahead.
> However, for tombstone offsets, we lose that guarantee. If a tombstone offset 
> is successfully written to the per-connector offsets topic, but cannot be 
> written to the global offsets topic, then the global offsets topic will still 
> contain that source offset, but the per-connector topic will not. Due to the 
> fallback-on-global logic used by the worker, if a task requests offsets for 
> one of the tombstoned partitions, the worker will provide it with the offsets 
> present in the global offsets topic, instead of indicating to the task that 
> no offsets can be found.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-05-07 Thread Chris Egerton
Hi Alieh,

Sorry for the delay, I've been out sick. I still have some thoughts that
I'd like to see addressed before voting.

1) If flexibility is the motivation for a pluggable interface, why are we
only limiting the uses for this interface to two very specific scenarios?
Why not also allow, e.g., authorization errors to be handled as well
(allowing users to drop records destined for some off-limits topics, or
retry for a limited duration in case there's a delay in the propagation of
ACL updates)? It'd be nice to see some analysis of other errors that could
be handled with this new API, both to avoid the follow-up work of another
KIP to address them in the future, and to make sure that we're not painting
ourselves into a corner with the current API in a way that would make
future modifications difficult.

2) Something feels a bit off with how retriable vs. non-retriable errors
are handled with the interface. Why not introduce two separate methods to
handle each case separately? That way there's no ambiguity or implicit
behavior when, e.g., attempting to retry on a RecordTooLargeException. This
could be something like `NonRetriableResponse handle(ProducerRecord,
Exception)` and `RetriableResponse handleRetriable(ProducerRecord,
Exception)`, though the exact names and shape can obviously be toyed with a
bit.

3) Although the flexibility of a pluggable interface may benefit some
users' custom producer applications and Kafka Streams applications, it
comes at significant deployment cost for other low-/no-code environments,
including but not limited to Kafka Connect and MirrorMaker 2. Can we add a
default implementation of the exception handler that allows for some simple
behavior to be tweaked via configuration property? Two things that would be
nice to have would be A) an upper bound on the retry time for
unknown-topic-partition exceptions and B) an option to drop records that
are large enough to trigger a record-too-large exception.

4) I'd still prefer to see "SKIP" or "DROP" instead of the proposed
"SWALLOW" option, which IMO is opaque and non-obvious, especially when
trying to guess the behavior for retriable errors.

Cheers,

Chris

On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi 
wrote:

> Hi all,
>
>
> A summary of the KIP and the discussions:
>
>
> The KIP introduces a handler interface for Producer in order to handle two
> exceptions: RecordTooLargeException and UnknownTopicOrPartitionException.
> The handler handles the exceptions per-record.
>
>
> - Do we need this handler?  [Motivation and Examples sections]
>
>
> RecordTooLargeException: 1) In transactions, the producer collects multiple
> records in batches. Then a RecordTooLargeException related to a single
> record leads to failing the entire batch. A custom exception handler in
> this case may decide on dropping the record and continuing the processing.
> See Example 1, please. 2) More over, in Kafka Streams, a record that is too
> large is a poison pill record, and there is no way to skip over it. A
> handler would allow us to react to this error inside the producer, i.e.,
> local to where the error happens, and thus simplify the overall code
> significantly. Please read the Motivation section for more explanation.
>
>
> UnknownTopicOrPartitionException: For this case, the producer handles this
> exception internally and only issues a WARN log about missing metadata and
> retries internally. Later, when the producer hits "deliver.timeout.ms"  it
> throws a TimeoutException, and the user can only blindly retry, which may
> result in an infinite retry loop. The thrown TimeoutException "cuts" the
> connection to the underlying root cause of missing metadata (which could
> indeed be a transient error but is persistent for a non-existing topic).
> Thus, there is no programmatic way to break the infinite retry loop. Kafka
> Streams also blindly retries for this case, and the application gets stuck.
>
>
>
> - Having interface vs configuration option: [Motivation, Examples, and
> Rejected Alternatives sections]
>
> Our solution is introducing an interface due to the full flexibility that
> it offers. Sometimes users, especially Kafka Streams ones, determine the
> handler's behaviour based on the situation. For example, f
> acing UnknownTopicOrPartitionException*, *the user may want to raise an
> error for some topics but retry it for other topics. Having a configuration
> option with a fixed set of possibilities does not serve the user's
> needs. See Example 2, please.
>
>
> - Note on RecordTooLargeException: [Public Interfaces section]
>
> If the custom handler decides on SWALLOW for RecordTooLargeException, then
> this record will not be a part of the batch of transactions and will also
> not be sent to the broker in non-transactional mode. So no worries about
> getting a RecordTooLargeException from the broker in this case, as the
> record will never ever be sent to the broker. SWALLOW means drop the record
> and continue/swallow the error.
>
>
> - 

[jira] [Created] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0

2024-05-07 Thread FTR (Jira)
FTR created KAFKA-16687:
---

 Summary: Native memory leak by Unsafe_allocatememory  in Kafka 
Clients  3.7.0
 Key: KAFKA-16687
 URL: https://issues.apache.org/jira/browse/KAFKA-16687
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Affects Versions: 3.7.0
Reporter: FTR


I am building a Java Project which using Maven dependency Kafka-clients with 
3.7.0 version.
My Java application logic is to use Kafka Consumer to poll Kakfa broker topic  
continuously. 
I have configured my Java application with JVM options with -Xms8G -Xmx8G  
-XX:MaxMetaspaceSize=4G, and then run it. 
Also, there are 16G physical memory on my virtual machine. 
After my Java application running a long time, I have found that resident 
memory of the Java Process was being grown to more than 14G.
In the end, the Java process ate Swap space. 
I checked it with jmap -heap pid, and found heap memory usage is Ok. 
Also with Native Memory Tracking [jcmd pid Native.memory detail.diff], I found 
that it's caused by [NMT Internal] memory,  which created by 
Unsafe_allocatememory xxx.
In my Java application, I don't use any NIO DirectByteBuffer to allocate memory.
And I check it the Kafka-clients source code, it have codes with use 
"sun.misc.unsafe" to allocate memory.  And MaxMetaspaceSize not work for it .  
Could you help to check it? How could I to stop this growing native memory to 
avoid my System hang?  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16686) Flakey tests in TopicBasedRemoteLogMetadataManagerTest

2024-05-07 Thread Gaurav Narula (Jira)
Gaurav Narula created KAFKA-16686:
-

 Summary: Flakey tests in TopicBasedRemoteLogMetadataManagerTest
 Key: KAFKA-16686
 URL: https://issues.apache.org/jira/browse/KAFKA-16686
 Project: Kafka
  Issue Type: Test
  Components: Tiered-Storage
Reporter: Gaurav Narula
Assignee: Gaurav Narula


Tests in {{TopicBasedRemoteLogMetadataManagerTest}} flake because 
{{waitUntilConsumerCatchesUp}} may return before all expected metadata is 
caught up.

Flakyness report 
[here|https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon&tests.container=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[VOTE] KIP-1038: Add Custom Error Handler to Producer

2024-05-07 Thread Alieh Saeedi
Hi all,

It seems that we have no more comments, discussions, or feedback on
KIP-1038; therefore, I’d like to open voting for the KIP: Add Custom Error
Handler to Producer



Cheers,
Alieh


[DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-05-07 Thread Loic Greffier
Hi Matthias, 

To sum up with the ProductionExceptionHandler callback methods (106) proposed 
changes.

A new method ProductionExceptionHandler#handle is added with the following 
signature:

> ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, 
> final ProducerRecord record, final Exception exception);

The ProducerRecord parameter has changed to accept a serialized or 
non-serialized record. 
Thus, the new ProductionExceptionHandler#handle method can handle both 
production exception and serialization exception.

Both old ProductionExceptionHandler#handle and 
ProductionExceptionHandler#handleSerializationException methods are now 
deprecated.
The old ProductionExceptionHandler#handle method gets a default implementation, 
so users do not have to implement a deprecated method.

To handle backward compatibility, the new ProductionExceptionHandler#handle 
method gets a default implementation.

> default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext 
> context, final ProducerRecord record, final Exception exception) {
>   if (exception instanceof RecordSerializationException) {
>   this.handleSerializationException(record, exception.getCause());
>   }
>
>   return handle((ProducerRecord) record, exception);
> }

The default implementation either invokes #handleSerializationException or 
#handle depending on the type of the exception, thus users still relying on 
deprecated ProductionExceptionHandler#handle 
or ProductionExceptionHandler#handleSerializationException custom 
implementations won't break.

The new ProductionExceptionHandler#handle method is now invoked in case of 
serialization exception:

> public  void send(final String topic, final K key, final V value, ...) {
> try {
> keyBytes = keySerializer.serialize(topic, headers, key);
> ...   
> } catch (final ClassCastException exception) {
>   ...
> } catch (final Exception exception) {
> 
> try {
> response = productionExceptionHandler.handle(context, record, new 
> RecordSerializationException(SerializationExceptionOrigin.KEY, exception));
> } catch (final Exception e) {
> ...
> }
> }
> }

To wrap the origin serialization exception and determine whether it comes from 
the key or the value, a new exception class is created:

> public class RecordSerializationException extends SerializationException {
> public enum SerializationExceptionOrigin {
> KEY,
> VALUE
> }
>
> public RecordSerializationException(SerializationExceptionOrigin origin, 
> final Throwable cause);
> }

Hope it all makes sense,
Loïc


[jira] [Resolved] (KAFKA-13329) Connect does not perform preflight validation for per-connector key and value converters

2024-05-07 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-13329.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> Connect does not perform preflight validation for per-connector key and value 
> converters
> 
>
> Key: KAFKA-13329
> URL: https://issues.apache.org/jira/browse/KAFKA-13329
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0
>
>
> Users may specify a key and/or value converter class for their connector 
> directly in the configuration for that connector. If this occurs, no 
> preflight validation is performed to ensure that the specified converter is 
> valid.
> Unfortunately, the [Converter 
> interface|https://github.com/apache/kafka/blob/4eb386f6e060e12e1940c0d780987e3a7c438d74/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java]
>  does not require converters to expose a {{ConfigDef}} (unlike the 
> [HeaderConverter 
> interface|https://github.com/apache/kafka/blob/4eb386f6e060e12e1940c0d780987e3a7c438d74/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java#L48-L52],
>  which does have that requirement), so it's unlikely that the configuration 
> properties of the converter itself can be validated.
> However, we can and should still validate that the converter class exists, 
> can be instantiated (i.e., has a public, no-args constructor and is a 
> concrete, non-abstract class), and implements the {{Converter}} interface.
> *EDIT:* Since this ticket was originally filed, a {{Converter::config}} 
> method was added in 
> [KIP-769|https://cwiki.apache.org/confluence/display/KAFKA/KIP-769%3A+Connect+APIs+to+list+all+connector+plugins+and+retrieve+their+configuration+definitions].
>  We can now utilize that config definition during preflight validation for 
> connectors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13328) Connect does not perform preflight validation for per-connector header converters

2024-05-07 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-13328.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> Connect does not perform preflight validation for per-connector header 
> converters
> -
>
> Key: KAFKA-13328
> URL: https://issues.apache.org/jira/browse/KAFKA-13328
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0
>
>
> Users may specify a header converter class for their connector directly in 
> the configuration for that connector. If this occurs, no preflight validation 
> is performed to ensure that the specified converter is valid.
> {{HeaderConverter}} implementations are required to provide a valid 
> {{ConfigDef}} to the Connect framework via 
> [HeaderConverter::config|https://github.com/apache/kafka/blob/4eb386f6e060e12e1940c0d780987e3a7c438d74/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java#L48-L52],
>  but this object isn't actually leveraged anywhere by Connect.
> Connect should make use of this config object during preflight validation for 
> connectors to fail faster when their header converters are misconfigured.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Creating kafka wiki id

2024-05-07 Thread 黃竣陽
Hello, I want to create a KIP, but I don't have Kafka wiki id. I go to the
page (https://cwiki.apache.org/confluence/signup.action) but it doesn't
have a button to regist an account
Please help me to create an account, Thank you


Re: [DISCUSS] KIP-1042 support for wildcard when creating new acls

2024-05-07 Thread Claude Warren, Jr
I have updated KIP-1042 with a proposal for how to reduce the time spent
looking for matching wildcard patterns.  Experimentally I see a reduction
of 66-75% execution time.

On Mon, May 6, 2024 at 9:03 PM Greg Harris 
wrote:

> Hi Murali,
>
> Thanks for the KIP!
>
> I think I understand the motivation for this KIP in situations where
> there are a "cross product" of topics for two or more variables X and
> Y, and want to write ACLs for each of the variable axes.
> If you format your topics "X-Y-suffix", it's not easy to write rules
> that apply to all "Y" topics, because you need to enumerate all of the
> "X" values, and the problem persists even if you reorder the topic
> name.
>
> In my recent work on KIP-986 I found it necessary to introduce
> "namespaces" to group topics together, and I was going to replicate
> the ACL system to specify those namespaces. This change to the ACL
> system could increase the expressiveness and complexity of that
> feature, if it is ever implemented.
> One of the primitives I needed when specifying namespaces was the
> ability to tell when two namespaces overlapped (i.e. does there exist
> any topic which is present in both namespaces). This is trivial to do
> with the current PREFIX and LITERAL system, as we can find the
> maximum-length common prefix with just some length comparisons and
> equality checks.
> I considered specifying namespaces via regular expressions, and found
> that it was computationally much more difficult. Computing the
> intersection of two regexes appears to be exponential in the length of
> the regexes, leading me to avoid adding it.
>
> I understand that you're not suggesting full REGEX support, and that
> "namespaces" don't need to support MATCH, but I think MATCH may run
> into related difficulties. Any MATCH can overlap with any other MATCH
> or PREFIX if it includes a sufficient number of wildcards. For
> example:
> MATCH *-accounts-* has overlap with PREFIX nl as they can both match
> "nl-accounts-localtopic", but that isn't sensitive to the contents
> "nl", it is true for any PREFIX.
> MATCH *-accounts-* has overlap with MATCH *localtopic, as they can
> both match "nl-accounts-localtopic", but that isn't actually sensitive
> to the contents "localtopic", it's true for any MATCH which includes a
> wildcard at the beginning.
>
> This has implications for execution complexity: If we can't compute
> whether two patterns overlap, then we need to run both of them on each
> piece of input to test if they both match. Under the current
> LITERAL/PREFIX system, we can optimize execution with a trie, but that
> option wouldn't be available to us with MATCH.
>
> The current system makes users evaluate a trade-off:
> 1. Optimize the number of ACLs by organizing topics according to
> prefixes (for example, "accounts-localtopic-nl" and PREFIX "accounts",
> PREFIX "accounts-localtopic")
> 2. Use less-structured topic names, with a corresponding ACL scheme
> that has more individual rules.
> The system currently informs users of this tradeoff by making them
> write multiple ACLs, and making them think "there has got to be a
> better way!". Perhaps we can find a better way to surface this best
> practice, or better inform users about it.
>
> I understand that there are going to be situations more complex than
> your example, where multiple individual rules will always be necessary
> with only PREFIX evaluation. I think even in those situations, a
> number of efficient-to-evaluate rules is preferable to just one
> expensive-to-evaluate rule.
>
> One alternative that I thought of could be "PARAMETERIZED" ACLs which
> are like PREFIXED, but allow some parameter substitution. For example
> PARAMETERIZED "(nl|de|cz)-accounts-". I'm lifting regex syntax here,
> but this isn't actually a regex, and wouldn't allow arbitrary numbers
> of characters, or the * or + operators.
> In the background it could evaluate exactly like the 3 individual
> PREFIX rules, but be easier to evaluate on the backend, and support
> the intersection query I mentioned earlier. It could also support
> [a-zA-Z] notation in case the parameter values aren't known ahead of
> time, but have a fixed length.
>
> Thanks,
> Greg
>
> On Mon, May 6, 2024 at 11:17 AM Claude Warren  wrote:
> >
> > I have an idea for how to reduce the time for ACL lookups in general and
> > particularly where wildcards are involved using sequence
> > characterization techniques from bioinformatics.  But I need a set of ACL
> > patterns and associated topics to test with.
> >
> > On Fri, May 3, 2024 at 2:45 PM Haruki Okada  wrote:
> >
> > > Hi, Murali.
> > >
> > > First, could you add the KIP-1042 to the index (
> > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> > > )
> > > as well so that everyone can find it easily?
> > >
> > > I took a look at the KIP, then I have 2 questions:
> > >
> > > 1. Though the new MATCH resource pattern type may reduce the effort of
> > > adding ACLs i

Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2879

2024-05-07 Thread Apache Jenkins Server
See 




[jira] [Resolved] (KAFKA-16307) fix EventAccumulator thread idle ratio metric

2024-05-07 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-16307.
-
Fix Version/s: 3.8.0
 Reviewer: David Jacot
   Resolution: Fixed

> fix EventAccumulator thread idle ratio metric
> -
>
> Key: KAFKA-16307
> URL: https://issues.apache.org/jira/browse/KAFKA-16307
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Major
> Fix For: 3.8.0
>
>
> The metric does not seem to be accurate, nor reporting metrics at every 
> interval. Requires investigation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is unstable: Kafka » Kafka Branch Builder » 3.7 #150

2024-05-07 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-16685) RSM Task warn logs do not include parent exception trace

2024-05-07 Thread Jorge Esteban Quilcate Otoya (Jira)
Jorge Esteban Quilcate Otoya created KAFKA-16685:


 Summary: RSM Task warn logs do not include parent exception trace
 Key: KAFKA-16685
 URL: https://issues.apache.org/jira/browse/KAFKA-16685
 Project: Kafka
  Issue Type: Improvement
  Components: Tiered-Storage
Reporter: Jorge Esteban Quilcate Otoya
Assignee: Jorge Esteban Quilcate Otoya


When RSMTask exceptions happen and are logged, it only includes the exception 
message, but we lose the stack trace.

See 
[https://github.com/apache/kafka/blob/70b8c5ae8e9336dbf4792e2d3cf100bbd70d6480/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L821-L831]

This makes it difficult to troubleshoot issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16684) FetchResponse#responseData could return incorrect data

2024-05-07 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16684:
--

 Summary: FetchResponse#responseData could return incorrect data
 Key: KAFKA-16684
 URL: https://issues.apache.org/jira/browse/KAFKA-16684
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


[https://github.com/apache/kafka/commit/2b8aff58b575c199ee8372e5689420c9d77357a5]
 make it accept input to return "partial" data. The content of output is based 
on the input but we cache the output ... It will return same output even though 
we pass different input. That is a potential bug.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-950: Tiered Storage Disablement

2024-05-07 Thread Kamal Chandraprakash
Hi Christo,

Thanks for the update!

For both the policies "retain" and "delete", can we maintain the same
approach to delete the segments async?

> If the disablement policy is set to delete, the Log start offset (LSO) is
updated to match the Local Log Start Offset and the remote log is deleted
by calling the RemoteStorageManager#deleteLogSegmentData().

In the KIP, it's mentioned that when the disable policy is set to "delete",
the remote-log-segments will be
deleted in-sync. The stopPartition call might get timed out when the number
of remote log segments to
delete is huge. We can further extend the same approach for the topic
deletion requests.

Also, Could you please update the state diagram about the transitions? It
is not clear when to transit from
DISABLING to DISABLED state?

--
Kamal

On Mon, May 6, 2024 at 6:55 PM Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:

> Ignore the above message. Got the answers after reading the state
> transition section.
>
> > If the disablement policy is delete, tasks scheduled for the
> topic-partitions in the RemoteDataExpirationThreadPool will also be
> canceled.
>
> We are deleting the segments synchronously. Should we delete them
> asynchronously? The same approach can be extended to topic deletion
> requests.
>
> > 6. In ZK mode, what will the controller do if the "stopReplicas"
> responses not received from all brokers? Reverting the changes?
>
> Since we are deleting the segments synchronously. This case can be bound
> to happen when the number of remote log segments to
> delete is huge.
>
>
> On Mon, May 6, 2024, 18:12 Kamal Chandraprakash <
> kamal.chandraprak...@gmail.com> wrote:
>
>> Hi Christo,
>>
>> Thanks for the update!
>>
>> 1. In the ZK mode, how will the transition from DISABLING to DISABLED
>> state happen?
>> For the "retain" policy, until we delete all the remote-log segments, the
>> state will be
>> DISABLING and the deletion can happen only when they breach either the
>> retention
>> time (or) size.
>>
>> How does the controller monitor that all the remote log segments are
>> deleted for all
>> the partitions of the topic before transitioning the state to DISABLED?
>>
>> 2. In Kraft, we have only ENABLED -> DISABLED state. How are we
>> supporting the case
>> "retain" -> "enable"?
>>
>> If the remote storage is degraded, we want to avoid uploading the
>> segments temporarily
>> and resume back once the remote storage is healthy. Is the case supported?
>>
>>
>>
>> On Fri, May 3, 2024 at 12:12 PM Luke Chen  wrote:
>>
>>> Also, I think using `stopReplicas` request is a good idea because it
>>> won't cause any problems while migrating to KRaft mode.
>>> The stopReplicas request is one of the request that KRaft controller
>>> will send to ZK brokers during migration.
>>>
>>> Thanks.
>>> Luke
>>>
>>> On Fri, May 3, 2024 at 11:48 AM Luke Chen  wrote:
>>>
 Hi Christo,

 Thanks for the update.

 Questions:
 1. For this
 "The possible state transition from DISABLED state is to the ENABLED."
 I think it only applies for KRaft mode. In ZK mode, the possible state
 is "DISABLING", right?

 2. For this:
 "If the cluster is using Zookeeper as the control plane, enabling
 remote storage for a topic triggers the controller to send this information
 to Zookeeper. Each broker listens for changes in Zookeeper, and when a
 change is detected, the broker triggers
 RemoteLogManager#onLeadershipChange()."

 I think the way ZK brokers knows the leadership change is by getting
 the LeaderAndISRRequeset from the controller, not listening for changes in
 ZK.

 3. In the KRaft handler steps, you said:
 "The controller also updates the Topic metadata to increment the
 tiered_epoch and update the tiered_stateto DISABLING state."

 Should it be "DISABLED" state since it's KRaft mode?

 4. I was thinking how we handle the tiered_epoch not match error.
 For ZK, I think the controller won't write any data into ZK Znode,
 For KRaft, either configRecord or updateTopicMetadata records won't be
 written.
 Is that right? Because the current workflow makes me think there will
 be partial data updated in ZK/KRaft when tiered_epoch error.

 5. Since we changed to use stopReplicas (V5) request now, the diagram
 for ZK workflow might also need to update.

 6. In ZK mode, what will the controller do if the "stopReplicas"
 responses not received from all brokers? Reverting the changes?
 This won't happen in KRaft mode because it's broker's responsibility to
 fetch metadata update from controller.


 Thank you.
 Luke


 On Fri, Apr 19, 2024 at 10:23 PM Christo Lolov 
 wrote:

> Heya all!
>
> I have updated KIP-950. A list of what I have updated is:
>
> * Explicitly state that Zookeeper-backed clusters will have ENABLED ->
> DISABLING -> DISABLED while KRa

[jira] [Created] (KAFKA-16683) Extract security-related helpers from scala.TestUtils to java class

2024-05-07 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16683:
--

 Summary: Extract security-related helpers from scala.TestUtils to 
java class
 Key: KAFKA-16683
 URL: https://issues.apache.org/jira/browse/KAFKA-16683
 Project: Kafka
  Issue Type: Sub-task
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


We can merge them into `JaasTestUtils and then rename `JaasTestUtils` to 
`SecurityTestUtils.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16682) Rewrite JassTestUtils by Java

2024-05-07 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16682:
--

 Summary: Rewrite JassTestUtils by Java
 Key: KAFKA-16682
 URL: https://issues.apache.org/jira/browse/KAFKA-16682
 Project: Kafka
  Issue Type: Sub-task
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


as title

one more thing is that we should change the package name from kafka.utils to 
kafka.security



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16681) Rewrite MiniKDC by Java

2024-05-07 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16681:
--

 Summary: Rewrite MiniKDC by Java
 Key: KAFKA-16681
 URL: https://issues.apache.org/jira/browse/KAFKA-16681
 Project: Kafka
  Issue Type: Sub-task
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


Noted:
 # we need to move it from scala folder to java folder
 # don't change the package name since system tests requires it



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16680) Make ClusterTestExtensions support SASL

2024-05-07 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16680:
--

 Summary: Make ClusterTestExtensions support SASL
 Key: KAFKA-16680
 URL: https://issues.apache.org/jira/browse/KAFKA-16680
 Project: Kafka
  Issue Type: New Feature
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


This is a umbrella issue.

In order to migrate more tests to new test infra, we ought to make it support 
SASL at least.

*phase1: reuse/rewrite existent SASL utils by Java*
 # MiniKdc
 # JaasTestUtils
 # Move security-related helpers from scala.TestUtils to java.TestUtils
 # extract/rewrite non-zk code from SaslSetup to new java class

*phase2: make `ClusterTest#securityProtocol` works. It does not work for kraft 
mode :(*
 # add client-related helper to generate consumer/producer/admin class with 
security configs
 # configure kraft server with security settings
 # migrate tests of tools to use new test infra with security

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-924: customizable task assignment for Streams

2024-05-07 Thread Lucas Brutschy
Hi,

thanks for the KIP!

+1 (binding)

Best,
Lucas

On Tue, May 7, 2024 at 9:37 AM Bruno Cadonna  wrote:
>
> Thanks for the KIP!
>
> Looking forward to a well-structured task assignor!
>
> +1 (binding)
>
> Best,
> Bruno
>
> On 5/3/24 2:44 AM, Matthias J. Sax wrote:
> > I left one more nit on the discuss thread. But overall LGTM.
> >
> > +1 (binding)
> >
> > Thanks Rohan and Sophie for driving this KIP.
> >
> >
> > -Matthias
> >
> > On 4/29/24 2:07 PM, Sophie Blee-Goldman wrote:
> >> +1 (binding)
> >>
> >> thanks for driving this KIP!
> >>
> >> On Tue, Apr 16, 2024 at 1:46 PM Rohan Desai 
> >> wrote:
> >>
> >>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams
> >>>
> >>> As this KIP has been open for a while, and gone through a couple
> >>> rounds of
> >>> review/revision, I'm calling a vote to get it approved.
> >>>
> >>


Re: [VOTE] KIP-924: customizable task assignment for Streams

2024-05-07 Thread Bruno Cadonna

Thanks for the KIP!

Looking forward to a well-structured task assignor!

+1 (binding)

Best,
Bruno

On 5/3/24 2:44 AM, Matthias J. Sax wrote:

I left one more nit on the discuss thread. But overall LGTM.

+1 (binding)

Thanks Rohan and Sophie for driving this KIP.


-Matthias

On 4/29/24 2:07 PM, Sophie Blee-Goldman wrote:

+1 (binding)

thanks for driving this KIP!

On Tue, Apr 16, 2024 at 1:46 PM Rohan Desai  
wrote:




https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams

As this KIP has been open for a while, and gone through a couple 
rounds of

review/revision, I'm calling a vote to get it approved.





[jira] [Created] (KAFKA-16679) Merge `DeleteRecordsCommandUnitTest` into `DeleteRecordsCommandTest`, `FeatureCommandUnitTest` into

2024-05-07 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16679:
--

 Summary: Merge `DeleteRecordsCommandUnitTest` into 
`DeleteRecordsCommandTest`, `FeatureCommandUnitTest` into 
 Key: KAFKA-16679
 URL: https://issues.apache.org/jira/browse/KAFKA-16679
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


Normally, we don't put multi test classes into single file. Those test classes 
can be extracted into a new class file. Or we can merge them into single class 
by using "@Test" annotation. That can make those test cases run without 
embedded cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Add code signing key for Igor [kafka-site]

2024-05-07 Thread via GitHub


soarez merged PR #600:
URL: https://github.com/apache/kafka-site/pull/600


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16615) JoinGroup API for upgrading ConsumerGroup

2024-05-07 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-16615.
-
Fix Version/s: 3.8.0
 Reviewer: David Jacot
 Assignee: Dongnuo Lyu
   Resolution: Fixed

> JoinGroup API for upgrading ConsumerGroup
> -
>
> Key: KAFKA-16615
> URL: https://issues.apache.org/jira/browse/KAFKA-16615
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dongnuo Lyu
>Assignee: Dongnuo Lyu
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)