[jira] [Resolved] (KAFKA-12947) Replace EasyMock and PowerMock with Mockito for StreamsMetricsImplTest ...

2024-05-09 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-12947.

Resolution: Duplicate

this is fixed by https://github.com/apache/kafka/pull/14623

> Replace EasyMock and PowerMock with Mockito for StreamsMetricsImplTest ...
> --
>
> Key: KAFKA-12947
> URL: https://issues.apache.org/jira/browse/KAFKA-12947
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: YI-CHEN WANG
>Assignee: Dalibor Plavcic
>Priority: Major
>
> For KAFKA-7438





[jira] [Resolved] (KAFKA-16484) Support to define per broker/controller property by ClusterConfigProperty

2024-05-09 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16484.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Support to define per broker/controller property by ClusterConfigProperty
> -
>
> Key: KAFKA-16484
> URL: https://issues.apache.org/jira/browse/KAFKA-16484
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Major
> Fix For: 3.8.0
>
>
> The property set to `ClusterConfigProperty` gets applied to all brokers, and 
> hence we can't have individual props for each broker to test racks.
>  
> It seems to me we can add a new field "id" to `ClusterConfigProperty` to 
> declare that the property should be applied to a specific broker (or controller). 
> The default value is -1, meaning the property is applied to all nodes.
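
For illustration, a sketch of how the proposed field could be used in a test
(assuming the new field is named "id" and defaults to -1, per the description
above -- it does not exist yet):

@ClusterTest(serverProperties = {
    // applied to broker 0 only
    @ClusterConfigProperty(id = 0, key = "broker.rack", value = "rack0"),
    // id defaults to -1, so this is applied to all nodes
    @ClusterConfigProperty(key = "default.replication.factor", value = "1")
})
public void testRackAwareness() { }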





Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-05-09 Thread Artem Livshits
Hi Mathias,

> [AL1] While I see the point, I would think having a different callback
for every exception might not really be elegant?

I'm not sure how to assess the level of elegance of the proposal, but I can
comment on the technical characteristics:

1. Having specific interfaces that codify the logic that is currently
prescribed in the comments reduces the chance of making a mistake.
Comments may get ignored or misunderstood, but if the contract is
codified, the compiler will help enforce it.
2. Given that the logic is trickier than it seems (record-too-large is
an example that can easily confuse someone who's not intimately familiar
with the nuances of the batching logic), having a few more hoops to jump
through gives a greater chance that whoever tries to add a new case pauses
and thinks a bit more.
3. As Justine pointed out, having a different method per error will be a
forcing function to go through a KIP rather than smuggle new cases through
the implementation.
4. Sort of a consequence of the previous 3 -- all of those things reduce the
chance of someone writing code that works with 2 errors today and then
silently and incorrectly ignoring new errors added in the future
(the example I gave in the previous email).
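
For illustration, a rough sketch of what such per-error callbacks could look
like (the interface and method names below are placeholders, not an API
proposal):

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

// Placeholder sketch: the compiler now enforces exactly which errors a handler
// may decide to drop, instead of a comment on a generic handle(Exception) method.
public interface DropGuardHandler {
    enum Response { FAIL, DROP }

    // called only when the client rejects an individual record as too large
    Response onRecordTooLarge(ProducerRecord<byte[], byte[]> record, RecordTooLargeException e);

    // called only when the topic/partition is unknown (possibly an intermittent condition)
    Response onUnknownTopicOrPartition(ProducerRecord<byte[], byte[]> record, UnknownTopicOrPartitionException e);
}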

> [AL2 cont.] Similar to AL1, I see such a handler to some extent as
business logic. If a user puts a bad filter condition in their KS app, and
drops messages

I agree that there is always a chance to get a bug and lose messages, but
there is generally a separation of concerns with different risk profiles:
the filtering logic may be more rigorously tested and rarely changed (say,
an application developer owns it), while the topics to produce to may be
set via configuration (e.g. a user of the application does it), and users
generally expect to get an error when the configuration is incorrect.

What could be worse is that UnknownTopicOrPartitionException can be an
intermittent error, i.e. with a generally correct configuration there
could be a metadata propagation problem on the cluster, and then a random
set of records could get lost.

> [AL3] Maybe I misunderstand what you are saying, but to me, checking the
size of the record upfront is exactly what the KIP proposes? No?

It achieves the same result but solves it differently. My proposal:

1. The application checks the validity of a record (maybe via a new
validateRecord method) before producing it, and can just exclude it or
return an error to the user.
2. The application produces the record -- at this point there are no records
that could return record-too-large; they were either skipped at step 1, or
we never got here because step 1 failed.

Vs. the KIP's proposal:

1. Application produces the record.
2. Application gets a callback.
3. Application returns the action on how to proceed.

The advantage of the former is the clarity of semantics -- the record is
invalid (a property of the record, not a function of server state or server
configuration), so we know clearly that it is the record that is bad
and that it can never succeed.
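
To make the comparison concrete, a rough sketch of the first flow
(validateRecord is hypothetical -- no such method exists on the producer today):

// Hypothetical pre-validation flow; validateRecord is a placeholder, not an existing Producer API.
for (ProducerRecord<byte[], byte[]> record : records) {
    if (!producer.validateRecord(record)) {
        // step 1: the record itself is invalid (e.g. too large); report it to the user or skip it
        continue;
    }
    // step 2: records that are individually too large never reach send()
    producer.send(record);
}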

The KIP-proposed way actually has a very tricky point: it only handles
a subset of record-too-large exceptions.  The broker can also return
record-too-large and reject the whole batch (and we don't want to ignore
those, because then we would skip random records that just happened to be in
the same batch). In some sense we use the same error for 2 different
conditions, and understanding that requires a pretty deep understanding of
Kafka internals.

-Artem


On Wed, May 8, 2024 at 9:47 AM Justine Olshan 
wrote:

> My concern with respect to it being fragile: the code that ensures the
> error type is internal to the producer. Someone may see it and say, I want
> to add such and such error. This looks like internal code, so I don't need
> a KIP, and then they can change it to whatever they want thinking it is
> within the typical kafka improvement protocol.
>
> Relying on an internal change to enforce an external API is fragile in my
> opinion. That's why I sort of agreed with Artem with enforcing the error in
> the method signature -- part of the public API.
>
> Chris's comments on requiring more information to handler again makes me
> wonder if we are solving a problem of lack of information at the
> application level with a more powerful solution than we need. (Ie, if we
> had more information, could the application close and restart the
> transaction rather than having to drop records) But I am happy to
> compromise with a handler that we can agree is sufficiently controlled and
> documented.
>
> Justine
>
> On Wed, May 8, 2024 at 7:20 AM Chris Egerton 
> wrote:
>
> > Hi Alieh,
> >
> > Continuing prior discussions:
> >
> > 1) Regarding the "flexibility" discussion, my overarching point is that I
> > don't see the point in allowing for this kind of pluggable logic without
> > also covering more scenarios. Take example 2 in the KIP: if we're going
> to
> > implement retries only on 

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-05-09 Thread Sophie Blee-Goldman
The type safety issue is definitely not solved by having two separate
callbacks. I just think it gets a bit worse by mashing them into one
method. At least in the plain #handle method you can be sure that the type
is ProducerRecord<byte[], byte[]>, and in #handleSerialization the type is
some POJO.

And in theory you can just embed the mapping of sink topics to type/Serde
based on your topology. Or let's say your output record keys & values are
all Strings, and you want to print the String representation in your
handler, rather than the bytes.
Having a separate callback means you can simply print the
ProducerRecord's key/value in the #handleSerialization method, whereas you
would have to use a StringDeserializer to convert the key/value to its
String form to print it in the #handle method.
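
For illustration only (a rough sketch, not tested code), the String case
inside a combined #handle would look something like:

// sketch: recovering the String form of an already-serialized record inside #handle
try (StringDeserializer stringDeserializer = new StringDeserializer()) {
    final String key = stringDeserializer.deserialize(record.topic(), (byte[]) record.key());
    final String value = stringDeserializer.deserialize(record.topic(), (byte[]) record.value());
    log.error("Failed to produce record with key={} and value={}", key, value);
}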

Again, I just feel this will be more straightforward and easy for users to
use correctly, but am satisfied either way. I'll shut up now and wait for
the KIP authors to make a call on this one way or another, and then I'm
happy to cast my vote

On Thu, May 9, 2024 at 10:15 AM Matthias J. Sax  wrote:

> Thanks Sophie! Makes it much clearer where you are coming from.
>
> About the type unsafety: isn't this also an issue for the
> `handleSerializationException` case, because the handler is used for all
> sink topics, and thus key/value types are not really known w/o taking the
> sink topic into account? -- So I am not sure if having two handler
> methods really helps much with regard to type safety?
>
> Just want to make this small comment for completeness. Let's hear what
> others think. Given that we both don't have a strong opinion but just a
> personal preference, we should be able to come to a conclusion quickly
> and get this KIP approved for 3.8 :)
>
>
> -Matthias
>
> On 5/8/24 3:12 PM, Sophie Blee-Goldman wrote:
> > Well I definitely don't feel super strongly about it, and more
> importantly,
> > I'm not a user. So I will happily defer to the preference of anyone who
> > will actually be using this feature. But  I'll explain my reasoning:
> >
> > There *is* a relevant distinction between these two callbacks -- because
> > the passed-in record will have a different type depending on whether it
> was
> > a serialization exception or something else. Even if we combined them
> into
> > a single #handle method, users will still end up implementing two
> distinct
> > branches depending on whether it was a serialization exception or not,
> > since that determines the type of the ProducerRecord passed in.
> >
> > Not to mention they'll need to cast it to a ProducerRecord<byte[], byte[]>
> > when we could have just passed it in as this type via a dedicated
> > callback.
> > And note that because of the generics, they can't do an instanceof check
> > to make sure that the record type is ProducerRecord<byte[], byte[]> and will
> > have to suppress the "unchecked cast" warning.
> >
> > So if we combined the two callbacks, their handler will look something
> like
> > this:
> >
> > @SuppressWarnings("unchecked")
> > public ProductionExceptionHandlerResponse handle(final
> ErrorHandlerContext
> > context,
> > final ProducerRecord record,
> > final Exception exception) {
> > if (exception instanceof SerializationException) {
> > if (exception.origin().equals(KEY)) {
> > log.error("Failed to serialize key", exception);
> > } else {
> > log.error("Failed to serialize value", exception);
> > }
> >
> > } else {
> > final ProducerRecord<byte[], byte[]> serializedRecord =
> > (ProducerRecord<byte[], byte[]>) record;
> > log.error("Failed to produce record with serialized key={} and serialized
> > value={}",
> > serializedRecord.key(), serializedRecord.value());
> > }
> > return ProductionExceptionHandlerResponse.FAIL;
> > }
> >
> > That seems like the most basic case, and it still ends up with distinct logic
> > even if they ultimately handle exceptions the same way. And looking
> forward
> > to KIP-1034: Dead-letter queues, it seems all the more likely that the
> > actual handling response might be different depending on whether it's a
> > serialization exception or not: a serialized record can probably be
> > retried/sent to a DLQ, whereas a record that can't be serialized should
> not
> > (can't, really) be forwarded to a DLQ. So if they're going to have
> > completely different implementations depending on whether it's a
> > serialization exception, why not just give them two separate callbacks?
> >
> > And that's all assuming the user is perfectly aware of the different
> > exception types and their implications for the type of the
> ProducerRecord.
> > Many people might just miss the existence of the
> > RecordSerializationException altogether --
> > there are already so many different exception types, ESPECIALLY when it
> > comes to the Producer. Not to mention they'll need to understand the
> > nuances of how the ProducerRecord type changes depending on the type of
> > exception that's passed in. And on top of all that, they'll need to know
> > that there is metadata stored in the 

Jenkins build is unstable: Kafka » Kafka Branch Builder » 3.7 #153

2024-05-09 Thread Apache Jenkins Server
See 




[DISCUSS] KIP-891: Running multiple versions of Connector plugins

2024-05-09 Thread Greg Harris
Hi all,

I'd like to reboot the discussion on KIP-891:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-891%3A+Running+multiple+versions+of+Connector+plugins

I've made some changes, most notably:

1. Specifying versions for all plugins in Connector configs
(converters, header converters, transforms, and predicates) not just
connectors & tasks
2. Specifying a range of versions instead of an exact match
3. New metrics to observe what versions are in-use
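
As a purely illustrative example (the exact property names are defined on the
KIP page; the ones below are placeholders), a connector config could end up
looking something like this:

import java.util.Map;

// Illustration only -- the version property names are placeholders.
Map<String, String> connectorConfig = Map.of(
        "name", "my-connector",
        "connector.class", "org.example.MySourceConnector",
        "connector.version", "[2.0,3.0)",                  // a version range instead of an exact match
        "transforms", "flatten",
        "transforms.flatten.type", "org.apache.kafka.connect.transforms.Flatten$Value",
        "transforms.flatten.version", "3.8.0");            // per-plugin version, not just connector/task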

Thanks to Snehashis for the original KIP idea!

Thanks,
Greg

On Tue, Jan 2, 2024 at 11:49 AM Greg Harris  wrote:
>
> Hi Snehashis,
>
> Thank you for the KIP! This is something I've wanted for a long time.
>
> I know the discussion has gone cold, are you still interested in
> pursuing this feature? I'll make time to review the KIP if you are
> still accepting comments.
>
> Thanks,
> Greg
>
> On Tue, Nov 22, 2022 at 12:29 PM Snehashis  wrote:
> >
> > Thanks for the points Sagar.
> >
> > > 1) Should we update the GET /connectors endpoint to include the version of
> > > the plugin that is running? It could be useful to figure out the version
> > of
> > > the plugin or I am assuming it gets returned by the expand=info call?
> >
> > I think this is good to have and a possible future enhancement. The version
> > info will be present in the config of the connector if the user has
> > specified the version. Otherwise it is the latest version, which the user
> > can find out from the connector-plugin endpoint. The information could be
> > added to the response of the GET /connectors endpoint itself; however,
> > the ideal way of doing this would be to get the currently running
> > instance of the connector and read the version directly from there. This is
> > slightly tricky as the connector could be running on a different node.
> > One way to do this would be to persist the version information in the
> > status backing store during instantiation of the connector. It requires
> > some more thought, and since the version is part of the configs when provided
> > (and evident otherwise), I have not included it in this KIP.
> >
> > > 2) I am not aware of this and hence asking, can 2 connectors with
> > different
> > > versions have the same name? Does the plugin isolation allow this? This
> > > could have a bearing when using the lifecycle endpoints for connectors
> > like
> > > DELETE etc.
> >
> > All connectors in a cluster need to have unique connector names
> > regardless of what version of the plugin the connector is running
> > underneath. This is something enforced by the Connect runtime itself. All
> > Connect CRUD operations are keyed on the connector name, so there will not
> > be an issue.
> >
> > Regards
> > Snehashis
> >
> > On Tue, Nov 22, 2022 at 3:16 PM Sagar  wrote:
> >
> > > Hey Snehashsih,
> > >
> > > Thanks for the KIP. It looks like a very useful feature. Couple of
> > > small-ish points, let me know what you think:
> > >
> > > 1) Should we update the GET /connectors endpoint to include the version of
> > > the plugin that is running? It could be useful to figure out the version 
> > > of
> > > the plugin or I am assuming it gets returned by the expand=info call?
> > > 2) I am not aware of this and hence asking, can 2 connectors with 
> > > different
> > > versions have the same name? Does the plugin isolation allow this? This
> > > could have a bearing when using the lifecycle endpoints for connectors 
> > > like
> > > DELETE etc.
> > >
> > > Thanks!
> > > Sagar.
> > >
> > >
> > > On Tue, Nov 22, 2022 at 2:10 PM Ashwin 
> > > wrote:
> > >
> > > > Hi Snehasis,
> > > >
> > > > > IIUC (please correct me if I am wrong here), what you highlighted
> > > above,
> > > > is
> > > > a versioning scheme for a connector config for the same connector (and
> > > not
> > > > different versions of a connector plugin).
> > > >
> > > > Sorry for not being more precise in my wording -  I meant registering
> > > > versions of schema for connector config.
> > > >
> > > > Let's take the example of a fictional connector which uses a fictional
> > > AWS
> > > > service.
> > > >
> > > > Fictional Connector Config schema version:2.0
> > > > ---
> > > > {
> > > >   "$schema": "http://json-schema.org/draft-04/schema#;,
> > > >   "type": "object",
> > > >   "properties": {
> > > > "name": {
> > > >   "type": "string"
> > > > },
> > > > "schema_version": {
> > > >   "type": "string"
> > > > },
> > > > "aws_access_key": {
> > > >   "type": "string"
> > > > },
> > > > "aws_secret_key": {
> > > >   "type": "string"
> > > > }
> > > >   },
> > > >   "required": [
> > > > "name",
> > > > "schema_version",
> > > > "aws_access_key",
> > > > "aws_secret_key"
> > > >   ]
> > > > }
> > > >
> > > > Fictional Connector config schema version:3.0
> > > > ---
> > > > {
> > > >   "$schema": "http://json-schema.org/draft-04/schema#;,
> > > >   "type": "object",
> > > >   "properties": {
> > > > "name": {
> > > >   

[jira] [Created] (KAFKA-16696) Remove the in-memory implementation of RSM and RLMM

2024-05-09 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-16696:


 Summary: Remove the in-memory implementation of RSM and RLMM
 Key: KAFKA-16696
 URL: https://issues.apache.org/jira/browse/KAFKA-16696
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash


The in-memory implementations of RSM and RLMM were written for
unit/integration tests: [https://github.com/apache/kafka/pull/10218]

They are not used by any of the tests and are superseded by the LocalTieredStorage
framework, which uses the local disk as secondary storage and a topic as RLMM.





Re: Creating kafka wiki id

2024-05-09 Thread Matthias J. Sax
Self-service to create an account is currently not working. Please reply 
on https://issues.apache.org/jira/browse/INFRA-25451 to request a wiki 
account.


I'll update the wiki page for now until the issue is resolved.

-Matthias

On 5/7/24 8:25 AM, 黃竣陽 wrote:

Hello, I want to create a KIP, but I don't have a Kafka wiki id. I went to the
page (https://cwiki.apache.org/confluence/signup.action) but it doesn't
have a button to register an account.
Please help me create an account. Thank you.



Re: [VOTE] KIP-1018: Introduce max remote fetch timeout config

2024-05-09 Thread Satish Duggana
Thanks Kamal for the KIP.

+1 from me.

~Satish.

On Thu, 9 May 2024 at 17:52, Christo Lolov  wrote:
>
> Heya Kamal,
>
> Thanks for the KIP and the answers in the discussion!
>
> +1 from me :)
>
> Best,
> Christo
>
> On Thu, 9 May 2024 at 11:11, Federico Valeri  wrote:
>
> > +1 non binding
> >
> > Thanks
> >
> > On Thu, May 9, 2024 at 12:05 PM Luke Chen  wrote:
> > >
> > > Hi Kamal,
> > >
> > > Thanks for the KIP!
> > > +1 from me.
> > >
> > > Thanks.
> > > Luke
> > >
> > > On Mon, May 6, 2024 at 5:03 PM Kamal Chandraprakash <
> > > kamal.chandraprak...@gmail.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > We would like to start a voting thread for KIP-1018: Introduce
> > > > max remote fetch timeout config for DelayedRemoteFetch requests.
> > > >
> > > > The KIP is available on
> > > >
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1018%3A+Introduce+max+remote+fetch+timeout+config+for+DelayedRemoteFetch+requests
> > > >
> > > > If you have any suggestions, feel free to participate in the discussion
> > > > thread:
> > > > https://lists.apache.org/thread/9x21hzpxzmrt7xo4vozl17d70fkg3chk
> > > >
> > > > --
> > > > Kamal
> > > >
> >


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-05-09 Thread Matthias J. Sax

Thanks Sophie! Makes it much clearer where you are coming from.

About the type unsafety: isn't this also an issue for the 
`handleSerializationException` case, because the handler is used for all 
sink topics, and thus key/value types are not really known w/o taking the 
sink topic into account? -- So I am not sure if having two handler 
methods really helps much with regard to type safety?


Just want to make this small comment for completeness. Let's hear what 
others think. Given that we both don't have a strong opinion but just a 
personal preference, we should be able to come to a conclusion quickly 
and get this KIP approved for 3.8 :)



-Matthias

On 5/8/24 3:12 PM, Sophie Blee-Goldman wrote:

Well I definitely don't feel super strongly about it, and more importantly,
I'm not a user. So I will happily defer to the preference of anyone who
will actually be using this feature. But  I'll explain my reasoning:

There *is* a relevant distinction between these two callbacks -- because
the passed-in record will have a different type depending on whether it was
a serialization exception or something else. Even if we combined them into
a single #handle method, users will still end up implementing two distinct
branches depending on whether it was a serialization exception or not,
since that determines the type of the ProducerRecord passed in.

Not to mention they'll need to cast it to a ProducerRecord<byte[], byte[]>
when we could have just passed it in as this type via a dedicated callback.
And note that because of the generics, they can't do an instanceof check to
make sure that the record type is ProducerRecord<byte[], byte[]> and will
have to suppress the "unchecked cast" warning.

So if we combined the two callbacks, their handler will look something like
this:

@SuppressWarnings("unchecked")
public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext
context,
final ProducerRecord record,
final Exception exception) {
if (exception instanceof SerializationException) {
if (exception.origin().equals(KEY)) {
log.error("Failed to serialize key", exception);
} else {
log.error("Failed to serialize value", exception);
}

} else {
final ProducerRecord<byte[], byte[]> serializedRecord = (ProducerRecord<byte[], byte[]>) record;
log.error("Failed to produce record with serialized key={} and serialized
value={}",
serializedRecord.key(), serializedRecord.value());
}
return ProductionExceptionHandlerResponse.FAIL;
}

That seems like the most basic case, and it still ends up with distinct logic
even if they ultimately handle exceptions the same way. And looking forward
to KIP-1034: Dead-letter queues, it seems all the more likely that the
actual handling response might be different depending on whether it's a
serialization exception or not: a serialized record can probably be
retried/sent to a DLQ, whereas a record that can't be serialized should not
(can't, really) be forwarded to a DLQ. So if they're going to have
completely different implementations depending on whether it's a
serialization exception, why not just give them two separate callbacks?

And that's all assuming the user is perfectly aware of the different
exception types and their implications for the type of the ProducerRecord.
Many people might just miss the existence of the
RecordSerializationException altogether --
there are already so many different exception types, ESPECIALLY when it
comes to the Producer. Not to mention they'll need to understand the
nuances of how the ProducerRecord type changes depending on the type of
exception that's passed in. And on top of all that, they'll need to know
that there is metadata stored in the RecordSerializationException regarding
the origin of the error. Whereas if we just passed in the
SerializationExceptionOrigin to a #handleSerialization callback, well,
that's pretty impossible to miss.

That all just seems like a lot for most people to have to understand to
implement a ProductionExceptionHandler, which imo is not at all an advanced
feature and should be as straightforward and easy to use as possible.

Lastly -- I don't think it's quite fair to compare this to the
RecordDeserializationException. We have a dedicated handler that's just for
deserialization exceptions specifically, hence there's no worry about users
having to be aware of the different exception types they might have to deal
with in the DeserializtionExceptionHandler. Whereas serialization
exceptions are just a subset of what might get passed in to the
ProductionExceptionHandler...

Just explaining my reasoning -- in the end I leave it up to the KIP authors
and anyone who will actually be using this feature in their applications :)



On Tue, May 7, 2024 at 8:35 PM Matthias J. Sax  wrote:


@Loic, yes, what you describe is exactly what I had in mind.



@Sophie, can you elaborate a little bit?


First of all, I agree that it makes sense to maintain the two separate
callbacks for the ProductionExceptionHandler, since one of them is
specifically for serialization exceptions while the other is 

Re: [DISCUSS] KIP-950: Tiered Storage Disablement

2024-05-09 Thread Kamal Chandraprakash
Christo,

IMO, we can converge the implementation for both policies
and remove the remote-log segments in an async fashion.

The only difference between the "delete" and "retain" policies is
that, for "delete", we move the log-start-offset to match the
local-log-start-offset, and then the expiration threads clean up
those remote-log segments because of the log-start-offset breach.
The proposal is that we use the same remote-log-expiration threads
to delete the remote-log segments instead of the request-handler thread,
as the time taken to delete the remote log segments is non-deterministic.

For both policies, we have to stop the copier threads and close
the RLM task once all the segments have expired from the remote
storage.

https://github.com/apache/kafka/pull/13947/files#r1281675818

--
Kamal

On Thu, May 9, 2024 at 6:51 PM Christo Lolov  wrote:

> Heya!
>
> re: Luke
>
> 1. I am not certain I follow the question. From DISABLED you can only go to
> ENABLED regardless of whether your cluster is backed by Zookeeper or KRaft.
> Am I misunderstanding your point?
>
> 2. Apologies, this was a leftover from previous versions. I have updated
> the Zookeeper section. The steps ought to be: controller receives change,
> commits necessary data to Zookeeper, enqueues disablement and starts
> sending StopReplicas request to brokers; brokers receive StopReplicas and
> propagate them all the way to RemoteLogManager#stopPartitions which takes
> care of the rest.
>
> 3. Correct, it should say DISABLED - this should now be corrected.
>
> 4. I was thinking that if there is a mismatch we will just fail accepting
> the request for disablement. This should be the same in both Zookeeper and
> KRaft. Or am I misunderstanding your question?
>
> 5. Yeah. I am now doing a second pass on all diagrams and will update them
> by the end of the day!
>
> 6. I think my current train of thought is that there will be unlimited
> retries until all brokers respond in a similar way to how deletion of a
> topic works today in ZK. In the meantime the state will continue to be
> DISABLING. Do you have a better suggestion?
>
> re: Kamal
>
> Yep, I will update all diagrams
>
> I am not certain I follow the reasoning for making retain and delete the
> same. Deletion when the policy is retain happens asynchronously due to
> expiration. I think that deletion when the policy is delete ought to (at
> least for the initial implementation) happen synchronously. Should people
> run into timeout problems we can always then have a follow-up KIP where we
> make it asynchronous.
>
> Best,
> Christo
>
> On Tue, 7 May 2024 at 10:04, Kamal Chandraprakash <
> kamal.chandraprak...@gmail.com> wrote:
>
> > Hi Christo,
> >
> > Thanks for the update!
> >
> > For both the policies "retain" and "delete", can we maintain the same
> > approach to delete the segments async?
> >
> > > If the disablement policy is set to delete, the Log start offset (LSO)
> is
> > updated to match the Local Log Start Offset and the remote log is deleted
> > by calling the RemoteStorageManager#deleteLogSegmentData().
> >
> > In the KIP, it's mentioned that when the disable policy is set to
> "delete",
> > the remote-log-segments will be
> > deleted in-sync. The stopPartition call might get timed out when the
> number
> > of remote log segments to
> > delete is huge. We can further extend the same approach for the topic
> > deletion requests.
> >
> > Also, Could you please update the state diagram about the transitions? It
> > is not clear when to transit from
> > DISABLING to DISABLED state?
> >
> > --
> > Kamal
> >
> > On Mon, May 6, 2024 at 6:55 PM Kamal Chandraprakash <
> > kamal.chandraprak...@gmail.com> wrote:
> >
> > > Ignore the above message. Got the answers after reading the state
> > > transition section.
> > >
> > > > If the disablement policy is delete, tasks scheduled for the
> > > topic-partitions in the RemoteDataExpirationThreadPool will also be
> > > canceled.
> > >
> > > We are deleting the segments synchronously. Should we delete them
> > > asynchronously? The same approach can be extended to topic deletion
> > > requests.
> > >
> > > > 6. In ZK mode, what will the controller do if the "stopReplicas"
> > > responses not received from all brokers? Reverting the changes?
> > >
> > > Since we are deleting the segments synchronously, this case is bound
> > > to happen when the number of remote log segments to
> > > delete is huge.
> > >
> > >
> > > On Mon, May 6, 2024, 18:12 Kamal Chandraprakash <
> > > kamal.chandraprak...@gmail.com> wrote:
> > >
> > >> Hi Christo,
> > >>
> > >> Thanks for the update!
> > >>
> > >> 1. In the ZK mode, how will the transition from DISABLING to DISABLED
> > >> state happen?
> > >> For the "retain" policy, until we delete all the remote-log segments,
> > the
> > >> state will be
> > >> DISABLING and the deletion can happen only when they breach either the
> > >> retention
> > >> time (or) size.
> > >>
> > >> How does the controller 

[jira] [Created] (KAFKA-16695) Improve expired poll interval logging by showing exceeded time

2024-05-09 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16695:
--

 Summary: Improve expired poll interval logging by showing exceeded 
time
 Key: KAFKA-16695
 URL: https://issues.apache.org/jira/browse/KAFKA-16695
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans
Assignee: Lianet Magrans


When a consumer poll iteration takes longer than max.poll.interval.ms, the 
consumer logs a warning suggesting that the max.poll.interval.ms config was exceeded, 
and proactively leaves the group. The log suggests considering adjusting the 
max.poll.interval.ms config, which should help in cases of long processing 
times. We should consider adding the information of how much the interval was 
exceeded, since it could be helpful in guiding the user to effectively adjust 
the config. This is done in other clients, which log this kind of message in 
this situation:
{quote}Application maximum poll interval (30ms) exceeded by 255ms (adjust 
max.poll.interval.ms for long-running message processing): leaving group{quote}
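
A minimal sketch of what the improved warning could look like in the Java
client (variable names are illustrative):

// illustrative only: include how far past max.poll.interval.ms the poll loop ran
long exceededMs = timeSinceLastPollMs - maxPollIntervalMs;
log.warn("consumer poll timeout has expired: max.poll.interval.ms ({} ms) was exceeded by {} ms. "
        + "This typically means the poll loop is spending too much time processing messages; "
        + "consider increasing max.poll.interval.ms or reducing the work done per poll.",
        maxPollIntervalMs, exceededMs);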
  





Re: KIP-1040: Improve handling of nullable values in InsertField/ExtractField transformations

2024-05-09 Thread Chris Egerton
After doing a brief survey of the SMTs that ship with Connect, it seems
like these would also benefit:

- HeaderFrom, which populates record headers with subfields of keys/values
[1]
- Cast, which performs type transformation on subfields of keys/values [2]
- SetSchemaMetadata, which (when the record key/value is a struct) copies
fields from the input struct to the output struct (which uses a different
schema) [3]
- TimestampConverter, which does similar input/output field copying to
SetSchemaMetadata [4]
- ReplaceField, which does similar input/output field copying to
SetSchemaMetadata and TimestampConverter [5]

[1] -
https://github.com/apache/kafka/blob/f4fdaa702a2e718bdb44b9c5fec254f32a33f0d8/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java#L143
[2] -
https://github.com/apache/kafka/blob/f4fdaa702a2e718bdb44b9c5fec254f32a33f0d8/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L178
[3] -
https://github.com/apache/kafka/blob/f4fdaa702a2e718bdb44b9c5fec254f32a33f0d8/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L174
[4] -
https://github.com/apache/kafka/blob/2a5efe4a334611fc7c15f76b322482bad0942db0/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java#L420
[5] -
https://github.com/apache/kafka/blob/f4fdaa702a2e718bdb44b9c5fec254f32a33f0d8/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java#L183
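
For context, the pattern all of these share boils down to something like the
following (a simplified sketch, not the exact SMT code):

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;

// Simplified sketch of the "copy this Struct into a new Struct" pattern shared by these SMTs.
static Struct copyInto(Struct original, Schema newSchema) {
    final Struct updated = new Struct(newSchema);
    for (Field field : original.schema().fields()) {
        // Struct::get substitutes the field's default value when the stored value is null;
        // Struct::getWithoutDefault(field.name()) would preserve the null instead.
        updated.put(field.name(), original.get(field));
    }
    return updated;
}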

On Thu, May 9, 2024 at 3:28 AM Mario Fiore Vitale 
wrote:

> Hi Chris,
>
> > Wouldn't ValueToKey [1] be applicable as well, for example?
> Yes, also that one can be affected.
>
> On Wed, May 8, 2024 at 5:59 PM Chris Egerton 
> wrote:
>
> > Wait, just one more thing--are there any other SMTs that could benefit
> from
> > this? Wouldn't ValueToKey [1] be applicable as well, for example?
> >
> > [1] -
> >
> >
> https://github.com/apache/kafka/blob/2a5efe4a334611fc7c15f76b322482bad0942db0/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java#L106
> >
> > On Wed, May 8, 2024 at 11:46 AM Chris Egerton  wrote:
> >
> > > Hi Mario,
> > >
> > > I think we could have something like `copy` and `copyWithoutDefaults`
> to
> > > get around that, but now that you bring up compatibility, I think it's
> > best
> > > to hold off on this. I'm forced to recall that anything we add to the
> > > Connect API may be used by plugin developers who write for the bleeding
> > > edge of the Connect runtime, but deployed by users who are running on
> > > (possibly much) older versions. In that scenario, any use of new Struct
> > > methods would cause issues at runtime caused by compatibility clashes
> > > between the newer API that the plugin was written for, and the older
> API
> > > that's provided by the runtime it's running on.
> > >
> > > Anyway, thanks for humoring me. The KIP looks good to me 
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Wed, May 8, 2024 at 10:50 AM Mario Fiore Vitale  >
> > > wrote:
> > >
> > >> Hi Chris,
> > >>
> > >> Thanks for reviewing this.
> > >>
> > >> > It seems like the pattern of "copy the contents of this Struct into
> > >> another
> > >> one for the purpose of mutation" could be fairly common in user code
> > bases
> > >> in addition to the core Connect SMTs. Do you think there's a way to
> > >> simplify this with, e.g., a Struct.putAll(Struct destination) or
> > >> Struct.copy(Schema destinationSchema) method?
> > >>
> >> The only concern that I see is backward compatibility. Suppose that you
> >> are not using the JsonConverter but another converter that doesn't support
> >> 'replace.null.with.default': when you use the current 'InsertField' SMT,
> >> the null values will be replaced by default values. If we replace the
> >> "copy" logic with a method in the Struct, we remove this behavior.
> >>
> >> Isn't that right?
> > >>
> > >> Mario.
> > >>
> > >> On Wed, May 8, 2024 at 2:14 PM Chris Egerton  >
> > >> wrote:
> > >>
> > >> > Hi Mario,
> > >> >
> > >> > Thanks for the KIP! Looks good overall. One quick thought--it wasn't
> > >> > immediately obvious to me why we had to touch on InsertField for
> this.
> > >> It
> > >> > looks like the reason is that we use Struct::get [1] to create a
> clone
> > >> of
> > >> > the input value [2], instead of Struct::getWithoutDefault [3].
> > >> >
> > >> > It seems like the pattern of "copy the contents of this Struct into
> > >> another
> > >> > one for the purpose of mutation" could be fairly common in user code
> > >> bases
> > >> > in addition to the core Connect SMTs. Do you think there's a way to
> > >> > simplify this with, e.g., a Struct.putAll(Struct destination) or
> > >> > Struct.copy(Schema destinationSchema) method?
> > >> >
> > >> > [1] -
> > >> >
> > >> >
> > >>
> >
> 

Re: [DISCUSS] KIP-950: Tiered Storage Disablement

2024-05-09 Thread Christo Lolov
Heya!

re: Luke

1. I am not certain I follow the question. From DISABLED you can only go to
ENABLED regardless of whether your cluster is backed by Zookeeper or KRaft.
Am I misunderstanding your point?

2. Apologies, this was a leftover from previous versions. I have updated
the Zookeeper section. The steps ought to be: controller receives change,
commits necessary data to Zookeeper, enqueues disablement and starts
sending StopReplicas request to brokers; brokers receive StopReplicas and
propagate them all the way to RemoteLogManager#stopPartitions which takes
care of the rest.

3. Correct, it should say DISABLED - this should now be corrected.

4. I was thinking that if there is a mismatch we will just fail accepting
the request for disablement. This should be the same in both Zookeeper and
KRaft. Or am I misunderstanding your question?

5. Yeah. I am now doing a second pass on all diagrams and will update them
by the end of the day!

6. I think my current train of thought is that there will be unlimited
retries until all brokers respond in a similar way to how deletion of a
topic works today in ZK. In the meantime the state will continue to be
DISABLING. Do you have a better suggestion?

re: Kamal

Yep, I will update all diagrams

I am not certain I follow the reasoning for making retain and delete the
same. Deletion when the policy is retain happens asynchronously due to
expiration. I think that deletion when the policy is delete ought to (at
least for the initial implementation) happen synchronously. Should people
run into timeout problems we can always then have a follow-up KIP where we
make it asynchronous.

Best,
Christo

On Tue, 7 May 2024 at 10:04, Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:

> Hi Christo,
>
> Thanks for the update!
>
> For both the policies "retain" and "delete", can we maintain the same
> approach to delete the segments async?
>
> > If the disablement policy is set to delete, the Log start offset (LSO) is
> updated to match the Local Log Start Offset and the remote log is deleted
> by calling the RemoteStorageManager#deleteLogSegmentData().
>
> In the KIP, it's mentioned that when the disable policy is set to "delete",
> the remote-log-segments will be
> deleted in-sync. The stopPartition call might get timed out when the number
> of remote log segments to
> delete is huge. We can further extend the same approach for the topic
> deletion requests.
>
> Also, Could you please update the state diagram about the transitions? It
> is not clear when to transit from
> DISABLING to DISABLED state?
>
> --
> Kamal
>
> On Mon, May 6, 2024 at 6:55 PM Kamal Chandraprakash <
> kamal.chandraprak...@gmail.com> wrote:
>
> > Ignore the above message. Got the answers after reading the state
> > transition section.
> >
> > > If the disablement policy is delete, tasks scheduled for the
> > topic-partitions in the RemoteDataExpirationThreadPool will also be
> > canceled.
> >
> > We are deleting the segments synchronously. Should we delete them
> > asynchronously? The same approach can be extended to topic deletion
> > requests.
> >
> > > 6. In ZK mode, what will the controller do if the "stopReplicas"
> > responses not received from all brokers? Reverting the changes?
> >
> > Since we are deleting the segments synchronously, this case is bound
> > to happen when the number of remote log segments to
> > delete is huge.
> >
> >
> > On Mon, May 6, 2024, 18:12 Kamal Chandraprakash <
> > kamal.chandraprak...@gmail.com> wrote:
> >
> >> Hi Christo,
> >>
> >> Thanks for the update!
> >>
> >> 1. In the ZK mode, how will the transition from DISABLING to DISABLED
> >> state happen?
> >> For the "retain" policy, until we delete all the remote-log segments,
> the
> >> state will be
> >> DISABLING and the deletion can happen only when they breach either the
> >> retention
> >> time (or) size.
> >>
> >> How does the controller monitor that all the remote log segments are
> >> deleted for all
> >> the partitions of the topic before transitioning the state to DISABLED?
> >>
> >> 2. In Kraft, we have only ENABLED -> DISABLED state. How are we
> >> supporting the case
> >> "retain" -> "enable"?
> >>
> >> If the remote storage is degraded, we want to avoid uploading the
> >> segments temporarily
> >> and resume back once the remote storage is healthy. Is the case
> supported?
> >>
> >>
> >>
> >> On Fri, May 3, 2024 at 12:12 PM Luke Chen  wrote:
> >>
> >>> Also, I think using the `stopReplicas` request is a good idea because it
> >>> won't cause any problems while migrating to KRaft mode.
> >>> The stopReplicas request is one of the requests that the KRaft controller
> >>> will send to ZK brokers during migration.
> >>>
> >>> Thanks.
> >>> Luke
> >>>
> >>> On Fri, May 3, 2024 at 11:48 AM Luke Chen  wrote:
> >>>
>  Hi Christo,
> 
>  Thanks for the update.
> 
>  Questions:
>  1. For this
>  "The possible state transition from DISABLED state is to the ENABLED."
>  I think it 

[jira] [Resolved] (KAFKA-16676) Security docs missing RPCs from KIP-714 and KIP-1000

2024-05-09 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield resolved KAFKA-16676.
--
Resolution: Fixed

> Security docs missing RPCs from KIP-714 and KIP-1000
> 
>
> Key: KAFKA-16676
> URL: https://issues.apache.org/jira/browse/KAFKA-16676
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Affects Versions: 3.7.0, 3.8.0
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Minor
> Fix For: 3.8.0
>
>
> KIPs 714 and 1000 introduced 3 new RPCs to do with client metrics. None of 
> them was added to the list of RPCs in the security documentation.





Re: [VOTE] KIP-1018: Introduce max remote fetch timeout config

2024-05-09 Thread Christo Lolov
Heya Kamal,

Thanks for the KIP and the answers in the discussion!

+1 from me :)

Best,
Christo

On Thu, 9 May 2024 at 11:11, Federico Valeri  wrote:

> +1 non binding
>
> Thanks
>
> On Thu, May 9, 2024 at 12:05 PM Luke Chen  wrote:
> >
> > Hi Kamal,
> >
> > Thanks for the KIP!
> > +1 from me.
> >
> > Thanks.
> > Luke
> >
> > On Mon, May 6, 2024 at 5:03 PM Kamal Chandraprakash <
> > kamal.chandraprak...@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > We would like to start a voting thread for KIP-1018: Introduce
> > > max remote fetch timeout config for DelayedRemoteFetch requests.
> > >
> > > The KIP is available on
> > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1018%3A+Introduce+max+remote+fetch+timeout+config+for+DelayedRemoteFetch+requests
> > >
> > > If you have any suggestions, feel free to participate in the discussion
> > > thread:
> > > https://lists.apache.org/thread/9x21hzpxzmrt7xo4vozl17d70fkg3chk
> > >
> > > --
> > > Kamal
> > >
>


Re: [VOTE] KIP-1018: Introduce max remote fetch timeout config

2024-05-09 Thread Federico Valeri
+1 non binding

Thanks

On Thu, May 9, 2024 at 12:05 PM Luke Chen  wrote:
>
> Hi Kamal,
>
> Thanks for the KIP!
> +1 from me.
>
> Thanks.
> Luke
>
> On Mon, May 6, 2024 at 5:03 PM Kamal Chandraprakash <
> kamal.chandraprak...@gmail.com> wrote:
>
> > Hi all,
> >
> > We would like to start a voting thread for KIP-1018: Introduce
> > max remote fetch timeout config for DelayedRemoteFetch requests.
> >
> > The KIP is available on
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1018%3A+Introduce+max+remote+fetch+timeout+config+for+DelayedRemoteFetch+requests
> >
> > If you have any suggestions, feel free to participate in the discussion
> > thread:
> > https://lists.apache.org/thread/9x21hzpxzmrt7xo4vozl17d70fkg3chk
> >
> > --
> > Kamal
> >


Re: [VOTE] KIP-1018: Introduce max remote fetch timeout config

2024-05-09 Thread Luke Chen
Hi Kamal,

Thanks for the KIP!
+1 from me.

Thanks.
Luke

On Mon, May 6, 2024 at 5:03 PM Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:

> Hi all,
>
> We would like to start a voting thread for KIP-1018: Introduce
> max remote fetch timeout config for DelayedRemoteFetch requests.
>
> The KIP is available on
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1018%3A+Introduce+max+remote+fetch+timeout+config+for+DelayedRemoteFetch+requests
>
> If you have any suggestions, feel free to participate in the discussion
> thread:
> https://lists.apache.org/thread/9x21hzpxzmrt7xo4vozl17d70fkg3chk
>
> --
> Kamal
>


Re: KIP-1040: Improve handling of nullable values in InsertField/ExtractField transformations

2024-05-09 Thread Mario Fiore Vitale
Hi Chris,

> Wouldn't ValueToKey [1] be applicable as well, for example?
Yes, also that one can be affected.

On Wed, May 8, 2024 at 5:59 PM Chris Egerton 
wrote:

> Wait, just one more thing--are there any other SMTs that could benefit from
> this? Wouldn't ValueToKey [1] be applicable as well, for example?
>
> [1] -
>
> https://github.com/apache/kafka/blob/2a5efe4a334611fc7c15f76b322482bad0942db0/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java#L106
>
> On Wed, May 8, 2024 at 11:46 AM Chris Egerton  wrote:
>
> > Hi Mario,
> >
> > I think we could have something like `copy` and `copyWithoutDefaults` to
> > get around that, but now that you bring up compatibility, I think it's
> best
> > to hold off on this. I'm forced to recall that anything we add to the
> > Connect API may be used by plugin developers who write for the bleeding
> > edge of the Connect runtime, but deployed by users who are running on
> > (possibly much) older versions. In that scenario, any use of new Struct
> > methods would cause issues at runtime caused by compatibility clashes
> > between the newer API that the plugin was written for, and the older API
> > that's provided by the runtime it's running on.
> >
> > Anyway, thanks for humoring me. The KIP looks good to me 
> >
> > Cheers,
> >
> > Chris
> >
> > On Wed, May 8, 2024 at 10:50 AM Mario Fiore Vitale 
> > wrote:
> >
> >> Hi Chris,
> >>
> >> Thanks for reviewing this.
> >>
> >> > It seems like the pattern of "copy the contents of this Struct into
> >> another
> >> one for the purpose of mutation" could be fairly common in user code
> bases
> >> in addition to the core Connect SMTs. Do you think there's a way to
> >> simplify this with, e.g., a Struct.putAll(Struct destination) or
> >> Struct.copy(Schema destinationSchema) method?
> >>
> >> The only concern that I see is backward compatibility. Suppose that you
> >> are not using the JsonConverter but another converter that doesn't support
> >> 'replace.null.with.default': when you use the current 'InsertField' SMT,
> >> the null values will be replaced by default values. If we replace the
> >> "copy" logic with a method in the Struct, we remove this behavior.
> >>
> >> Isn't that right?
> >>
> >> Mario.
> >>
> >> On Wed, May 8, 2024 at 2:14 PM Chris Egerton 
> >> wrote:
> >>
> >> > Hi Mario,
> >> >
> >> > Thanks for the KIP! Looks good overall. One quick thought--it wasn't
> >> > immediately obvious to me why we had to touch on InsertField for this.
> >> It
> >> > looks like the reason is that we use Struct::get [1] to create a clone
> >> of
> >> > the input value [2], instead of Struct::getWithoutDefault [3].
> >> >
> >> > It seems like the pattern of "copy the contents of this Struct into
> >> another
> >> > one for the purpose of mutation" could be fairly common in user code
> >> bases
> >> > in addition to the core Connect SMTs. Do you think there's a way to
> >> > simplify this with, e.g., a Struct.putAll(Struct destination) or
> >> > Struct.copy(Schema destinationSchema) method?
> >> >
> >> > [1] -
> >> >
> >> >
> >>
> https://github.com/apache/kafka/blob/f74f596bc7d35fcea97edcd83244e5d6aee308d2/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java#L78-L91
> >> > [2] -
> >> >
> >> >
> >>
> https://github.com/apache/kafka/blob/f74f596bc7d35fcea97edcd83244e5d6aee308d2/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L179-L183
> >> > [3] -
> >> >
> >> >
> >>
> https://github.com/apache/kafka/blob/f74f596bc7d35fcea97edcd83244e5d6aee308d2/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java#L93-L101
> >> >
> >> > Cheers,
> >> >
> >> > Chris
> >> >
> >> > On Wed, May 8, 2024 at 3:40 AM Mario Fiore Vitale  >
> >> > wrote:
> >> >
> >> > > Hi All,
> >> > >
> >> > > I have created (through Mickael Maison's account since there was an
> >> issue
> >> > > creating a new one for me) KIP-1040[1] to improve handling of
> nullable
> >> > > values in InsertField/ExtractField transformations, this is required
> >> > after
> >> > > the introduction of KIP-581[2] that introduced the configuration
> >> property
> >> > > *replace.null.with.default* to *JsonConverter* to choose whether to
> >> > replace
> >> > > fields that have a default value and that are null to the default
> >> value.
> >> > >
> >> > > [1]
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=303794677
> >> > > [2]
> >> > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-581%3A+Value+of+optional+null+field+which+has+default+value
> >> > >
> >> > > Feedback and suggestions are welcome.
> >> > >
> >> > > Regards,
> >> > > Mario.
> >> > > --
> >> > >
> >> > > Mario Fiore Vitale
> >> > >
> >> > > Senior Software Engineer
> >> > >
> >> > > Red Hat 
> >> > > 
> >> > >
> >> >
> >>
> >>
> >> --
> >>
> >> Mario Fiore Vitale
> >>
> >> Senior Software Engineer
> >>
> >> Red 

Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2886

2024-05-09 Thread Apache Jenkins Server
See