[jira] [Commented] (KAFKA-15203) Remove dependency on Reflections

2024-05-28 Thread Ganesh Sadanala (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849905#comment-17849905
 ] 

Ganesh Sadanala commented on KAFKA-15203:
-

[~chia7712] It totally makes sense. Thank you!

> Remove dependency on Reflections 
> -
>
> Key: KAFKA-15203
> URL: https://issues.apache.org/jira/browse/KAFKA-15203
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Divij Vaidya
>Assignee: Ganesh Sadanala
>Priority: Major
>  Labels: newbie
> Fix For: 5.0.0
>
>
> We currently depend on reflections library which is EOL. Quoting from the 
> GitHub site:
> _> Please note: Reflections library is currently NOT under active development 
> or maintenance_
>  
> This poses a supply chain risk for our project where the security fixes and 
> other major bugs in underlying dependency may not be addressed timely.
> Hence, we should plan to remove this dependency.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16448: Add ProcessingExceptionHandler interface and implementations [kafka]

2024-05-28 Thread via GitHub


cadonna commented on PR #16090:
URL: https://github.com/apache/kafka/pull/16090#issuecomment-2134483730

   Closing and reopening the PR since there were issues with starting the 
builds on Apache Infrastructure. See 
https://issues.apache.org/jira/browse/INFRA-25824 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16814) KRaft broker cannot startup when `partition.metadata` is missing

2024-05-28 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16814:
--
Description: 
When starting up kafka logManager, we'll check stray replicas to avoid some 
corner cases. But this check might cause broker unable to startup if 
`partition.metadata` is missing because when startup kafka, we load log from 
file, and the topicId of the log is coming from `partition.metadata` file. So, 
if `partition.metadata` is missing, the topicId will be None, and the 
`LogManager#isStrayKraftReplica` will fail with no topicID error.

The `partition.metadata` missing could be some storage failure, or another 
possible path is unclean shutdown after topic is created in the replica, but 
before data is flushed into `partition.metadata` file. This is possible because 
we do the flush in async way 
[here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L229].

 

 
{code:java}
ERROR Encountered fatal fault: Error starting LogManager 
(org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
java.lang.RuntimeException: The log dir 
Log(dir=/tmp/kraft-broker-logs/quickstart-events-0, topic=quickstart-events, 
partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, 
logEndOffset=0) does not have a topic ID, which is not allowed when running in 
KRaft mode.
    at 
kafka.log.LogManager$.$anonfun$isStrayKraftReplica$1(LogManager.scala:1609)
    at scala.Option.getOrElse(Option.scala:201)
    at kafka.log.LogManager$.isStrayKraftReplica(LogManager.scala:1608)
    at 
kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1(BrokerMetadataPublisher.scala:294)
    at 
kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1$adapted(BrokerMetadataPublisher.scala:294)
    at kafka.log.LogManager.loadLog(LogManager.scala:359)
    at kafka.log.LogManager.$anonfun$loadLogs$15(LogManager.scala:493)
    at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1623) {code}
 

Because if we don't do the isStrayKraftReplica check, the topicID and the 
`partition.metadata` will get recovered after getting topic partition update 
and becoming leader or follower later. I'm proposing we skip the 
`isStrayKraftReplica` check if topicID is None, instead of throwing exception 
to terminate the kafka. `isStrayKraftReplica` check is just for a corner case 
only, it should be fine IMO.

 

 

=== update ===

Checked KAFKA-14616 and KAFKA-15605, our purpose of finding strayReplicas and 
delete them is because the replica should be deleted, but left in the log dir. 
So, if we have a replica that doesn't have topicID (due to

 

 

  was:
When starting up kafka logManager, we'll check stray replicas to avoid some 
corner cases. But this check might cause broker unable to startup if 
`partition.metadata` is missing because when startup kafka, we load log from 
file, and the topicId of the log is coming from `partition.metadata` file. So, 
if `partition.metadata` is missing, the topicId will be None, and the 
`LogManager#isStrayKraftReplica` will fail with no topicID error.

The `partition.metadata` missing could be some storage failure, or another 
possible path is unclean shutdown after topic is created in the replica, but 
before data is flushed into `partition.metadata` file. This is possible because 
we do the flush in async way 
[here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L229].

 

 
{code:java}
ERROR Encountered fatal fault: Error starting LogManager 
(org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
java.lang.RuntimeException: The log dir 
Log(dir=/tmp/kraft-broker-logs/quickstart-events-0, topic=quickstart-events, 
partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, 
logEndOffset=0) does not have a topic ID, which is not allowed when running in 
KRaft mode.
    at 
kafka.log.LogManager$.$anonfun$isStrayKraftReplica$1(LogManager.scala:1609)
    at scala.Option.getOrElse(Option.scala:201)
    at kafka.log.LogManager$.isStrayKraftReplica(LogManager.scala:1608)
    at 
kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1(BrokerMetadataPublisher.scala:294)
    at 
kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1$adapted(BrokerMetadataPublisher.scala:294)
    at kafka.log.LogManager.loadLog(LogManager.scala:359)
    at kafka.log.LogManager.$anonfun$loadLogs$15(LogManager.scala:493)
    at 
java.b

Re: [PR] KAFKA-16448: Add ProcessingExceptionHandler interface and implementations [kafka]

2024-05-28 Thread via GitHub


cadonna closed pull request #16090: KAFKA-16448: Add ProcessingExceptionHandler 
interface and implementations
URL: https://github.com/apache/kafka/pull/16090


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16814) KRaft broker cannot startup when `partition.metadata` is missing

2024-05-28 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16814:
--
Description: 
When starting up kafka logManager, we'll check stray replicas to avoid some 
corner cases. But this check might cause broker unable to startup if 
`partition.metadata` is missing because when startup kafka, we load log from 
file, and the topicId of the log is coming from `partition.metadata` file. So, 
if `partition.metadata` is missing, the topicId will be None, and the 
`LogManager#isStrayKraftReplica` will fail with no topicID error.

The `partition.metadata` missing could be some storage failure, or another 
possible path is unclean shutdown after topic is created in the replica, but 
before data is flushed into `partition.metadata` file. This is possible because 
we do the flush in async way 
[here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L229].

 

 
{code:java}
ERROR Encountered fatal fault: Error starting LogManager 
(org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
java.lang.RuntimeException: The log dir 
Log(dir=/tmp/kraft-broker-logs/quickstart-events-0, topic=quickstart-events, 
partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, 
logEndOffset=0) does not have a topic ID, which is not allowed when running in 
KRaft mode.
    at 
kafka.log.LogManager$.$anonfun$isStrayKraftReplica$1(LogManager.scala:1609)
    at scala.Option.getOrElse(Option.scala:201)
    at kafka.log.LogManager$.isStrayKraftReplica(LogManager.scala:1608)
    at 
kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1(BrokerMetadataPublisher.scala:294)
    at 
kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1$adapted(BrokerMetadataPublisher.scala:294)
    at kafka.log.LogManager.loadLog(LogManager.scala:359)
    at kafka.log.LogManager.$anonfun$loadLogs$15(LogManager.scala:493)
    at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1623) {code}
 

Because if we don't do the isStrayKraftReplica check, the topicID and the 
`partition.metadata` will get recovered after getting topic partition update 
and becoming leader or follower later. I'm proposing we skip the 
`isStrayKraftReplica` check if topicID is None, instead of throwing exception 
to terminate the kafka. `isStrayKraftReplica` check is just for a corner case 
only, it should be fine IMO.

 

 

=== update ===

Checked KAFKA-14616 and KAFKA-15605, our purpose of finding strayReplicas and 
delete them is because the replica should be deleted, but left in the log dir. 
So, if we have a replica that doesn't have topicID (due to `partition.metadata` 
is missing), then we cannot identify if this is a stray replica or not. In this 
case, we can do:
 # Delete it
 # Ignore it

For (1), the impact is, if this is not a stray replica, and the 
replication-factor only has 1, then the data might be moved to another 
"xxx-stray" dir, and the partition becomes empty.

For (2), the impact is, if this is a stray replica and we didn't delete it, it 
might cause partition dir is not created as in KAFKA-15605 or KAFKA-14616.

 

 

  was:
When starting up kafka logManager, we'll check stray replicas to avoid some 
corner cases. But this check might cause broker unable to startup if 
`partition.metadata` is missing because when startup kafka, we load log from 
file, and the topicId of the log is coming from `partition.metadata` file. So, 
if `partition.metadata` is missing, the topicId will be None, and the 
`LogManager#isStrayKraftReplica` will fail with no topicID error.

The `partition.metadata` missing could be some storage failure, or another 
possible path is unclean shutdown after topic is created in the replica, but 
before data is flushed into `partition.metadata` file. This is possible because 
we do the flush in async way 
[here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L229].

 

 
{code:java}
ERROR Encountered fatal fault: Error starting LogManager 
(org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
java.lang.RuntimeException: The log dir 
Log(dir=/tmp/kraft-broker-logs/quickstart-events-0, topic=quickstart-events, 
partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, 
logEndOffset=0) does not have a topic ID, which is not allowed when running in 
KRaft mode.
    at 
kafka.log.LogManager$.$anonfun$isStrayKraftReplica$1(LogManager.scala:1609)
    at scala.Option

[jira] [Updated] (KAFKA-16814) KRaft broker cannot startup when `partition.metadata` is missing

2024-05-28 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16814:
--
Description: 
When starting up kafka logManager, we'll check stray replicas to avoid some 
corner cases. But this check might cause broker unable to startup if 
`partition.metadata` is missing because when startup kafka, we load log from 
file, and the topicId of the log is coming from `partition.metadata` file. So, 
if `partition.metadata` is missing, the topicId will be None, and the 
`LogManager#isStrayKraftReplica` will fail with no topicID error.

The `partition.metadata` missing could be some storage failure, or another 
possible path is unclean shutdown after topic is created in the replica, but 
before data is flushed into `partition.metadata` file. This is possible because 
we do the flush in async way 
[here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L229].

 

 
{code:java}
ERROR Encountered fatal fault: Error starting LogManager 
(org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
java.lang.RuntimeException: The log dir 
Log(dir=/tmp/kraft-broker-logs/quickstart-events-0, topic=quickstart-events, 
partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, 
logEndOffset=0) does not have a topic ID, which is not allowed when running in 
KRaft mode.
    at 
kafka.log.LogManager$.$anonfun$isStrayKraftReplica$1(LogManager.scala:1609)
    at scala.Option.getOrElse(Option.scala:201)
    at kafka.log.LogManager$.isStrayKraftReplica(LogManager.scala:1608)
    at 
kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1(BrokerMetadataPublisher.scala:294)
    at 
kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1$adapted(BrokerMetadataPublisher.scala:294)
    at kafka.log.LogManager.loadLog(LogManager.scala:359)
    at kafka.log.LogManager.$anonfun$loadLogs$15(LogManager.scala:493)
    at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1623) {code}
 

Because if we don't do the isStrayKraftReplica check, the topicID and the 
`partition.metadata` will get recovered after getting topic partition update 
and becoming leader or follower later. I'm proposing we skip the 
`isStrayKraftReplica` check if topicID is None, instead of throwing exception 
to terminate the kafka. `isStrayKraftReplica` check is just for a corner case 
only, it should be fine IMO.

 

 

=== update ===

Checked KAFKA-14616 and KAFKA-15605, our purpose of finding strayReplicas and 
delete them is because the replica should be deleted, but left in the log dir. 
So, if we have a replica that doesn't have topicID (due to `partition.metadata` 
is missing), then we cannot identify if this is a stray replica or not. In this 
case, we can do:
 # Delete it
 # Ignore it

For (1), the impact is, if this is not a stray replica, and the 
replication-factor only has 1, then the data might be moved to another 
"xxx-stray" dir, and the partition becomes empty.

For (2), the impact is, if this is a stray replica and we didn't delete it, it 
might cause partition dir is not created as in KAFKA-15605 or KAFKA-14616.

As the investigation above, this `partition.metadata` missing issue is mostly 
because the async `partition.metadata` when creating a topic. Then, before any 
data append into log, we must make sure partition metadata file is written to 
the log dir 
[here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L772-L774].
 So, it should be fine if we delete it since the topic should be empty.

In short, when finding a log without topicID, we should treat it as a stray log 
and then delete it.

 

  was:
When starting up kafka logManager, we'll check stray replicas to avoid some 
corner cases. But this check might cause broker unable to startup if 
`partition.metadata` is missing because when startup kafka, we load log from 
file, and the topicId of the log is coming from `partition.metadata` file. So, 
if `partition.metadata` is missing, the topicId will be None, and the 
`LogManager#isStrayKraftReplica` will fail with no topicID error.

The `partition.metadata` missing could be some storage failure, or another 
possible path is unclean shutdown after topic is created in the replica, but 
before data is flushed into `partition.metadata` file. This is possible because 
we do the flush in async way 
[here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/m

[jira] [Updated] (KAFKA-16814) KRaft broker cannot startup when `partition.metadata` is missing

2024-05-28 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16814:
--
Description: 
When starting up kafka logManager, we'll check stray replicas to avoid some 
corner cases. But this check might cause broker unable to startup if 
`partition.metadata` is missing because when startup kafka, we load log from 
file, and the topicId of the log is coming from `partition.metadata` file. So, 
if `partition.metadata` is missing, the topicId will be None, and the 
`LogManager#isStrayKraftReplica` will fail with no topicID error.

The `partition.metadata` missing could be some storage failure, or another 
possible path is unclean shutdown after topic is created in the replica, but 
before data is flushed into `partition.metadata` file. This is possible because 
we do the flush in async way 
[here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L229].

 

 
{code:java}
ERROR Encountered fatal fault: Error starting LogManager 
(org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
java.lang.RuntimeException: The log dir 
Log(dir=/tmp/kraft-broker-logs/quickstart-events-0, topic=quickstart-events, 
partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, 
logEndOffset=0) does not have a topic ID, which is not allowed when running in 
KRaft mode.
    at 
kafka.log.LogManager$.$anonfun$isStrayKraftReplica$1(LogManager.scala:1609)
    at scala.Option.getOrElse(Option.scala:201)
    at kafka.log.LogManager$.isStrayKraftReplica(LogManager.scala:1608)
    at 
kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1(BrokerMetadataPublisher.scala:294)
    at 
kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1$adapted(BrokerMetadataPublisher.scala:294)
    at kafka.log.LogManager.loadLog(LogManager.scala:359)
    at kafka.log.LogManager.$anonfun$loadLogs$15(LogManager.scala:493)
    at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1623) {code}
 

Because if we don't do the isStrayKraftReplica check, the topicID and the 
`partition.metadata` will get recovered after getting topic partition update 
and becoming leader or follower later. I'm proposing we skip the 
`isStrayKraftReplica` check if topicID is None, instead of throwing exception 
to terminate the kafka. `isStrayKraftReplica` check is just for a corner case 
only, it should be fine IMO.

 

 

=== update ===

Checked KAFKA-14616 and KAFKA-15605, our purpose of finding strayReplicas and 
delete them is because the replica should be deleted, but left in the log dir. 
So, if we have a replica that doesn't have topicID (due to `partition.metadata` 
is missing), then we cannot identify if this is a stray replica or not. In this 
case, we can do:
 # Delete it
 # Ignore it

For (1), the impact is, if this is not a stray replica, and the 
replication-factor only has 1, then the data might be moved to another 
"xxx-stray" dir, and the partition becomes empty.

For (2), the impact is, if this is a stray replica and we didn't delete it, it 
might cause partition dir is not created as in KAFKA-15605 or KAFKA-14616.

As the investigation above, this `partition.metadata` missing issue is mostly 
because the async `partition.metadata` when creating a topic. Later, before any 
data append into log, we must make sure partition metadata file is written to 
the log dir 
[here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L772-L774].
 So, it should be fine if we delete it since the topic should be empty.

In short, when finding a log without topicID, we should treat it as a stray log 
and then delete it.

 

  was:
When starting up kafka logManager, we'll check stray replicas to avoid some 
corner cases. But this check might cause broker unable to startup if 
`partition.metadata` is missing because when startup kafka, we load log from 
file, and the topicId of the log is coming from `partition.metadata` file. So, 
if `partition.metadata` is missing, the topicId will be None, and the 
`LogManager#isStrayKraftReplica` will fail with no topicID error.

The `partition.metadata` missing could be some storage failure, or another 
possible path is unclean shutdown after topic is created in the replica, but 
before data is flushed into `partition.metadata` file. This is possible because 
we do the flush in async way 
[here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/

[jira] [Commented] (KAFKA-16814) KRaft broker cannot startup when `partition.metadata` is missing

2024-05-28 Thread Kuan Po Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849917#comment-17849917
 ] 

Kuan Po Tseng commented on KAFKA-16814:
---

gentle ping [~showuon] , I'm willing to solve this issue. Will file a PR soon, 
many thanks~

> KRaft broker cannot startup when `partition.metadata` is missing
> 
>
> Key: KAFKA-16814
> URL: https://issues.apache.org/jira/browse/KAFKA-16814
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Priority: Major
>
> When starting up kafka logManager, we'll check stray replicas to avoid some 
> corner cases. But this check might cause broker unable to startup if 
> `partition.metadata` is missing because when startup kafka, we load log from 
> file, and the topicId of the log is coming from `partition.metadata` file. 
> So, if `partition.metadata` is missing, the topicId will be None, and the 
> `LogManager#isStrayKraftReplica` will fail with no topicID error.
> The `partition.metadata` missing could be some storage failure, or another 
> possible path is unclean shutdown after topic is created in the replica, but 
> before data is flushed into `partition.metadata` file. This is possible 
> because we do the flush in async way 
> [here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L229].
>  
>  
> {code:java}
> ERROR Encountered fatal fault: Error starting LogManager 
> (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
> java.lang.RuntimeException: The log dir 
> Log(dir=/tmp/kraft-broker-logs/quickstart-events-0, topic=quickstart-events, 
> partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, 
> logEndOffset=0) does not have a topic ID, which is not allowed when running 
> in KRaft mode.
>     at 
> kafka.log.LogManager$.$anonfun$isStrayKraftReplica$1(LogManager.scala:1609)
>     at scala.Option.getOrElse(Option.scala:201)
>     at kafka.log.LogManager$.isStrayKraftReplica(LogManager.scala:1608)
>     at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1(BrokerMetadataPublisher.scala:294)
>     at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1$adapted(BrokerMetadataPublisher.scala:294)
>     at kafka.log.LogManager.loadLog(LogManager.scala:359)
>     at kafka.log.LogManager.$anonfun$loadLogs$15(LogManager.scala:493)
>     at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
>     at java.base/java.lang.Thread.run(Thread.java:1623) {code}
>  
> Because if we don't do the isStrayKraftReplica check, the topicID and the 
> `partition.metadata` will get recovered after getting topic partition update 
> and becoming leader or follower later. I'm proposing we skip the 
> `isStrayKraftReplica` check if topicID is None, instead of throwing exception 
> to terminate the kafka. `isStrayKraftReplica` check is just for a corner case 
> only, it should be fine IMO.
>  
>  
> === update ===
> Checked KAFKA-14616 and KAFKA-15605, our purpose of finding strayReplicas and 
> delete them is because the replica should be deleted, but left in the log 
> dir. So, if we have a replica that doesn't have topicID (due to 
> `partition.metadata` is missing), then we cannot identify if this is a stray 
> replica or not. In this case, we can do:
>  # Delete it
>  # Ignore it
> For (1), the impact is, if this is not a stray replica, and the 
> replication-factor only has 1, then the data might be moved to another 
> "xxx-stray" dir, and the partition becomes empty.
> For (2), the impact is, if this is a stray replica and we didn't delete it, 
> it might cause partition dir is not created as in KAFKA-15605 or KAFKA-14616.
> As the investigation above, this `partition.metadata` missing issue is mostly 
> because the async `partition.metadata` when creating a topic. Later, before 
> any data append into log, we must make sure partition metadata file is 
> written to the log dir 
> [here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L772-L774].
>  So, it should be fine if we delete it since the topic should be empty.
> In short, when finding a log without topicID, we should treat it as a stray 
> log and then delete it.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16814) KRaft broker cannot startup when `partition.metadata` is missing

2024-05-28 Thread Kuan Po Tseng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kuan Po Tseng reassigned KAFKA-16814:
-

Assignee: Kuan Po Tseng

> KRaft broker cannot startup when `partition.metadata` is missing
> 
>
> Key: KAFKA-16814
> URL: https://issues.apache.org/jira/browse/KAFKA-16814
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Assignee: Kuan Po Tseng
>Priority: Major
>
> When starting up kafka logManager, we'll check stray replicas to avoid some 
> corner cases. But this check might cause broker unable to startup if 
> `partition.metadata` is missing because when startup kafka, we load log from 
> file, and the topicId of the log is coming from `partition.metadata` file. 
> So, if `partition.metadata` is missing, the topicId will be None, and the 
> `LogManager#isStrayKraftReplica` will fail with no topicID error.
> The `partition.metadata` missing could be some storage failure, or another 
> possible path is unclean shutdown after topic is created in the replica, but 
> before data is flushed into `partition.metadata` file. This is possible 
> because we do the flush in async way 
> [here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L229].
>  
>  
> {code:java}
> ERROR Encountered fatal fault: Error starting LogManager 
> (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
> java.lang.RuntimeException: The log dir 
> Log(dir=/tmp/kraft-broker-logs/quickstart-events-0, topic=quickstart-events, 
> partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, 
> logEndOffset=0) does not have a topic ID, which is not allowed when running 
> in KRaft mode.
>     at 
> kafka.log.LogManager$.$anonfun$isStrayKraftReplica$1(LogManager.scala:1609)
>     at scala.Option.getOrElse(Option.scala:201)
>     at kafka.log.LogManager$.isStrayKraftReplica(LogManager.scala:1608)
>     at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1(BrokerMetadataPublisher.scala:294)
>     at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1$adapted(BrokerMetadataPublisher.scala:294)
>     at kafka.log.LogManager.loadLog(LogManager.scala:359)
>     at kafka.log.LogManager.$anonfun$loadLogs$15(LogManager.scala:493)
>     at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
>     at java.base/java.lang.Thread.run(Thread.java:1623) {code}
>  
> Because if we don't do the isStrayKraftReplica check, the topicID and the 
> `partition.metadata` will get recovered after getting topic partition update 
> and becoming leader or follower later. I'm proposing we skip the 
> `isStrayKraftReplica` check if topicID is None, instead of throwing exception 
> to terminate the kafka. `isStrayKraftReplica` check is just for a corner case 
> only, it should be fine IMO.
>  
>  
> === update ===
> Checked KAFKA-14616 and KAFKA-15605, our purpose of finding strayReplicas and 
> delete them is because the replica should be deleted, but left in the log 
> dir. So, if we have a replica that doesn't have topicID (due to 
> `partition.metadata` is missing), then we cannot identify if this is a stray 
> replica or not. In this case, we can do:
>  # Delete it
>  # Ignore it
> For (1), the impact is, if this is not a stray replica, and the 
> replication-factor only has 1, then the data might be moved to another 
> "xxx-stray" dir, and the partition becomes empty.
> For (2), the impact is, if this is a stray replica and we didn't delete it, 
> it might cause partition dir is not created as in KAFKA-15605 or KAFKA-14616.
> As the investigation above, this `partition.metadata` missing issue is mostly 
> because the async `partition.metadata` when creating a topic. Later, before 
> any data append into log, we must make sure partition metadata file is 
> written to the log dir 
> [here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L772-L774].
>  So, it should be fine if we delete it since the topic should be empty.
> In short, when finding a log without topicID, we should treat it as a stray 
> log and then delete it.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16814) KRaft broker cannot startup when `partition.metadata` is missing

2024-05-28 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16814:
--
Fix Version/s: 3.8.0
   3.7.1

> KRaft broker cannot startup when `partition.metadata` is missing
> 
>
> Key: KAFKA-16814
> URL: https://issues.apache.org/jira/browse/KAFKA-16814
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Assignee: Kuan Po Tseng
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> When starting up kafka logManager, we'll check stray replicas to avoid some 
> corner cases. But this check might cause broker unable to startup if 
> `partition.metadata` is missing because when startup kafka, we load log from 
> file, and the topicId of the log is coming from `partition.metadata` file. 
> So, if `partition.metadata` is missing, the topicId will be None, and the 
> `LogManager#isStrayKraftReplica` will fail with no topicID error.
> The `partition.metadata` missing could be some storage failure, or another 
> possible path is unclean shutdown after topic is created in the replica, but 
> before data is flushed into `partition.metadata` file. This is possible 
> because we do the flush in async way 
> [here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L229].
>  
>  
> {code:java}
> ERROR Encountered fatal fault: Error starting LogManager 
> (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
> java.lang.RuntimeException: The log dir 
> Log(dir=/tmp/kraft-broker-logs/quickstart-events-0, topic=quickstart-events, 
> partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, 
> logEndOffset=0) does not have a topic ID, which is not allowed when running 
> in KRaft mode.
>     at 
> kafka.log.LogManager$.$anonfun$isStrayKraftReplica$1(LogManager.scala:1609)
>     at scala.Option.getOrElse(Option.scala:201)
>     at kafka.log.LogManager$.isStrayKraftReplica(LogManager.scala:1608)
>     at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1(BrokerMetadataPublisher.scala:294)
>     at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1$adapted(BrokerMetadataPublisher.scala:294)
>     at kafka.log.LogManager.loadLog(LogManager.scala:359)
>     at kafka.log.LogManager.$anonfun$loadLogs$15(LogManager.scala:493)
>     at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
>     at java.base/java.lang.Thread.run(Thread.java:1623) {code}
>  
> Because if we don't do the isStrayKraftReplica check, the topicID and the 
> `partition.metadata` will get recovered after getting topic partition update 
> and becoming leader or follower later. I'm proposing we skip the 
> `isStrayKraftReplica` check if topicID is None, instead of throwing exception 
> to terminate the kafka. `isStrayKraftReplica` check is just for a corner case 
> only, it should be fine IMO.
>  
>  
> === update ===
> Checked KAFKA-14616 and KAFKA-15605, our purpose of finding strayReplicas and 
> delete them is because the replica should be deleted, but left in the log 
> dir. So, if we have a replica that doesn't have topicID (due to 
> `partition.metadata` is missing), then we cannot identify if this is a stray 
> replica or not. In this case, we can do:
>  # Delete it
>  # Ignore it
> For (1), the impact is, if this is not a stray replica, and the 
> replication-factor only has 1, then the data might be moved to another 
> "xxx-stray" dir, and the partition becomes empty.
> For (2), the impact is, if this is a stray replica and we didn't delete it, 
> it might cause partition dir is not created as in KAFKA-15605 or KAFKA-14616.
> As the investigation above, this `partition.metadata` missing issue is mostly 
> because the async `partition.metadata` when creating a topic. Later, before 
> any data append into log, we must make sure partition metadata file is 
> written to the log dir 
> [here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L772-L774].
>  So, it should be fine if we delete it since the topic should be empty.
> In short, when finding a log without topicID, we should treat it as a stray 
> log and then delete it.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16814) KRaft broker cannot startup when `partition.metadata` is missing

2024-05-28 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849919#comment-17849919
 ] 

Luke Chen commented on KAFKA-16814:
---

[~brandboat] , thanks for the help! I'd like to make this fix into v3.7.1 and 
v3.8.0 because the impact of this issue is the broker cannot startup at all. 
Let me know if you have any problem.

> KRaft broker cannot startup when `partition.metadata` is missing
> 
>
> Key: KAFKA-16814
> URL: https://issues.apache.org/jira/browse/KAFKA-16814
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Assignee: Kuan Po Tseng
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> When starting up kafka logManager, we'll check stray replicas to avoid some 
> corner cases. But this check might cause broker unable to startup if 
> `partition.metadata` is missing because when startup kafka, we load log from 
> file, and the topicId of the log is coming from `partition.metadata` file. 
> So, if `partition.metadata` is missing, the topicId will be None, and the 
> `LogManager#isStrayKraftReplica` will fail with no topicID error.
> The `partition.metadata` missing could be some storage failure, or another 
> possible path is unclean shutdown after topic is created in the replica, but 
> before data is flushed into `partition.metadata` file. This is possible 
> because we do the flush in async way 
> [here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L229].
>  
>  
> {code:java}
> ERROR Encountered fatal fault: Error starting LogManager 
> (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
> java.lang.RuntimeException: The log dir 
> Log(dir=/tmp/kraft-broker-logs/quickstart-events-0, topic=quickstart-events, 
> partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, 
> logEndOffset=0) does not have a topic ID, which is not allowed when running 
> in KRaft mode.
>     at 
> kafka.log.LogManager$.$anonfun$isStrayKraftReplica$1(LogManager.scala:1609)
>     at scala.Option.getOrElse(Option.scala:201)
>     at kafka.log.LogManager$.isStrayKraftReplica(LogManager.scala:1608)
>     at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1(BrokerMetadataPublisher.scala:294)
>     at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1$adapted(BrokerMetadataPublisher.scala:294)
>     at kafka.log.LogManager.loadLog(LogManager.scala:359)
>     at kafka.log.LogManager.$anonfun$loadLogs$15(LogManager.scala:493)
>     at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
>     at java.base/java.lang.Thread.run(Thread.java:1623) {code}
>  
> Because if we don't do the isStrayKraftReplica check, the topicID and the 
> `partition.metadata` will get recovered after getting topic partition update 
> and becoming leader or follower later. I'm proposing we skip the 
> `isStrayKraftReplica` check if topicID is None, instead of throwing exception 
> to terminate the kafka. `isStrayKraftReplica` check is just for a corner case 
> only, it should be fine IMO.
>  
>  
> === update ===
> Checked KAFKA-14616 and KAFKA-15605, our purpose of finding strayReplicas and 
> delete them is because the replica should be deleted, but left in the log 
> dir. So, if we have a replica that doesn't have topicID (due to 
> `partition.metadata` is missing), then we cannot identify if this is a stray 
> replica or not. In this case, we can do:
>  # Delete it
>  # Ignore it
> For (1), the impact is, if this is not a stray replica, and the 
> replication-factor only has 1, then the data might be moved to another 
> "xxx-stray" dir, and the partition becomes empty.
> For (2), the impact is, if this is a stray replica and we didn't delete it, 
> it might cause partition dir is not created as in KAFKA-15605 or KAFKA-14616.
> As the investigation above, this `partition.metadata` missing issue is mostly 
> because the async `partition.metadata` when creating a topic. Later, before 
> any data append into log, we must make sure partition metadata file is 
> written to the log dir 
> [here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L772-L774].
>  So, it should be fine if we delete it since the topic should be empty.
> In short, when finding a log without topicID, we should treat it as a stray 
> log and

[jira] [Updated] (KAFKA-16814) KRaft broker cannot startup when `partition.metadata` is missing

2024-05-28 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16814:
--
Description: 
When starting up kafka logManager, we'll check stray replicas to avoid some 
corner cases. But this check might cause broker unable to startup if 
`partition.metadata` is missing because when startup kafka, we load log from 
file, and the topicId of the log is coming from `partition.metadata` file. So, 
if `partition.metadata` is missing, the topicId will be None, and the 
`LogManager#isStrayKraftReplica` will fail with no topicID error.

The `partition.metadata` missing could be some storage failure, or another 
possible path is unclean shutdown after topic is created in the replica, but 
before data is flushed into `partition.metadata` file. This is possible because 
we do the flush in async way 
[here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L229].

 

 
{code:java}
ERROR Encountered fatal fault: Error starting LogManager 
(org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
java.lang.RuntimeException: The log dir 
Log(dir=/tmp/kraft-broker-logs/quickstart-events-0, topic=quickstart-events, 
partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, 
logEndOffset=0) does not have a topic ID, which is not allowed when running in 
KRaft mode.
    at 
kafka.log.LogManager$.$anonfun$isStrayKraftReplica$1(LogManager.scala:1609)
    at scala.Option.getOrElse(Option.scala:201)
    at kafka.log.LogManager$.isStrayKraftReplica(LogManager.scala:1608)
    at 
kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1(BrokerMetadataPublisher.scala:294)
    at 
kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1$adapted(BrokerMetadataPublisher.scala:294)
    at kafka.log.LogManager.loadLog(LogManager.scala:359)
    at kafka.log.LogManager.$anonfun$loadLogs$15(LogManager.scala:493)
    at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1623) {code}
 

Because if we don't do the isStrayKraftReplica check, the topicID and the 
`partition.metadata` will get recovered after getting topic partition update 
and becoming leader or follower later. I'm proposing we delete the 
`isStrayKraftReplica` check if topicID is None (see below), instead of throwing 
exception to terminate the kafka. 

 

 

=== update ===

Checked KAFKA-14616 and KAFKA-15605, our purpose of finding strayReplicas and 
delete them is because the replica should be deleted, but left in the log dir. 
So, if we have a replica that doesn't have topicID (due to `partition.metadata` 
is missing), then we cannot identify if this is a stray replica or not. In this 
case, we can do:
 # Delete it
 # Ignore it

For (1), the impact is, if this is not a stray replica, and the 
replication-factor only has 1, then the data might be moved to another 
"xxx-stray" dir, and the partition becomes empty.

For (2), the impact is, if this is a stray replica and we didn't delete it, it 
might cause partition dir is not created as in KAFKA-15605 or KAFKA-14616.

As the investigation above, this `partition.metadata` missing issue is mostly 
because the async `partition.metadata` when creating a topic. Later, before any 
data append into log, we must make sure partition metadata file is written to 
the log dir 
[here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L772-L774].
 So, it should be fine if we delete it since the topic should be empty.

In short, when finding a log without topicID, we should treat it as a stray log 
and then delete it.

 

  was:
When starting up kafka logManager, we'll check stray replicas to avoid some 
corner cases. But this check might cause broker unable to startup if 
`partition.metadata` is missing because when startup kafka, we load log from 
file, and the topicId of the log is coming from `partition.metadata` file. So, 
if `partition.metadata` is missing, the topicId will be None, and the 
`LogManager#isStrayKraftReplica` will fail with no topicID error.

The `partition.metadata` missing could be some storage failure, or another 
possible path is unclean shutdown after topic is created in the replica, but 
before data is flushed into `partition.metadata` file. This is possible because 
we do the flush in async way 
[here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L229].

 

 
{code:java}
ERROR 

[PR] MINOR: Modify doc of advertised.listeners to show supported on KRaft mode. [kafka]

2024-05-28 Thread via GitHub


sasakitoa opened a new pull request, #16100:
URL: https://github.com/apache/kafka/pull/16100

   `advertised.listeners` is usable on not only ZooKeeper mode but also KRaft 
mode.
   This PR changes related description of doc to explain it.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15203) Remove dependency on Reflections

2024-05-28 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849926#comment-17849926
 ] 

Chia-Ping Tsai commented on KAFKA-15203:


The reflection repo, however,  is inactive now, the potential issue, such as 
CVE, is still existent. Hence, I feel it is worth finding alternative.

> Remove dependency on Reflections 
> -
>
> Key: KAFKA-15203
> URL: https://issues.apache.org/jira/browse/KAFKA-15203
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Divij Vaidya
>Assignee: Ganesh Sadanala
>Priority: Major
>  Labels: newbie
> Fix For: 5.0.0
>
>
> We currently depend on reflections library which is EOL. Quoting from the 
> GitHub site:
> _> Please note: Reflections library is currently NOT under active development 
> or maintenance_
>  
> This poses a supply chain risk for our project where the security fixes and 
> other major bugs in underlying dependency may not be addressed timely.
> Hence, we should plan to remove this dependency.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16849) ERROR Failed to read /opt/kafka/data/meta.properties

2024-05-28 Thread Agostino Sarubbo (Jira)
Agostino Sarubbo created KAFKA-16849:


 Summary: ERROR Failed to read /opt/kafka/data/meta.properties
 Key: KAFKA-16849
 URL: https://issues.apache.org/jira/browse/KAFKA-16849
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.8.1
 Environment: RockyLinux-9
openjdk version "1.8.0_412"
Reporter: Agostino Sarubbo


I'm running a kafka-2.8.1 cluster with 5 machines.
For how it is configured (I have set a replica of 3) I can completely destroy a 
machine and re-create it from scratch. After have re-created it, it joins again 
the cluster and everything works as before.

So, right now I'm trying to migrate the host from CentOS-7 to RockyLinux-9. The 
idea was destroy and re-create the machines one-by-one.

I have increased the loglevel to DEBUG and this is what I get:



```
[2024-05-28 07:35:09,287] INFO Registered kafka:type=kafka.Log4jController 
MBean (kafka.utils.Log4jControllerRegistration$) 
[2024-05-28 07:35:09,676] INFO Registered signal handlers for TERM, INT, HUP 
(org.apache.kafka.common.utils.LoggingSignalHandler) 
[2024-05-28 07:35:09,680] INFO starting (kafka.server.KafkaServer) 
[2024-05-28 07:35:09,680] INFO Connecting to zookeeper on 
zookeeper1.my.domain:2281,zookeeper2.my.domain:2281,zookeeper3.my.domain:2281,zookeeper4.my.domain:2281,zookeeper5.my.domain:
2281 (kafka.server.KafkaServer) 
[2024-05-28 07:35:09,681] DEBUG Checking login config for Zookeeper JAAS 
context [java.security.auth.login.config=null, 
zookeeper.sasl.client=default:true, zookeeper.sasl.clientconfig
=default:Client] (org.apache.kafka.common.security.JaasUtils) 
[2024-05-28 07:35:09,695] INFO [ZooKeeperClient Kafka server] Initializing a 
new session to 
zookeeper1.my.domain:2281,zookeeper2.my.domain:2281,zookeeper3.my.domain:2281,zookeeper4.my
.domain:2281,zookeeper5.my.domain:2281. (kafka.zookeeper.ZooKeeperClient) 
[2024-05-28 07:35:09,758] DEBUG Initializing task scheduler. 
(kafka.utils.KafkaScheduler) 
[2024-05-28 07:35:09,759] INFO [ZooKeeperClient Kafka server] Waiting until 
connected. (kafka.zookeeper.ZooKeeperClient) 
[2024-05-28 07:35:10,051] DEBUG [ZooKeeperClient Kafka server] Received event: 
WatchedEvent state:SyncConnected type:None path:null 
(kafka.zookeeper.ZooKeeperClient) 
[2024-05-28 07:35:10,053] INFO [ZooKeeperClient Kafka server] Connected. 
(kafka.zookeeper.ZooKeeperClient) 
[2024-05-28 07:35:10,139] INFO [feature-zk-node-event-process-thread]: Starting 
(kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread) 
[2024-05-28 07:35:10,144] DEBUG Reading feature ZK node at path: /feature 
(kafka.server.FinalizedFeatureChangeListener) 
[2024-05-28 07:35:10,251] INFO Updated cache from existing  to latest 
FinalizedFeaturesAndEpoch(features=Features{}, epoch=1). 
(kafka.server.FinalizedFeatureCache) 
[2024-05-28 07:35:10,255] INFO Cluster ID = 3G4teZlrS-uT6-Sk5MkbPQ 
(kafka.server.KafkaServer) 
[2024-05-28 07:35:10,258] ERROR Failed to read /opt/kafka/data/meta.properties 
(kafka.server.BrokerMetadataCheckpoint$) 
java.nio.file.AccessDeniedException: /opt/kafka/data/meta.properties.tmp 
   at 
sun.nio.fs.UnixException.translateToIOException(UnixException.java:84) 
   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) 
   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) 
   at 
sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:244) 
   at 
sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(AbstractFileSystemProvider.java:108)
 
   at java.nio.file.Files.deleteIfExists(Files.java:1165) 
   at 
kafka.server.BrokerMetadataCheckpoint.read(BrokerMetadataCheckpoint.scala:224) 
   at 
kafka.server.BrokerMetadataCheckpoint$.$anonfun$getBrokerMetadataAndOfflineDirs$2(BrokerMetadataCheckpoint.scala:158)
 
   at 
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) 
   at 
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38) 
   at 
kafka.server.BrokerMetadataCheckpoint$.getBrokerMetadataAndOfflineDirs(BrokerMetadataCheckpoint.scala:153)
 
   at kafka.server.KafkaServer.startup(KafkaServer.scala:206) 
   at kafka.Kafka$.main(Kafka.scala:109) 
   at kafka.Kafka.main(Kafka.scala) 
[2024-05-28 07:35:10,326] INFO KafkaConfig values:  
   advertised.host.name = null 
   advertised.listeners = null 
   advertised.port = null 
   alter.config.policy.class.name = null 
   alter.log.dirs.replication.quota.window.num = 11 
   alter.log.dirs.replication.quota.window.size.seconds = 1 
   authorizer.class.name =  
   auto.create.topics.enable = true 
   auto.leader.rebalance.enable = true 
   background.threads = 10 
   broker.heartbeat.interval.ms = 2000 
   broker.id = 1 

[PR] KAFKA-16844: Add ByteBuffer support for Connect ByteArrayConverter [kafka]

2024-05-28 Thread via GitHub


fanyang opened a new pull request, #16101:
URL: https://github.com/apache/kafka/pull/16101

   In current Connect schema design, schema type Bytes have 2 representations, 
byte[] and ByteBuffer. But current ByteArrayConverter can only convert byte[]. 
   This PR adds ByteBuffer support in ByteArrayConverter. Also added test for 
the change.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16844) ByteArrayConverter can't convert ByteBuffer

2024-05-28 Thread Fan Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849927#comment-17849927
 ] 

Fan Yang commented on KAFKA-16844:
--

Already sent PR for it.

> ByteArrayConverter can't convert ByteBuffer
> ---
>
> Key: KAFKA-16844
> URL: https://issues.apache.org/jira/browse/KAFKA-16844
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Fan Yang
>Assignee: Fan Yang
>Priority: Minor
>
> In current Schema design, schema type Bytes correspond to two kinds of 
> classes, byte[] and ByteBuffer. But current ByteArrayConverter can only 
> convert byte[]. My suggestion is to add ByteBuffer support in current 
> ByteArrayConverter.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16652: add unit test for ClusterTemplate offering zero ClusterConfig [kafka]

2024-05-28 Thread via GitHub


chia7712 commented on PR #15862:
URL: https://github.com/apache/kafka/pull/15862#issuecomment-2134588650

   @TaiJuWu Could you please rebase code?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16849) ERROR Failed to read /opt/kafka/data/meta.properties

2024-05-28 Thread Agostino Sarubbo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Agostino Sarubbo updated KAFKA-16849:
-
Description: 
I'm running a kafka-2.8.1 cluster with 5 machines.
For how it is configured (I have set a replica of 3) I can completely destroy a 
machine and re-create it from scratch. After have re-created it, it joins again 
the cluster and everything works as before.

So, right now I'm trying to migrate the host from CentOS-7 to RockyLinux-9. The 
idea was destroy and re-create the machines one-by-one.

I have increased the loglevel to DEBUG and this is what I get:
{code:java}
[2024-05-28 07:35:09,287] INFO Registered kafka:type=kafka.Log4jController 
MBean (kafka.utils.Log4jControllerRegistration$)
[2024-05-28 07:35:09,676] INFO Registered signal handlers for TERM, INT, HUP 
(org.apache.kafka.common.utils.LoggingSignalHandler)
[2024-05-28 07:35:09,680] INFO starting (kafka.server.KafkaServer)
[2024-05-28 07:35:09,680] INFO Connecting to zookeeper on 
zookeeper1.my.domain:2281,zookeeper2.my.domain:2281,zookeeper3.my.domain:2281,zookeeper4.my.domain:2281,zookeeper5.my.domain:2281
 (kafka.server.KafkaServer)
[2024-05-28 07:35:09,681] DEBUG Checking login config for Zookeeper JAAS 
context [java.security.auth.login.config=null, 
zookeeper.sasl.client=default:true, zookeeper.sasl.clientconfig=default:Client] 
(org.apache.kafka.common.security.JaasUtils)
[2024-05-28 07:35:09,695] INFO [ZooKeeperClient Kafka server] Initializing a 
new session to 
zookeeper1.my.domain:2281,zookeeper2.my.domain:2281,zookeeper3.my.domain:2281,zookeeper4.my.domain:2281,zookeeper5.my.domain:2281.
 (kafka.zookeeper.ZooKeeperClient)
[2024-05-28 07:35:09,758] DEBUG Initializing task scheduler. 
(kafka.utils.KafkaScheduler)
[2024-05-28 07:35:09,759] INFO [ZooKeeperClient Kafka server] Waiting until 
connected. (kafka.zookeeper.ZooKeeperClient)
[2024-05-28 07:35:10,051] DEBUG [ZooKeeperClient Kafka server] Received event: 
WatchedEvent state:SyncConnected type:None path:null 
(kafka.zookeeper.ZooKeeperClient)
[2024-05-28 07:35:10,053] INFO [ZooKeeperClient Kafka server] Connected. 
(kafka.zookeeper.ZooKeeperClient)
[2024-05-28 07:35:10,139] INFO [feature-zk-node-event-process-thread]: Starting 
(kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
[2024-05-28 07:35:10,144] DEBUG Reading feature ZK node at path: /feature 
(kafka.server.FinalizedFeatureChangeListener)
[2024-05-28 07:35:10,251] INFO Updated cache from existing  to latest 
FinalizedFeaturesAndEpoch(features=Features{}, epoch=1). 
(kafka.server.FinalizedFeatureCache)
[2024-05-28 07:35:10,255] INFO Cluster ID = 3G4teZlrS-uT6-Sk5MkbPQ 
(kafka.server.KafkaServer)
[2024-05-28 07:35:10,258] ERROR Failed to read /opt/kafka/data/meta.properties 
(kafka.server.BrokerMetadataCheckpoint$)
java.nio.file.AccessDeniedException: /opt/kafka/data/meta.properties.tmp
        at 
sun.nio.fs.UnixException.translateToIOException(UnixException.java:84)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
        at 
sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:244)
        at 
sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(AbstractFileSystemProvider.java:108)
        at java.nio.file.Files.deleteIfExists(Files.java:1165)
        at 
kafka.server.BrokerMetadataCheckpoint.read(BrokerMetadataCheckpoint.scala:224)
        at 
kafka.server.BrokerMetadataCheckpoint$.$anonfun$getBrokerMetadataAndOfflineDirs$2(BrokerMetadataCheckpoint.scala:158)
        at 
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
        at 
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
        at 
kafka.server.BrokerMetadataCheckpoint$.getBrokerMetadataAndOfflineDirs(BrokerMetadataCheckpoint.scala:153)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:206)
        at kafka.Kafka$.main(Kafka.scala:109)
        at kafka.Kafka.main(Kafka.scala)
[2024-05-28 07:35:10,326] INFO KafkaConfig values: 
        advertised.host.name = null
        advertised.listeners = null
        advertised.port = null
        alter.config.policy.class.name = null
        alter.log.dirs.replication.quota.window.num = 11
        alter.log.dirs.replication.quota.window.size.seconds = 1
        authorizer.class.name = 
        auto.create.topics.enable = true
        auto.leader.rebalance.enable = true
        background.threads = 10
        broker.heartbeat.interval.ms = 2000
        broker.id = 1
        broker.id.generation.enable = true
        broker.rack = null
        broker.session.timeout.ms = 9000
        client.quota.callback.class = null
        compression.type = producer
        connection.failed.authentication.delay.ms = 

Re: [PR] MINOR: Modify doc of advertised.listeners to show supported on KRaft mode. [kafka]

2024-05-28 Thread via GitHub


showuon commented on PR #16100:
URL: https://github.com/apache/kafka/pull/16100#issuecomment-2134632821

   Thanks for the PR @sasakitoa ! But this issue is already addressed in 
https://github.com/apache/kafka/pull/15860 . Welcome to provide your review 
comment in that PR. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Modify doc of advertised.listeners to show supported on KRaft mode. [kafka]

2024-05-28 Thread via GitHub


sasakitoa closed pull request #16100: MINOR: Modify doc of advertised.listeners 
to show supported on KRaft mode.
URL: https://github.com/apache/kafka/pull/16100


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Modify doc of advertised.listeners to show supported on KRaft mode. [kafka]

2024-05-28 Thread via GitHub


sasakitoa commented on PR #16100:
URL: https://github.com/apache/kafka/pull/16100#issuecomment-2134642036

   Thanks for comments, @showuon 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16617: Include kraft handler in advertised listener doc [kafka]

2024-05-28 Thread via GitHub


showuon commented on code in PR #15860:
URL: https://github.com/apache/kafka/pull/15860#discussion_r1616820915


##
server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java:
##
@@ -58,13 +58,17 @@ public class SocketServerConfigs {
 
 public static final String ADVERTISED_LISTENERS_CONFIG = 
"advertised.listeners";
 public static final String ADVERTISED_LISTENERS_DOC = String.format(
-"Listeners to publish to ZooKeeper for clients to use, if 
different than the %s config property." +
-" In IaaS environments, this may need to be different from 
the interface to which the broker binds." +
-" If this is not set, the value for %1$1s 
will be used." +
-" Unlike %1$1s, it is not valid to advertise 
the 0.0.0.0 meta-address.%n" +
-" Also unlike %1$1s, there can be duplicated 
ports in this property," +
-" so that one listener can be configured to advertise 
another listener's address." +
-" This can be useful in some cases where external load 
balancers are used.", LISTENERS_CONFIG);
+"Specifies the listener addresses that the Kafka brokers will 
advertise to clients and other brokers." +
+" The config is useful where the actual listener configuration 
%s does not represent the addresses that clients should" +
+" use to connect, such as in cloud environments. In environments 
using ZooKeeper, these addresses are published to ZooKeeper." +
+" In KRaft mode, these addresses are managed internally by the 
Kafka brokers themselves. Regardless of the operating mode, this config" +

Review Comment:
   > In KRaft mode, these addresses are managed internally by the Kafka brokers 
themselves.
   
   Is this correct? I mean, yes, they have metadataCache, like in ZK, the 
broker also has metadataCache. But in ZK, we'll publish to ZK, so in KRaft, 
where will we publish the advertised listener to?



##
server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java:
##
@@ -58,13 +58,17 @@ public class SocketServerConfigs {
 
 public static final String ADVERTISED_LISTENERS_CONFIG = 
"advertised.listeners";
 public static final String ADVERTISED_LISTENERS_DOC = String.format(
-"Listeners to publish to ZooKeeper for clients to use, if 
different than the %s config property." +
-" In IaaS environments, this may need to be different from 
the interface to which the broker binds." +
-" If this is not set, the value for %1$1s 
will be used." +
-" Unlike %1$1s, it is not valid to advertise 
the 0.0.0.0 meta-address.%n" +
-" Also unlike %1$1s, there can be duplicated 
ports in this property," +
-" so that one listener can be configured to advertise 
another listener's address." +
-" This can be useful in some cases where external load 
balancers are used.", LISTENERS_CONFIG);
+"Specifies the listener addresses that the Kafka brokers will 
advertise to clients and other brokers." +
+" The config is useful where the actual listener configuration 
%s does not represent the addresses that clients should" +

Review Comment:
   Should we use `%1$1s` as below?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15203) Remove dependency on Reflections

2024-05-28 Thread Ganesh Sadanala (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849938#comment-17849938
 ] 

Ganesh Sadanala commented on KAFKA-15203:
-

[~chia7712]  I will make an update once i find the alternative.

> Remove dependency on Reflections 
> -
>
> Key: KAFKA-15203
> URL: https://issues.apache.org/jira/browse/KAFKA-15203
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Divij Vaidya
>Assignee: Ganesh Sadanala
>Priority: Major
>  Labels: newbie
> Fix For: 5.0.0
>
>
> We currently depend on reflections library which is EOL. Quoting from the 
> GitHub site:
> _> Please note: Reflections library is currently NOT under active development 
> or maintenance_
>  
> This poses a supply chain risk for our project where the security fixes and 
> other major bugs in underlying dependency may not be addressed timely.
> Hence, we should plan to remove this dependency.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16805: Stop using a ClosureBackedAction to configure Spotbugs reports [kafka]

2024-05-28 Thread via GitHub


chia7712 commented on PR #16081:
URL: https://github.com/apache/kafka/pull/16081#issuecomment-2134661663

   ```
   chia7712@chia7712-ubuntu:~/project/kafka$ ./gradlew clean build -x test 
--warning-mode all
   Starting a Gradle Daemon, 8 stopped Daemons could not be reused, use 
--status for details
   
   > Configure project :
   Starting build with version 3.8.0-SNAPSHOT (commit id 91284d8d) using Gradle 
8.7, Java 17 and Scala 2.13.14
   Build properties: maxParallelForks=24, maxScalacThreads=8, maxTestRetries=0
   Build file '/home/chia7712/project/kafka/build.gradle': line 310
   The org.gradle.api.plugins.JavaPluginConvention type has been deprecated. 
This is scheduled to be removed in Gradle 9.0. Consult the upgrading guide for 
further information: 
https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#java_convention_deprecation
   at 
build_dfxsezet52f5eom30zrzfpxff$_run_closure4.doCall$original(/home/chia7712/project/kafka/build.gradle:310)
   (Run with --stacktrace to get the full stack trace of this 
deprecation warning.)
   at 
build_dfxsezet52f5eom30zrzfpxff.run(/home/chia7712/project/kafka/build.gradle:246)
   (Run with --stacktrace to get the full stack trace of this 
deprecation warning.)
   Build file '/home/chia7712/project/kafka/build.gradle': line 311
   The org.gradle.api.plugins.JavaPluginConvention type has been deprecated. 
This is scheduled to be removed in Gradle 9.0. Consult the upgrading guide for 
further information: 
https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#java_convention_deprecation
   at 
build_dfxsezet52f5eom30zrzfpxff$_run_closure4.doCall$original(/home/chia7712/project/kafka/build.gradle:311)
   (Run with --stacktrace to get the full stack trace of this 
deprecation warning.)
   at 
build_dfxsezet52f5eom30zrzfpxff.run(/home/chia7712/project/kafka/build.gradle:246)
   (Run with --stacktrace to get the full stack trace of this 
deprecation warning.)
   The org.gradle.util.ConfigureUtil type has been deprecated. This is 
scheduled to be removed in Gradle 9.0. Consult the upgrading guide for further 
information: 
https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#org_gradle_util_reports_deprecations
   at 
build_dfxsezet52f5eom30zrzfpxff$_run_closure18.doCall$original(/home/chia7712/project/kafka/build.gradle:1511)
   (Run with --stacktrace to get the full stack trace of this 
deprecation warning.)
   
   ```
   
   the warnings about `ClosureBackedAction` is gone.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16805: Stop using a ClosureBackedAction to configure Spotbugs reports [kafka]

2024-05-28 Thread via GitHub


chia7712 merged PR #16081:
URL: https://github.com/apache/kafka/pull/16081


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16520) Changes to DescribeQuorum response

2024-05-28 Thread Nikolay Izhikov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849940#comment-17849940
 ] 

Nikolay Izhikov commented on KAFKA-16520:
-

Hello [~jsancio] 

Do we want to enhance FetchRequest inside this PR? Or I can leave DirectoryId 
equals to null for cases when `ReplicaState` created while handling 
FetchRequest?

> Changes to DescribeQuorum response
> --
>
> Key: KAFKA-16520
> URL: https://issues.apache.org/jira/browse/KAFKA-16520
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: Nikolay Izhikov
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16805) Stop using a ClosureBackedAction to configure Spotbugs reports

2024-05-28 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16805.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Stop using a ClosureBackedAction to configure Spotbugs reports
> --
>
> Key: KAFKA-16805
> URL: https://issues.apache.org/jira/browse/KAFKA-16805
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Greg Harris
>Assignee: 黃竣陽
>Priority: Major
>  Labels: newbie
> Fix For: 3.8.0
>
>
> The org.gradle.util.ClosureBackedAction type has been deprecated.    
> This is scheduled to be removed in Gradle 9.0.    
> [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_7.html#org_gradle_util_reports_deprecations]
>     
> 1 usage    
> [https://github.com/apache/kafka/blob/95adb7bfbfc69b3e9f3538cc5d6f7c6a577d30ee/build.gradle#L745-L749]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]

2024-05-28 Thread via GitHub


chia7712 commented on code in PR #16097:
URL: https://github.com/apache/kafka/pull/16097#discussion_r1616841401


##
.gitignore:
##
@@ -13,6 +13,7 @@ project/boot/
 project/plugins/project/
 patch-process/*
 .idea
+!/.idea/codeStyles

Review Comment:
   the whole `idea` folder is not in version control, so do we need this line?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]

2024-05-28 Thread via GitHub


gongxuanzhang commented on code in PR #16097:
URL: https://github.com/apache/kafka/pull/16097#discussion_r1616859562


##
.gitignore:
##
@@ -13,6 +13,7 @@ project/boot/
 project/plugins/project/
 patch-process/*
 .idea
+!/.idea/codeStyles

Review Comment:
   > the whole `idea` folder is not in version control, so do we need this line?
   
   It has to be, and it's very important.
   
   This file will allow you to import your idea automatically into the 
project's code format



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR:Improve ConsumerRebalanceListener doc [kafka]

2024-05-28 Thread via GitHub


AndrewJSchofield commented on code in PR #16083:
URL: https://github.com/apache/kafka/pull/16083#discussion_r1616865033


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java:
##
@@ -28,37 +28,37 @@
  * This is applicable when the consumer is having Kafka auto-manage group 
membership. If the consumer directly assigns partitions,
  * those partitions will never be reassigned and this callback is not 
applicable.
  * 
- * When Kafka is managing the group membership, a partition re-assignment will 
be triggered any time the members of the group change or the subscription
+ * When Kafka is managing the group membership, a partition re-assignment will 
be triggered whenever the members of the group change or the subscription
  * of the members changes. This can occur when processes die, new process 
instances are added or old instances come back to life after failure.
- * Partition re-assignments can also be triggered by changes affecting the 
subscribed topics (e.g. when the number of partitions is
+ * Partition re-assignments can also be triggered by changing affecting the 
subscribed topics (e.g. when the number of partitions is
  * administratively adjusted).
  * 
  * There are many uses for this functionality. One common use is saving 
offsets in a custom store. By saving offsets in
  * the {@link #onPartitionsRevoked(Collection)} call we can ensure that any 
time partition assignment changes

Review Comment:
   It would read more clearly with a comma between "call" and "we can ensure".



##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java:
##
@@ -28,37 +28,37 @@
  * This is applicable when the consumer is having Kafka auto-manage group 
membership. If the consumer directly assigns partitions,
  * those partitions will never be reassigned and this callback is not 
applicable.
  * 
- * When Kafka is managing the group membership, a partition re-assignment will 
be triggered any time the members of the group change or the subscription
+ * When Kafka is managing the group membership, a partition re-assignment will 
be triggered whenever the members of the group change or the subscription
  * of the members changes. This can occur when processes die, new process 
instances are added or old instances come back to life after failure.
- * Partition re-assignments can also be triggered by changes affecting the 
subscribed topics (e.g. when the number of partitions is
+ * Partition re-assignments can also be triggered by changing affecting the 
subscribed topics (e.g. when the number of partitions is
  * administratively adjusted).
  * 
  * There are many uses for this functionality. One common use is saving 
offsets in a custom store. By saving offsets in
  * the {@link #onPartitionsRevoked(Collection)} call we can ensure that any 
time partition assignment changes
  * the offset gets saved.
  * 
  * Another use is flushing out any kind of cache of intermediate results the 
consumer may be keeping. For example,
- * consider a case where the consumer is subscribed to a topic containing user 
page views, and the goal is to count the
- * number of page views per user for each five minute window. Let's say the 
topic is partitioned by the user id so that
+ * consider a case where the consumer is subscribing to a topic containing 
user page views, and the goal is to count the

Review Comment:
   I think "subscribed" is better. With "subscribing", it sounds like we are 
only concerned with the time while the consumer is actually making its 
subscription. An alternative might be "where the consumer subscribes to a 
topic".



##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java:
##
@@ -119,15 +119,15 @@
 public interface ConsumerRebalanceListener {
 
 /**
- * A callback method the user can implement to provide handling of offset 
commits to a customized store.
+ * A callback method the user can implement to provide handling of offset 
commits **sent to** a customized store

Review Comment:
   Also, I wonder whether this first sentence is really giving a common example 
of what the callback might do. Really, the callback is to provide handling of 
cleaning up resources which are being assigned to other consumers. The 
customized store is a possible use.



##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java:
##
@@ -119,15 +119,15 @@
 public interface ConsumerRebalanceListener {
 
 /**
- * A callback method the user can implement to provide handling of offset 
commits to a customized store.
+ * A callback method the user can implement to provide handling of offset 
commits **sent to** a customized store

Review Comment:
   Add "." to the end of this sentence please.



##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java:
##
@@ -28,3

Re: [PR] KAFKA-16722: Introduce ConsumerGroupPartitionAssignor interface [kafka]

2024-05-28 Thread via GitHub


AndrewJSchofield commented on PR #15998:
URL: https://github.com/apache/kafka/pull/15998#issuecomment-2134730354

   Rebased to resolve merge conflicts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16850) KRaft - Add v2 of TopicRecord

2024-05-28 Thread Christo Lolov (Jira)
Christo Lolov created KAFKA-16850:
-

 Summary: KRaft - Add v2 of TopicRecord
 Key: KAFKA-16850
 URL: https://issues.apache.org/jira/browse/KAFKA-16850
 Project: Kafka
  Issue Type: Sub-task
Reporter: Christo Lolov






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16851) Add remote.log.disable.policy

2024-05-28 Thread Christo Lolov (Jira)
Christo Lolov created KAFKA-16851:
-

 Summary: Add remote.log.disable.policy
 Key: KAFKA-16851
 URL: https://issues.apache.org/jira/browse/KAFKA-16851
 Project: Kafka
  Issue Type: Sub-task
Reporter: Christo Lolov


*Summary*

Add the configuration as internal-only to begin with. Do not wire it to 
anything yet, just ensure that it is settable dynamically



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16852) Add *.thread.pool.size

2024-05-28 Thread Christo Lolov (Jira)
Christo Lolov created KAFKA-16852:
-

 Summary: Add *.thread.pool.size
 Key: KAFKA-16852
 URL: https://issues.apache.org/jira/browse/KAFKA-16852
 Project: Kafka
  Issue Type: Sub-task
Reporter: Christo Lolov


*Summary*

Add the remote.log.manager.copier.thread.pool.size and 
remote.log.manager.expiration.thread.pool.size configurations as internal-only



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16515) Fix the ZK Metadata cache use of voter static configuration

2024-05-28 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849946#comment-17849946
 ] 

Josep Prat commented on KAFKA-16515:


[~cmccabe] Do I understand right, that this Jira is done now?

> Fix the ZK Metadata cache use of voter static configuration
> ---
>
> Key: KAFKA-16515
> URL: https://issues.apache.org/jira/browse/KAFKA-16515
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: José Armando García Sancio
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 3.8.0
>
>
> Looks like because of ZK migration to KRaft the ZK Metadata cache was changed 
> to read the voter static configuration. This needs to change to use the voter 
> nodes reported by  the raft manager or the kraft client.
> The injection code is in KafkaServer where it constructs 
> MetadataCache.zkMetadata.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16516) Fix the controller node provider for broker to control channel

2024-05-28 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849948#comment-17849948
 ] 

Josep Prat commented on KAFKA-16516:


[~cmccabe] Do I understand right, that this Jira is done now?

> Fix the controller node provider for broker to control channel
> --
>
> Key: KAFKA-16516
> URL: https://issues.apache.org/jira/browse/KAFKA-16516
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: José Armando García Sancio
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 3.8.0
>
>
> The broker to controller channel gets the set of voters directly from the 
> static configuration. This needs to change so that the leader nodes comes 
> from the kraft client/manager.
> The code is in KafkaServer where it construct the RaftControllerNodeProvider.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16853) Split RemoteLogManagerScheduledThreadPool

2024-05-28 Thread Christo Lolov (Jira)
Christo Lolov created KAFKA-16853:
-

 Summary: Split RemoteLogManagerScheduledThreadPool
 Key: KAFKA-16853
 URL: https://issues.apache.org/jira/browse/KAFKA-16853
 Project: Kafka
  Issue Type: Sub-task
Reporter: Christo Lolov


*Summary*

To begin with create just the RemoteDataExpirationThreadPool and move 
expiration to it. Keep all settings as if the only thread pool was the 
RemoteLogManagerScheduledThreadPool. Ensure that the new thread pool is wired 
correctly to the RemoteLogManager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KeyValue test cases for ToString() method [kafka]

2024-05-28 Thread via GitHub


jayim243 opened a new pull request, #16102:
URL: https://github.com/apache/kafka/pull/16102

   Added test cases to ensure that the ToString() method works accordingly for 
KeyValue objects.
   
   This contribution is my original work and I license the work to the project 
under the project's open source license.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]

2024-05-28 Thread via GitHub


chia7712 commented on code in PR #16097:
URL: https://github.com/apache/kafka/pull/16097#discussion_r1616916667


##
.gitignore:
##
@@ -13,6 +13,7 @@ project/boot/
 project/plugins/project/
 patch-process/*
 .idea
+!/.idea/codeStyles

Review Comment:
   It seems to me using build tool to address that is more acceptable. We all 
use gradle to build kafka but there are many IDE.
   
   We can add IDE supports in the future but could you please keep this PR 
simple for now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16854) Zookeeper - Add v5 of StopReplica

2024-05-28 Thread Christo Lolov (Jira)
Christo Lolov created KAFKA-16854:
-

 Summary: Zookeeper - Add v5 of StopReplica
 Key: KAFKA-16854
 URL: https://issues.apache.org/jira/browse/KAFKA-16854
 Project: Kafka
  Issue Type: Sub-task
Reporter: Christo Lolov






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] Minor fix invalid name REPLICA_LAG_TIME_MAX_MS_CONFIG [kafka]

2024-05-28 Thread via GitHub


k-raina opened a new pull request, #16103:
URL: https://github.com/apache/kafka/pull/16103

   **Summary**
   Minor naming error introduced as part of 
https://github.com/apache/kafka/pull/15575/files#diff-f4c48da6e1d079a4579514d7b2102804e805f16cd172dfe8bddf866f7028d333L46
 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16855) KRaft - Wire replaying a TopicRecord

2024-05-28 Thread Christo Lolov (Jira)
Christo Lolov created KAFKA-16855:
-

 Summary: KRaft - Wire replaying a TopicRecord
 Key: KAFKA-16855
 URL: https://issues.apache.org/jira/browse/KAFKA-16855
 Project: Kafka
  Issue Type: Sub-task
Reporter: Christo Lolov


*Summary*

Replaying a TopicRecord containing a new TieredEpoch and TieredState needs to 
interact with the two thread pools in the RemoteLogManager to add/remove the 
correct tasks from each



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16856) Zookeeper - Add new exception

2024-05-28 Thread Christo Lolov (Jira)
Christo Lolov created KAFKA-16856:
-

 Summary: Zookeeper - Add new exception
 Key: KAFKA-16856
 URL: https://issues.apache.org/jira/browse/KAFKA-16856
 Project: Kafka
  Issue Type: Sub-task
Reporter: Christo Lolov


*Summary*

Add TIERED_STORAGE_DISABLEMENT_IN_PROGRESS exception



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16857) Zookeeper - Add new ZNodes

2024-05-28 Thread Christo Lolov (Jira)
Christo Lolov created KAFKA-16857:
-

 Summary: Zookeeper - Add new ZNodes
 Key: KAFKA-16857
 URL: https://issues.apache.org/jira/browse/KAFKA-16857
 Project: Kafka
  Issue Type: Sub-task
Reporter: Christo Lolov


*Summary*

Additional information needs to be stored in new ZNodes as part of disablement. 
Ensure that said information makes it into Zookeeper.
{code:java}
/brokers/topics/{topic-name}/partitions
/tieredstorage/  /tiered_epoch  
/state {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16857) Zookeeper - Add new ZNodes

2024-05-28 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-16857:
--
Description: 
*Summary*

Additional information needs to be stored in new ZNodes as part of disablement. 
Ensure that said information makes it into Zookeeper.
{code:java}
/brokers/topics/{topic-name}/partitions
   /tieredstorage
 /tiered_epoch  
   
 /state {code}

  was:
*Summary*

Additional information needs to be stored in new ZNodes as part of disablement. 
Ensure that said information makes it into Zookeeper.
{code:java}
/brokers/topics/{topic-name}/partitions
/tieredstorage/  /tiered_epoch  
/state {code}


> Zookeeper - Add new ZNodes
> --
>
> Key: KAFKA-16857
> URL: https://issues.apache.org/jira/browse/KAFKA-16857
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Priority: Major
>
> *Summary*
> Additional information needs to be stored in new ZNodes as part of 
> disablement. Ensure that said information makes it into Zookeeper.
> {code:java}
> /brokers/topics/{topic-name}/partitions
>/tieredstorage
>  /tiered_epoch
>  
>  /state {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16827: Integrate kafka native-image with system tests [kafka]

2024-05-28 Thread via GitHub


omkreddy commented on code in PR #16046:
URL: https://github.com/apache/kafka/pull/16046#discussion_r1616959290


##
bin/kafka-run-class.sh:
##
@@ -208,7 +208,7 @@ fi
 
 # JMX settings
 if [ -z "$KAFKA_JMX_OPTS" ]; then
-  KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.authenticate=false  
-Dcom.sun.management.jmxremote.ssl=false "
+  KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true 
-Dcom.sun.management.jmxremote.authenticate=false  
-Dcom.sun.management.jmxremote.ssl=false "

Review Comment:
   nit: maynot be required as default is true for this system property



##
bin/kafka-run-class.sh:
##
@@ -340,9 +340,13 @@ CLASSPATH=${CLASSPATH#:}
 # If Cygwin is detected, classpath is converted to Windows format.
 (( WINDOWS_OS_FORMAT )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")
 
-# Launch mode
-if [ "x$DAEMON_MODE" = "xtrue" ]; then
-  nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS 
$KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_CMD_OPTS -cp "$CLASSPATH" 
$KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
+if [ "$KAFKA_MODE" = "native" ]; then

Review Comment:
   Can we use the existing convention to test the string like `if [ 
"x$KAFKA_MODE" = "xnative" ]`



##
bin/kafka-run-class.sh:
##
@@ -340,9 +340,13 @@ CLASSPATH=${CLASSPATH#:}
 # If Cygwin is detected, classpath is converted to Windows format.
 (( WINDOWS_OS_FORMAT )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")
 
-# Launch mode
-if [ "x$DAEMON_MODE" = "xtrue" ]; then
-  nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS 
$KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_CMD_OPTS -cp "$CLASSPATH" 
$KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
+if [ "$KAFKA_MODE" = "native" ]; then
+  exec $base_dir/kafka.Kafka start --config "$2"  $KAFKA_LOG4J_CMD_OPTS 
$KAFKA_JMX_OPTS $KAFKA_OPTS

Review Comment:
   Dont want to to pass `$KAFKA_HEAP_OPTS` to native run?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14995) Automate asf.yaml collaborators refresh

2024-05-28 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison reassigned KAFKA-14995:
--

Assignee: (was: Steven Booke)

> Automate asf.yaml collaborators refresh
> ---
>
> Key: KAFKA-14995
> URL: https://issues.apache.org/jira/browse/KAFKA-14995
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Priority: Minor
>  Labels: newbie
>
> We have added a policy to use the asf.yaml Github Collaborators: 
> [https://github.com/apache/kafka-site/pull/510]
> The policy states that we set this list to be the top 20 commit authors who 
> are not Kafka committers. Unfortunately, it's not trivial to compute this 
> list.
> Here is the process I followed to generate the list the first time (note that 
> I generated this list on 2023-04-28, so the lookback is one year:
> 1. List authors by commit volume in the last year:
> {code:java}
> $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code}
> 2. manually filter out the authors who are committers, based on 
> [https://kafka.apache.org/committers]
> 3. truncate the list to 20 authors
> 4. for each author
> 4a. Find a commit in the `git log` that they were the author on:
> {code:java}
> commit 440bed2391338dc10fe4d36ab17dc104b61b85e8
> Author: hudeqi <1217150...@qq.com>
> Date:   Fri May 12 14:03:17 2023 +0800
> ...{code}
> 4b. Look up that commit in Github: 
> [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8]
> 4c. Copy their Github username into .asf.yaml under both the PR whitelist and 
> the Collaborators lists.
> 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713]
>  
> This is pretty time consuming and is very scriptable. Two complications:
>  * To do the filtering, we need to map from Git log "Author" to documented 
> Kafka "Committer" that we can use to perform the filter. Suggestion: just 
> update the structure of the "Committers" page to include their Git "Author" 
> name and email 
> ([https://github.com/apache/kafka-site/blob/asf-site/committers.html)]
>  * To generate the YAML lists, we need to map from Git log "Author" to Github 
> username. There's presumably some way to do this in the Github REST API (the 
> mapping is based on the email, IIUC), or we could also just update the 
> Committers page to also document each committer's Github username.
>  
> Ideally, we would write this script (to be stored in the Apache Kafka repo) 
> and create a Github Action to run it every three months.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15853: Move general configs out of KafkaConfig [kafka]

2024-05-28 Thread via GitHub


OmniaGM commented on code in PR #16040:
URL: https://github.com/apache/kafka/pull/16040#discussion_r1616995730


##
server/src/main/java/org/apache/kafka/server/config/KafkaServerConfigs.java:
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.authorizer.Authorizer;
+public class KafkaServerConfigs {

Review Comment:
   Not needed but we have already `KafkaServer` so I named it as 
`KafkaServerConfigs` to follow that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move general configs out of KafkaConfig [kafka]

2024-05-28 Thread via GitHub


mimaison commented on code in PR #16040:
URL: https://github.com/apache/kafka/pull/16040#discussion_r1617003681


##
server/src/main/java/org/apache/kafka/server/config/KafkaServerConfigs.java:
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.authorizer.Authorizer;
+public class KafkaServerConfigs {

Review Comment:
   I think we can drop the prefix and have shorter names. KafkaProducer uses 
ProducerConfig.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15203) Remove dependency on Reflections

2024-05-28 Thread Ganesh Sadanala (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849976#comment-17849976
 ] 

Ganesh Sadanala commented on KAFKA-15203:
-

[~chia7712] I believe this the closest replacement to the Reflections: 
[https://github.com/classgraph/classgraph]

 

What are your thoughts on Class Graph library?

> Remove dependency on Reflections 
> -
>
> Key: KAFKA-15203
> URL: https://issues.apache.org/jira/browse/KAFKA-15203
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Divij Vaidya
>Assignee: Ganesh Sadanala
>Priority: Major
>  Labels: newbie
> Fix For: 5.0.0
>
>
> We currently depend on reflections library which is EOL. Quoting from the 
> GitHub site:
> _> Please note: Reflections library is currently NOT under active development 
> or maintenance_
>  
> This poses a supply chain risk for our project where the security fixes and 
> other major bugs in underlying dependency may not be addressed timely.
> Hence, we should plan to remove this dependency.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15541: Add oldest-iterator-open-since-ms metric [kafka]

2024-05-28 Thread via GitHub


nicktelford commented on code in PR #16041:
URL: https://github.com/apache/kafka/pull/16041#discussion_r1617008040


##
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java:
##
@@ -490,6 +490,37 @@ public void shouldTimeIteratorDuration() {
 assertThat((double) iteratorDurationMaxMetric.metricValue(), 
equalTo(3.0 * TimeUnit.MILLISECONDS.toNanos(1)));
 }
 
+@Test
+public void shouldTrackOldestOpenIteratorTimestamp() {
+when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
+init();
+
+final KafkaMetric oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
+assertThat(oldestIteratorTimestampMetric, not(nullValue()));
+
+assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
+
+final KeyValueIterator second;
+final long secondTimestamp;
+try (final KeyValueIterator first = metered.all()) {
+final long oldestTimestamp = mockTime.milliseconds();
+assertThat((Long) oldestIteratorTimestampMetric.metricValue(), 
equalTo(oldestTimestamp));
+mockTime.sleep(100);
+
+// open a second iterator before closing the first to test that we 
still produce the first iterator's timestamp
+second = metered.all();
+secondTimestamp = mockTime.milliseconds();
+assertThat((Long) oldestIteratorTimestampMetric.metricValue(), 
equalTo(oldestTimestamp));
+mockTime.sleep(100);
+}
+
+// now that the first iterator is closed, check that the timestamp has 
advanced to the still open second iterator
+assertThat((Long) oldestIteratorTimestampMetric.metricValue(), 
equalTo(secondTimestamp));
+second.close();

Review Comment:
   Done. I assumed you were suggesting this in all the tests, so I updated them 
all.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16822: Abstract consumer group to share functionality with share group (KIP-932) [kafka]

2024-05-28 Thread via GitHub


apoorvmittal10 commented on code in PR #16054:
URL: https://github.com/apache/kafka/pull/16054#discussion_r1617009938


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/AbstractGroup.java:
##
@@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS;
+import static 
org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
+
+public abstract class AbstractGroup implements Group {
+
+public static class DeadlineAndEpoch {
+static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0);
+
+public final long deadlineMs;
+public final int epoch;
+
+DeadlineAndEpoch(long deadlineMs, int epoch) {
+this.deadlineMs = deadlineMs;
+this.epoch = epoch;
+}
+}
+
+/**
+ * The snapshot registry.
+ */
+protected final SnapshotRegistry snapshotRegistry;
+
+/**
+ * The group id.
+ */
+protected final String groupId;
+
+/**
+ * The group epoch. The epoch is incremented whenever the subscriptions
+ * are updated and it will trigger the computation of a new assignment
+ * for the group.
+ */
+protected final TimelineInteger groupEpoch;
+
+/**
+ * The group members.
+ */
+protected final TimelineHashMap members;
+
+/**
+ * The number of subscribers per topic.
+ */
+protected final TimelineHashMap subscribedTopicNames;
+
+/**
+ * The metadata associated with each subscribed topic name.
+ */
+protected final TimelineHashMap 
subscribedTopicMetadata;
+
+/**
+ * The group's subscription type.
+ * This value is set to Homogeneous by default.
+ */
+protected final TimelineObject subscriptionType;
+
+/**
+ * The target assignment epoch. An assignment epoch smaller than the group 
epoch
+ * means that a new assignment is required. The assignment epoch is 
updated when
+ * a new assignment is installed.
+ */
+protected final TimelineInteger targetAssignmentEpoch;
+
+/**
+ * The target assignment per member id.
+ */
+protected final TimelineHashMap targetAssignment;
+
+/**
+ * The current partition epoch maps each topic-partitions to their current 
epoch where
+ * the epoch is the epoch of their owners. When a member revokes a 
partition, it removes
+ * its epochs from this map. When a member gets a partition, it adds its 
epochs to this map.
+ */
+protected final TimelineHashMap> 
currentPartitionEpoch;
+
+/**
+ * The metadata refresh deadline. It consists of a timestamp in 
milliseconds together with
+ * the group epoch at the time of setting it. The metadata refresh time is 
considered as a
+ * soft state (read that it is not stored in a timeline data structure). 
It is like this
+ * because it is not persisted to the log. The group epoch is here to 
ensure that the
+ * metadata refresh deadline is invalidated if the group epoch

Re: [PR] KAFKA-16822: Abstract consumer group to share functionality with share group (KIP-932) [kafka]

2024-05-28 Thread via GitHub


apoorvmittal10 commented on code in PR #16054:
URL: https://github.com/apache/kafka/pull/16054#discussion_r1617010639


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/AbstractGroup.java:
##
@@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;

Review Comment:
   Moved it to `modern` package as discussed above with renamed as 
`AbstractModernGroup`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16822: Abstract consumer group to share functionality with share group (KIP-932) [kafka]

2024-05-28 Thread via GitHub


apoorvmittal10 commented on code in PR #16054:
URL: https://github.com/apache/kafka/pull/16054#discussion_r1617011178


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMember.java:
##
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.MemberState;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract member common for group members.
+ */
+public abstract class GroupMember {

Review Comment:
   Moved it to `modern` package as discussed above with renamed as 
`ModernGroupMember`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-28 Thread via GitHub


appchemist commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2134918869

   kindly ping @philipnee 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16822: Abstract consumer group to share functionality with share group (KIP-932) [kafka]

2024-05-28 Thread via GitHub


apoorvmittal10 commented on code in PR #16054:
URL: https://github.com/apache/kafka/pull/16054#discussion_r1617014505


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/AbstractGroup.java:
##
@@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS;
+import static 
org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
+
+public abstract class AbstractGroup implements Group {

Review Comment:
   I have renamed `AbstractGroup` to `AbstractModernGroup` and `GroupMember` to 
`ModernGroupMember`. Also I have created a common package named `modern` as 
suggested above.
   
   This PR mostly does the refactor around adding abstract classes and in the 
next PR I will move `consumer` package under `modern`, also will move couple of 
shared classes from `consumer` to `modern` as well. Just don't to increase more 
diffs in the current PR with additional refactoring.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-28 Thread via GitHub


kamalcph commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1617015938


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+public class RLMQuotaManagerConfig {
+public static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600;
+
+private final long quotaBytesPerSecond;
+private final int numQuotaSamples;
+private final int quotaWindowSizeSeconds;
+
+public long quotaBytesPerSecond() {
+return quotaBytesPerSecond;
+}
+
+public int numQuotaSamples() {
+return numQuotaSamples;
+}
+
+public int quotaWindowSizeSeconds() {
+return quotaWindowSizeSeconds;
+}
+
+/**
+ * Configuration settings for quota management
+ *
+ * @param quotaBytesPerSecondThe quota in bytes per second
+ * @param numQuotaSamplesThe number of samples to retain in memory
+ * @param quotaWindowSizeSeconds The time span of each sample
+ */
+public RLMQuotaManagerConfig(long quotaBytesPerSecond, int 
numQuotaSamples, int quotaWindowSizeSeconds) {

Review Comment:
   nit: 
   
   place constructor before the getter methods



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15203) Remove dependency on Reflections

2024-05-28 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849981#comment-17849981
 ] 

Chia-Ping Tsai commented on KAFKA-15203:


It seems it requires JDK16+ 
(https://github.com/classgraph/classgraph?tab=readme-ov-file#running-on-jdk-16)?

> Remove dependency on Reflections 
> -
>
> Key: KAFKA-15203
> URL: https://issues.apache.org/jira/browse/KAFKA-15203
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Divij Vaidya
>Assignee: Ganesh Sadanala
>Priority: Major
>  Labels: newbie
> Fix For: 5.0.0
>
>
> We currently depend on reflections library which is EOL. Quoting from the 
> GitHub site:
> _> Please note: Reflections library is currently NOT under active development 
> or maintenance_
>  
> This poses a supply chain risk for our project where the security fixes and 
> other major bugs in underlying dependency may not be addressed timely.
> Hence, we should plan to remove this dependency.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Fix the config name in `ProducerFailureHandlingTest` [kafka]

2024-05-28 Thread via GitHub


omkreddy merged PR #16099:
URL: https://github.com/apache/kafka/pull/16099


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15203) Remove dependency on Reflections

2024-05-28 Thread Ganesh Sadanala (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849983#comment-17849983
 ] 

Ganesh Sadanala commented on KAFKA-15203:
-

[~chia7712] The library does not work as expected on projects using JDK 16+ 
There is no other library which can scan without the configuration 
files/metadata. I guess we have to wait until the deprecation is addressed.

 

Two problem due to delayed deprecation:
 # The implementation of `ServiceLoader` is not noticeable because it is used 
in combine with Reflections. So the performance wise no change is observed.
 # Security vulnerabilities would be still valid.

 

Unless it is a serious issue, it can be awaited. This is my opinion.

> Remove dependency on Reflections 
> -
>
> Key: KAFKA-15203
> URL: https://issues.apache.org/jira/browse/KAFKA-15203
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Divij Vaidya
>Assignee: Ganesh Sadanala
>Priority: Major
>  Labels: newbie
> Fix For: 5.0.0
>
>
> We currently depend on reflections library which is EOL. Quoting from the 
> GitHub site:
> _> Please note: Reflections library is currently NOT under active development 
> or maintenance_
>  
> This poses a supply chain risk for our project where the security fixes and 
> other major bugs in underlying dependency may not be addressed timely.
> Hence, we should plan to remove this dependency.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15203) Remove dependency on Reflections

2024-05-28 Thread Ganesh Sadanala (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849983#comment-17849983
 ] 

Ganesh Sadanala edited comment on KAFKA-15203 at 5/28/24 11:06 AM:
---

[~chia7712] The library does not work as expected on projects using JDK 16+ 
There is no other library which can scan without the configuration 
files/metadata. I guess we have to wait until the deprecation is addressed.

 

Two problems due to delayed deprecation:
 # The implementation of `ServiceLoader` is not noticeable because it is used 
in combine with Reflections. So the performance wise no change is observed.
 # Security vulnerabilities would be still valid.

 

Unless it is a serious issue, it can be awaited. This is my opinion.


was (Author: JIRAUSER305566):
[~chia7712] The library does not work as expected on projects using JDK 16+ 
There is no other library which can scan without the configuration 
files/metadata. I guess we have to wait until the deprecation is addressed.

 

Two problem due to delayed deprecation:
 # The implementation of `ServiceLoader` is not noticeable because it is used 
in combine with Reflections. So the performance wise no change is observed.
 # Security vulnerabilities would be still valid.

 

Unless it is a serious issue, it can be awaited. This is my opinion.

> Remove dependency on Reflections 
> -
>
> Key: KAFKA-15203
> URL: https://issues.apache.org/jira/browse/KAFKA-15203
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Divij Vaidya
>Assignee: Ganesh Sadanala
>Priority: Major
>  Labels: newbie
> Fix For: 5.0.0
>
>
> We currently depend on reflections library which is EOL. Quoting from the 
> GitHub site:
> _> Please note: Reflections library is currently NOT under active development 
> or maintenance_
>  
> This poses a supply chain risk for our project where the security fixes and 
> other major bugs in underlying dependency may not be addressed timely.
> Hence, we should plan to remove this dependency.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15203) Remove dependency on Reflections

2024-05-28 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849986#comment-17849986
 ] 

Chia-Ping Tsai commented on KAFKA-15203:


{quote}
Unless it is a serious issue, it can be awaited. This is my opinion.
{quote}

agree :)

> Remove dependency on Reflections 
> -
>
> Key: KAFKA-15203
> URL: https://issues.apache.org/jira/browse/KAFKA-15203
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Divij Vaidya
>Assignee: Ganesh Sadanala
>Priority: Major
>  Labels: newbie
> Fix For: 5.0.0
>
>
> We currently depend on reflections library which is EOL. Quoting from the 
> GitHub site:
> _> Please note: Reflections library is currently NOT under active development 
> or maintenance_
>  
> This poses a supply chain risk for our project where the security fixes and 
> other major bugs in underlying dependency may not be addressed timely.
> Hence, we should plan to remove this dependency.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16796: Introduce new org.apache.kafka.tools.api.Decoder to replace kafka.serializer.Decoder [kafka]

2024-05-28 Thread via GitHub


FrankYang0529 commented on PR #16064:
URL: https://github.com/apache/kafka/pull/16064#issuecomment-2134961455

   Hi @chia7712, I address all comments. Thanks for your review and suggestion.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured

2024-05-28 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849991#comment-17849991
 ] 

Muralidhar Basani commented on KAFKA-16790:
---

[~christo_lolov] [~nikramakrishnan]  wdyt about the new pr changes ? 

> Calls to RemoteLogManager are made before it is configured
> --
>
> Key: KAFKA-16790
> URL: https://issues.apache.org/jira/browse/KAFKA-16790
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.8.0
>Reporter: Christo Lolov
>Assignee: Muralidhar Basani
>Priority: Major
>
> BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) 
> which in turn calls RemoteLogManager#onLeadershipChange (2), however, the 
> RemoteLogManager is configured after the BrokerMetadataPublisher starts 
> running (3, 4). This is incorrect, we either need to initialise the 
> RemoteLogManager before we start the BrokerMetadataPublisher or we need to 
> skip calls to onLeadershipChange if the RemoteLogManager is not initialised.
> (1) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151]
> (2) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737]
> (3) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432]
> (4) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515]
> The way to reproduce the problem is by looking at the following changes
> {code:java}
>  config/kraft/broker.properties                         | 10 ++
>  .../main/java/kafka/log/remote/RemoteLogManager.java   |  8 +++-
>  core/src/main/scala/kafka/server/ReplicaManager.scala  |  6 +-
>  3 files changed, 22 insertions(+), 2 deletions(-)diff --git 
> a/config/kraft/broker.properties b/config/kraft/broker.properties
> index 2d15997f28..39d126cf87 100644
> --- a/config/kraft/broker.properties
> +++ b/config/kraft/broker.properties
> @@ -127,3 +127,13 @@ log.segment.bytes=1073741824
>  # The interval at which log segments are checked to see if they can be 
> deleted according
>  # to the retention policies
>  log.retention.check.interval.ms=30
> +
> +remote.log.storage.system.enable=true
> +remote.log.metadata.manager.listener.name=PLAINTEXT
> +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
> +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar
> +remote.log.storage.manager.impl.prefix=rsm.config.
> +remote.log.metadata.manager.impl.prefix=rlmm.config.
> +rsm.config.dir=/tmp/kafka-remote-storage
> +rlmm.config.remote.log.metadata.topic.replication.factor=1
> +log.retention.check.interval.ms=1000
> diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
> b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> index 6555b7c0cd..e84a072abc 100644
> --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable {
>      // The endpoint for remote log metadata manager to connect to
>      private Optional endpoint = Optional.empty();
>      private boolean closed = false;
> +    private boolean up = false;
>  
>      /**
>       * Creates RemoteLogManager instance with the given arguments.
> @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable {
>          // in connecting to the brokers or remote storages.
>          configureRSM();
>          configureRLMM();
> +        up = true;
>      }
>  
>      public RemoteStorageManager storageManager() {
> @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable {
>      public void onLeadershipChange(Set partitionsBecomeLeader,
>                                     Set partitionsBecomeFollower,
>                                     Map topicIds) {
> -        LOGGER.debug("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
> +        if (!up) {
> +            LOGGER.error("NullPointerException");
> +            return;
> +        }
> +        LOGGER.error("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
>  
>          Map leaderPartitionsWithLeaderEpoch = 
> filterPartitions(partitionsBecomeLeader)
>                  .collect(Collectors.toMap(
> diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
> b/core/src/main/scala/kafka/server/ReplicaManager.scala
> index 35499430d6..bd3f41c3d6 100644
> --- a/core/src

Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]

2024-05-28 Thread via GitHub


chia7712 commented on code in PR #15830:
URL: https://github.com/apache/kafka/pull/15830#discussion_r1617053123


##
core/src/test/scala/unit/kafka/admin/AclCommandTest.scala:
##
@@ -122,19 +128,27 @@ class AclCommandTest extends QuorumTestHarness with 
Logging {
 super.tearDown()
   }
 
+  override protected def kraftControllerConfigs(): Seq[Properties] = {
+val controllerConfig = new Properties
+controllerConfig.put(KafkaConfig.AuthorizerClassNameProp, 
classOf[StandardAuthorizer].getName)
+controllerConfig.put(StandardAuthorizer.SUPER_USERS_CONFIG, 
"User:ANONYMOUS")
+Seq(controllerConfig)
+  }
+
   @Test
   def testAclCliWithAuthorizer(): Unit = {
 testAclCli(zkArgs)
   }
 
-  @Test
-  def testAclCliWithAdminAPI(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk"))

Review Comment:
   > Just wondering, do we need to check this main output at all?
   
   from the testing we can know that output "Current ACLs" could be out-of-date 
information. +1 to your previous changes that removing `listAcls` from 
`removeAcls` and `addAcls` in order to avoid confusing users.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move general configs out of KafkaConfig [kafka]

2024-05-28 Thread via GitHub


chia7712 commented on PR #16040:
URL: https://github.com/apache/kafka/pull/16040#issuecomment-2134977554

   @OmniaGM It seems the conflicts are not fixed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move general configs out of KafkaConfig [kafka]

2024-05-28 Thread via GitHub


OmniaGM commented on PR #16040:
URL: https://github.com/apache/kafka/pull/16040#issuecomment-2134979805

   > @OmniaGM It seems the conflicts are not fixed.
   
   this is a new one :D will fix this now


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Minor fix invalid name REPLICA_LAG_TIME_MAX_MS_CONFIG [kafka]

2024-05-28 Thread via GitHub


chia7712 commented on PR #16103:
URL: https://github.com/apache/kafka/pull/16103#issuecomment-2134980943

   @k-raina thanks for your contribution. this is already fixed by #16099


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Minor fix invalid name REPLICA_LAG_TIME_MAX_MS_CONFIG [kafka]

2024-05-28 Thread via GitHub


chia7712 closed pull request #16103: Minor fix invalid name 
REPLICA_LAG_TIME_MAX_MS_CONFIG
URL: https://github.com/apache/kafka/pull/16103


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move general configs out of KafkaConfig [kafka]

2024-05-28 Thread via GitHub


OmniaGM commented on PR #16040:
URL: https://github.com/apache/kafka/pull/16040#issuecomment-2134991869

   Fixed now! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-05-28 Thread via GitHub


OmniaGM commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1611972192


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -105,6 +108,49 @@ public void stop() {
 Utils.closeQuietly(targetAdminClient, "target admin client");
 }
 
+@Override
+public Config validate(Map props) {
+List configValues = super.validate(props).configValues();
+String emitCheckpointsValue = 
Optional.ofNullable(props.get(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED)).orElse(Boolean.toString(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED_DEFAULT));
+String syncGroupOffsetsValue = 
Optional.ofNullable(props.get(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED)).orElse(Boolean.toString(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED_DEFAULT));
+String emitOffsetSyncsValue = 
Optional.ofNullable(props.get(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)).orElse(Boolean.toString(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED_DEFAULT));
+
+if ("false".equals(emitCheckpointsValue) && 
"false".equals(syncGroupOffsetsValue)) {
+ConfigValue syncGroupOffsets = configValues.stream().filter(prop 
-> MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED.equals(prop.name()))
+.findAny()
+.orElseGet(() -> {
+ConfigValue result = new 
ConfigValue(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED);
+configValues.add(result);
+return result;
+});
+
+ConfigValue emitCheckpoints = configValues.stream().filter(prop -> 
MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED.equals(prop.name()))
+.findAny()
+.orElseGet(() -> {
+ConfigValue result = new 
ConfigValue(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED);
+configValues.add(result);
+return result;
+});
+
+String errorMessage = "MirrorCheckpointConnector can't run with 
both" +
+MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED + ", " + 
MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED + "set to false";
+syncGroupOffsets.addErrorMessage(errorMessage);
+emitCheckpoints.addErrorMessage(errorMessage);
+}
+if ("false".equals(emitOffsetSyncsValue) && 
("true".equals(emitCheckpointsValue) || "true".equals(syncGroupOffsetsValue))) {
+ConfigValue emitOffsetSyncs = configValues.stream().filter(prop -> 
MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED.equals(prop.name()))
+.findAny()
+.orElseGet(() -> {
+ConfigValue result = new 
ConfigValue(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED);
+configValues.add(result);
+return result;
+});
+
+emitOffsetSyncs.addErrorMessage("MirrorCheckpointConnector can't 
run while MirrorSourceConnector configured with" +
+MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED + "set to 
false");
+}
+return new Config(configValues);

Review Comment:
   > Can these validations live in the config class? In this case that would be 
MirrorConnectConfig.
   
   `MirrorConnectConfig` is more generic and shared between multiple 
connectors. We shouldn't move this validation there maybe 
`MirrorCheckpointConfig` but not `MirrorConnectConfig`
   
   > Besides being in line with config validation it has the added benefit that 
we can use .getBoolean(...) instead of  
props.get(..prop.)).orElse(Boolean.toString(...default...).
   
   I think we should to leverage `Connector::validate` here as it get called 
before `start` and we can fail faster. If we want to use `getBooleans` maybe we 
should initialise `MirrorCheckpointConfig` within the validate but we shouldn't 
move this out of `Connector::validate`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16827: Integrate kafka native-image with system tests [kafka]

2024-05-28 Thread via GitHub


kagarwal06 commented on code in PR #16046:
URL: https://github.com/apache/kafka/pull/16046#discussion_r1617075771


##
bin/kafka-run-class.sh:
##
@@ -340,9 +340,13 @@ CLASSPATH=${CLASSPATH#:}
 # If Cygwin is detected, classpath is converted to Windows format.
 (( WINDOWS_OS_FORMAT )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")
 
-# Launch mode
-if [ "x$DAEMON_MODE" = "xtrue" ]; then
-  nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS 
$KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_CMD_OPTS -cp "$CLASSPATH" 
$KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
+if [ "$KAFKA_MODE" = "native" ]; then
+  exec $base_dir/kafka.Kafka start --config "$2"  $KAFKA_LOG4J_CMD_OPTS 
$KAFKA_JMX_OPTS $KAFKA_OPTS

Review Comment:
   As per the `native-image` 
[docs](https://www.graalvm.org/jdk21/reference-manual/native-image/optimizations-and-performance/MemoryManagement/#java-heap-size)
   ```
   When executing a native image, suitable Java heap settings will be 
determined automatically based on the system configuration and the used GC. 
   ```
   Though they have given mechanism to override it explicitly, I am making use 
of the automatic mechanism.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16827: Integrate kafka native-image with system tests [kafka]

2024-05-28 Thread via GitHub


kagarwal06 commented on code in PR #16046:
URL: https://github.com/apache/kafka/pull/16046#discussion_r1617078533


##
bin/kafka-run-class.sh:
##
@@ -208,7 +208,7 @@ fi
 
 # JMX settings
 if [ -z "$KAFKA_JMX_OPTS" ]; then
-  KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.authenticate=false  
-Dcom.sun.management.jmxremote.ssl=false "
+  KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true 
-Dcom.sun.management.jmxremote.authenticate=false  
-Dcom.sun.management.jmxremote.ssl=false "

Review Comment:
   Yes, that is correct.
   But the native binary requires it explicitly. Hence added this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16827: Integrate kafka native-image with system tests [kafka]

2024-05-28 Thread via GitHub


omkreddy commented on code in PR #16046:
URL: https://github.com/apache/kafka/pull/16046#discussion_r1616962390


##
bin/kafka-run-class.sh:
##
@@ -340,9 +340,13 @@ CLASSPATH=${CLASSPATH#:}
 # If Cygwin is detected, classpath is converted to Windows format.
 (( WINDOWS_OS_FORMAT )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")
 
-# Launch mode
-if [ "x$DAEMON_MODE" = "xtrue" ]; then
-  nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS 
$KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_CMD_OPTS -cp "$CLASSPATH" 
$KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
+if [ "$KAFKA_MODE" = "native" ]; then
+  exec $base_dir/kafka.Kafka start --config "$2"  $KAFKA_LOG4J_CMD_OPTS 
$KAFKA_JMX_OPTS $KAFKA_OPTS

Review Comment:
   Dont we want to to pass `$KAFKA_HEAP_OPTS` to native run?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16827: Integrate kafka native-image with system tests [kafka]

2024-05-28 Thread via GitHub


kagarwal06 commented on code in PR #16046:
URL: https://github.com/apache/kafka/pull/16046#discussion_r1617078533


##
bin/kafka-run-class.sh:
##
@@ -208,7 +208,7 @@ fi
 
 # JMX settings
 if [ -z "$KAFKA_JMX_OPTS" ]; then
-  KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.authenticate=false  
-Dcom.sun.management.jmxremote.ssl=false "
+  KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true 
-Dcom.sun.management.jmxremote.authenticate=false  
-Dcom.sun.management.jmxremote.ssl=false "

Review Comment:
   Yes, that is correct for JVM.
   But the native binary requires it explicitly. Hence added this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16827: Integrate kafka native-image with system tests [kafka]

2024-05-28 Thread via GitHub


omkreddy commented on code in PR #16046:
URL: https://github.com/apache/kafka/pull/16046#discussion_r1616962390


##
bin/kafka-run-class.sh:
##
@@ -340,9 +340,13 @@ CLASSPATH=${CLASSPATH#:}
 # If Cygwin is detected, classpath is converted to Windows format.
 (( WINDOWS_OS_FORMAT )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")
 
-# Launch mode
-if [ "x$DAEMON_MODE" = "xtrue" ]; then
-  nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS 
$KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_CMD_OPTS -cp "$CLASSPATH" 
$KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
+if [ "$KAFKA_MODE" = "native" ]; then
+  exec $base_dir/kafka.Kafka start --config "$2"  $KAFKA_LOG4J_CMD_OPTS 
$KAFKA_JMX_OPTS $KAFKA_OPTS

Review Comment:
   Dont we need to pass `$KAFKA_HEAP_OPTS` to native run?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16806) Explicitly declare JUnit dependencies for all test modules

2024-05-28 Thread TengYao Chi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850002#comment-17850002
 ] 

TengYao Chi commented on KAFKA-16806:
-

I will handle this :)

> Explicitly declare JUnit dependencies for all test modules
> --
>
> Key: KAFKA-16806
> URL: https://issues.apache.org/jira/browse/KAFKA-16806
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Greg Harris
>Priority: Major
>
> The automatic loading of test framework implementation dependencies has been 
> deprecated.    
> This is scheduled to be removed in Gradle 9.0.    
> Declare the desired test framework directly on the test suite or explicitly 
> declare the test framework implementation dependencies on the test's runtime 
> classpath.    
> [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#test_framework_implementation_dependencies]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-28 Thread via GitHub


kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1617105411


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws 
InterruptedException
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment:
   If the copy quota gets breached, all the `kafka-rlm-thread-pool` threads 
will wait for the quota to be available which might delay the deletion of 
remote log segments since the same thread does both copy and delete.
   
   Do you plan to split the copy/delete operations in a separate thread pool?



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -237,6 +242,13 @@ private void removeMetrics() {
 remoteStorageReaderThreadPool.removeMetrics();
 }
 
+/**
+ * Returns the timeout for the RLM Tasks to wait for the quota to be 
available
+ */
+Duration quotaTimeout() {

Review Comment:
   shall we rename `quotaTimeout` to `throttleTimeMs`?



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -153,6 +156,8 @@ public class RemoteLogManager implements Closeable {
 
 private final RemoteLogMetadataManager remoteLogMetadataManager;
 
+private final ReentrantLock copyQuotaManagerLock = new ReentrantLock(true);

Review Comment:
   Why `fairness` is turned on to true?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16806) Explicitly declare JUnit dependencies for all test modules

2024-05-28 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16806:
--

Assignee: TengYao Chi

> Explicitly declare JUnit dependencies for all test modules
> --
>
> Key: KAFKA-16806
> URL: https://issues.apache.org/jira/browse/KAFKA-16806
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Greg Harris
>Assignee: TengYao Chi
>Priority: Major
>
> The automatic loading of test framework implementation dependencies has been 
> deprecated.    
> This is scheduled to be removed in Gradle 9.0.    
> Declare the desired test framework directly on the test suite or explicitly 
> declare the test framework implementation dependencies on the test's runtime 
> classpath.    
> [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#test_framework_implementation_dependencies]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16827: Integrate kafka native-image with system tests [kafka]

2024-05-28 Thread via GitHub


kagarwal06 commented on code in PR #16046:
URL: https://github.com/apache/kafka/pull/16046#discussion_r1617116268


##
bin/kafka-run-class.sh:
##
@@ -340,9 +340,13 @@ CLASSPATH=${CLASSPATH#:}
 # If Cygwin is detected, classpath is converted to Windows format.
 (( WINDOWS_OS_FORMAT )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")
 
-# Launch mode
-if [ "x$DAEMON_MODE" = "xtrue" ]; then
-  nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS 
$KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_CMD_OPTS -cp "$CLASSPATH" 
$KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
+if [ "$KAFKA_MODE" = "native" ]; then

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-28 Thread via GitHub


cadonna commented on PR #15601:
URL: https://github.com/apache/kafka/pull/15601#issuecomment-2135043687

   @raminqaf Could you please rebase this PR on current trunk to get the new 
build setup? We need to restart the builds because one was red. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16827: Integrate kafka native-image with system tests [kafka]

2024-05-28 Thread via GitHub


kagarwal06 commented on code in PR #16046:
URL: https://github.com/apache/kafka/pull/16046#discussion_r1617119890


##
tests/kafkatest/services/kafka/kafka.py:
##
@@ -994,7 +1000,7 @@ def thread_dump(self, node):
 def clean_node(self, node):
 JmxMixin.clean_node(self, node)
 self.security_config.clean_node(node)
-node.account.kill_java_processes(self.java_class_name(),

Review Comment:
   `kill_java_processes` command won't work if Kafka is running in the native 
mode.
   Replacing it with `kill_process`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16827: Integrate kafka native-image with system tests [kafka]

2024-05-28 Thread via GitHub


kagarwal06 commented on code in PR #16046:
URL: https://github.com/apache/kafka/pull/16046#discussion_r1617119324


##
tests/kafkatest/services/kafka/kafka.py:
##
@@ -926,7 +932,7 @@ def run_features_command(self, op, new_version):
 def pids(self, node):
 """Return process ids associated with running processes on the given 
node."""
 try:
-cmd = "jcmd | grep -e %s | awk '{print $1}'" % 
self.java_class_name()

Review Comment:
   `jcmd` command won't work if Kafka is running in the native mode.
   Replacing it with `ps ax`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16827: Integrate kafka native-image with system tests [kafka]

2024-05-28 Thread via GitHub


kagarwal06 commented on code in PR #16046:
URL: https://github.com/apache/kafka/pull/16046#discussion_r1617123960


##
tests/kafkatest/services/security/templates/jaas.conf:
##
@@ -55,6 +55,7 @@ KafkaServer {
 useKeyTab=true
 storeKey=true
 keyTab="/mnt/security/keytab"
+refreshKrb5Config=true

Review Comment:
   This change is needed because of an issue in GraalVm native-image. 
   I raised this in their community slack channel.
   This is a workaround for now.
   Conversation thread: 
https://graalvm.slack.com/archives/CN9KSFB40/p1700246576831259



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16827: Integrate kafka native-image with system tests [kafka]

2024-05-28 Thread via GitHub


kagarwal06 commented on code in PR #16046:
URL: https://github.com/apache/kafka/pull/16046#discussion_r1617123960


##
tests/kafkatest/services/security/templates/jaas.conf:
##
@@ -55,6 +55,7 @@ KafkaServer {
 useKeyTab=true
 storeKey=true
 keyTab="/mnt/security/keytab"
+refreshKrb5Config=true

Review Comment:
   This change is needed because of an issue in GraalVm native-image. They will 
fix it in upcoming releases.
   I raised this in their community slack channel.
   This is a workaround for now.
   Conversation thread: 
https://graalvm.slack.com/archives/CN9KSFB40/p1700246576831259



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16827: Integrate kafka native-image with system tests [kafka]

2024-05-28 Thread via GitHub


kagarwal06 commented on code in PR #16046:
URL: https://github.com/apache/kafka/pull/16046#discussion_r1617123960


##
tests/kafkatest/services/security/templates/jaas.conf:
##
@@ -55,6 +55,7 @@ KafkaServer {
 useKeyTab=true
 storeKey=true
 keyTab="/mnt/security/keytab"
+refreshKrb5Config=true

Review Comment:
   This change is needed because of an issue in GraalVm native-image tool. They 
will fix it in upcoming releases.
   I raised this in their community slack channel.
   This is a workaround for now.
   Conversation thread: 
https://graalvm.slack.com/archives/CN9KSFB40/p1700246576831259



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16583: fix PartitionRegistration#toRecord directory check under metadata version 3_7_IV2 [kafka]

2024-05-28 Thread via GitHub


soarez commented on PR #15751:
URL: https://github.com/apache/kafka/pull/15751#issuecomment-2135057113

   
   Can you please confirm:
   1. In the JIRA you describe the cluster as formed of 1 Controller and 2 
Brokers (BrokerA and BrokerB), is my understanding correct that all of these 
were running 3.4.0 and that BrokerB was the first and only server to be 
upgraded to 3.7.0?
   2. Did you configure brokers with  `inter.broker.protocol.version` at any 
point or let it default?
   
   This is the stack trace in the JIRA:
   
   ```
   [2024-04-18 14:46:54,144] ERROR Encountered metadata loading fault: 
Unhandled error initializing new publishers 
(org.apache.kafka.server.fault.LoggingFaultHandler)
   org.apache.kafka.image.writer.UnwritableMetadataException: Metadata has been 
lost because the following could not be represented in metadata version 
3.4-IV0: the directory assignment state of one or more replicas
at 
org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
at 
org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
at 
org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
at 
org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266)
at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
at java.base/java.lang.Thread.run(Thread.java:840)
   ```
   
   It shows that:
   * The failure happens after the broker first catches up with cluster 
metadata, in `MetadataLoader#initializeNewPublishers`, converting the metadata 
image into a delta used to initialize publishers.
   * `UnwritableMetadataException` is thrown from 
`PartitionRegistration#toRecord` because `PartitionRegistration#directories` 
contain some directory ID that’s not `MIGRATING` and `MetadataVersion` (MV) is 
still below `IBP_3_7_IV2`. This operation converts `PartitionRegistration` – 
the in-memory representation of partition metadata – into `PartitionRecord` – 
the serializable metadata format. 
   
   Neither controllers nor brokers pre 3.7 produce records with directory IDs, 
the only possible value in previous records is `MIGRATING`. So `UNASSIGNED` was 
set while loading metadata. i.e. `PartitionRegistration` must have produced 
`UNASSIGNED` during either:
   * class initialization – we can rule out 
`PartitionRegistration.Builder#build()` as that’s only used in the controller, 
and if I understood the issue description correctly, your controller was still 
running 3.4.0. ❌ 
   * loading `PartitionRecord` – directory IDs are taken as-is in the 
`PartitionRegistration(PartitionRecord record)` constructor and default to 
`MIGRATING`. The records wouldn’t previously have any IDs, so we rule this out 
too. ❌ 
   * loading `PartitionChangeRecord` – in `merge(PartitionChangeRecord record)` 
the loaded records wouldn’t have any directory IDs, but we call 
`DirectoryId.createDirectoriesFrom` which produces `UNASSIGNED`. As far as I 
can tell, this is the likely culprit. ✅ 
   
   `PartitionRegistration` loads records without dir IDs and doesn't check the 
MV before using `UNASSIGNED`.
   There are at least two approaches to fix this:
   1. Refer to the current MV as the record is interpreted so we can choose the 
correct default. Referring to the current MV should be required while 
interpreting potentially older records that omit fields. But as @superhx 
pointed out, it’s a bit tricky to to that from `#merge`.
   2. Do not produce `UNASSIGNED` and default to `MIGRATING` regardless of MV. 
The onus of setting `UNASSIGNED` falls to the creation of 
`PartitionChangeRecord`. These records are produced by the Controller, via 
`PartitionChangeBuilder` which selects the directory using 
`ClusterControlManager#defaultDir`, which will pick `UNASSIGNED` as necessary. 
So this this seems fine.
   
   From these two approaches, I prefer the second one, it’s not only a simpler 
change but also removes redundant logic. 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16827: Integrate kafka native-image with system tests [kafka]

2024-05-28 Thread via GitHub


omkreddy commented on code in PR #16046:
URL: https://github.com/apache/kafka/pull/16046#discussion_r1617138022


##
tests/docker/run_tests.sh:
##
@@ -19,20 +19,53 @@ SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd 
)"
 KAFKA_NUM_CONTAINERS=${KAFKA_NUM_CONTAINERS:-14}
 TC_PATHS=${TC_PATHS:-./kafkatest/}
 REBUILD=${REBUILD:f}
+TMP_NATIVE_DIR=${SCRIPT_DIR}/native/
+
+get_mode() {
+  if [[ "$_DUCKTAPE_OPTIONS" == *"kafka_mode"* && "$_DUCKTAPE_OPTIONS" == 
*"native"* ]]; then
+export KAFKA_MODE="native"
+  else
+export KAFKA_MODE="jvm"
+  fi
+}

Review Comment:
   Can we print the mode here



##
tests/docker/run_tests.sh:
##
@@ -19,20 +19,53 @@ SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd 
)"
 KAFKA_NUM_CONTAINERS=${KAFKA_NUM_CONTAINERS:-14}
 TC_PATHS=${TC_PATHS:-./kafkatest/}
 REBUILD=${REBUILD:f}
+TMP_NATIVE_DIR=${SCRIPT_DIR}/native/
+
+get_mode() {
+  if [[ "$_DUCKTAPE_OPTIONS" == *"kafka_mode"* && "$_DUCKTAPE_OPTIONS" == 
*"native"* ]]; then
+export KAFKA_MODE="native"
+  else
+export KAFKA_MODE="jvm"
+  fi
+}
+
+cleanup() {
+  if [ -d "${TMP_NATIVE_DIR}" ]; then
+echo "Deleting temporary native dir: ${TMP_NATIVE_DIR}"
+rm -rf "${TMP_NATIVE_DIR}"
+  fi
+}
 
 die() {
-echo $@
-exit 1
+  cleanup
+  echo $@
+  exit 1
 }
 
 if [ "$REBUILD" == "t" ]; then
 ./gradlew clean systemTestLibs
 fi
 
+get_mode
+cleanup && mkdir "${TMP_NATIVE_DIR}"
+if [ "$KAFKA_MODE" == "native" ]; then
+  kafka_tarball_filename=(core/build/distributions/kafka*SNAPSHOT.tgz)

Review Comment:
   Can we print a line stating building for native image



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-05-28 Thread via GitHub


OmniaGM commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1617187677


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -105,6 +108,49 @@ public void stop() {
 Utils.closeQuietly(targetAdminClient, "target admin client");
 }
 
+@Override
+public Config validate(Map props) {
+List configValues = super.validate(props).configValues();
+String emitCheckpointsValue = 
Optional.ofNullable(props.get(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED)).orElse(Boolean.toString(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED_DEFAULT));
+String syncGroupOffsetsValue = 
Optional.ofNullable(props.get(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED)).orElse(Boolean.toString(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED_DEFAULT));
+String emitOffsetSyncsValue = 
Optional.ofNullable(props.get(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)).orElse(Boolean.toString(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED_DEFAULT));
+
+if ("false".equals(emitCheckpointsValue) && 
"false".equals(syncGroupOffsetsValue)) {
+ConfigValue syncGroupOffsets = configValues.stream().filter(prop 
-> MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED.equals(prop.name()))
+.findAny()
+.orElseGet(() -> {
+ConfigValue result = new 
ConfigValue(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED);
+configValues.add(result);
+return result;
+});
+
+ConfigValue emitCheckpoints = configValues.stream().filter(prop -> 
MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED.equals(prop.name()))
+.findAny()
+.orElseGet(() -> {
+ConfigValue result = new 
ConfigValue(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED);
+configValues.add(result);
+return result;
+});
+
+String errorMessage = "MirrorCheckpointConnector can't run with 
both" +
+MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED + ", " + 
MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED + "set to false";
+syncGroupOffsets.addErrorMessage(errorMessage);
+emitCheckpoints.addErrorMessage(errorMessage);
+}
+if ("false".equals(emitOffsetSyncsValue) && 
("true".equals(emitCheckpointsValue) || "true".equals(syncGroupOffsetsValue))) {
+ConfigValue emitOffsetSyncs = configValues.stream().filter(prop -> 
MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED.equals(prop.name()))
+.findAny()
+.orElseGet(() -> {
+ConfigValue result = new 
ConfigValue(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED);
+configValues.add(result);
+return result;
+});
+
+emitOffsetSyncs.addErrorMessage("MirrorCheckpointConnector can't 
run while MirrorSourceConnector configured with" +
+MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED + "set to 
false");
+}
+return new Config(configValues);

Review Comment:
   Just want to clarify some points regarding connect:
   1. `Connector::validate` get called by the runtime before `Connector::start` 
which will get us to fail faster
   2. `Connector::validate` get called in 
`AbstractHerder::validateConnectorConfig` which handles connector/task 
lifecycle tracking. 
   3. While `Connector::start` get called during the reconfiguration of the 
plugin in connect as well as at the start
   
   So we can't move the validation to `Connector::start` and we can't move the 
initialisation of `MirrorConnectorConfig` into `Connector::validate`. 
   
   If we want to use `.getBoolean` then I would suggest having a look into my 
last commit where I temporary initialised a connect configs to use the getters 
from `AbstractConfig` which all mirror connector configs extends. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-05-28 Thread via GitHub


OmniaGM commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1617187677


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -105,6 +108,49 @@ public void stop() {
 Utils.closeQuietly(targetAdminClient, "target admin client");
 }
 
+@Override
+public Config validate(Map props) {
+List configValues = super.validate(props).configValues();
+String emitCheckpointsValue = 
Optional.ofNullable(props.get(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED)).orElse(Boolean.toString(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED_DEFAULT));
+String syncGroupOffsetsValue = 
Optional.ofNullable(props.get(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED)).orElse(Boolean.toString(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED_DEFAULT));
+String emitOffsetSyncsValue = 
Optional.ofNullable(props.get(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)).orElse(Boolean.toString(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED_DEFAULT));
+
+if ("false".equals(emitCheckpointsValue) && 
"false".equals(syncGroupOffsetsValue)) {
+ConfigValue syncGroupOffsets = configValues.stream().filter(prop 
-> MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED.equals(prop.name()))
+.findAny()
+.orElseGet(() -> {
+ConfigValue result = new 
ConfigValue(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED);
+configValues.add(result);
+return result;
+});
+
+ConfigValue emitCheckpoints = configValues.stream().filter(prop -> 
MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED.equals(prop.name()))
+.findAny()
+.orElseGet(() -> {
+ConfigValue result = new 
ConfigValue(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED);
+configValues.add(result);
+return result;
+});
+
+String errorMessage = "MirrorCheckpointConnector can't run with 
both" +
+MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED + ", " + 
MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED + "set to false";
+syncGroupOffsets.addErrorMessage(errorMessage);
+emitCheckpoints.addErrorMessage(errorMessage);
+}
+if ("false".equals(emitOffsetSyncsValue) && 
("true".equals(emitCheckpointsValue) || "true".equals(syncGroupOffsetsValue))) {
+ConfigValue emitOffsetSyncs = configValues.stream().filter(prop -> 
MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED.equals(prop.name()))
+.findAny()
+.orElseGet(() -> {
+ConfigValue result = new 
ConfigValue(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED);
+configValues.add(result);
+return result;
+});
+
+emitOffsetSyncs.addErrorMessage("MirrorCheckpointConnector can't 
run while MirrorSourceConnector configured with" +
+MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED + "set to 
false");
+}
+return new Config(configValues);

Review Comment:
   Just want to clarify some points regarding connect (to the extend of my 
understanding):
   1. `Connector::validate` get called by the runtime before `Connector::start` 
which will get us to fail faster
   2. `Connector::validate` get called in 
`AbstractHerder::validateConnectorConfig` which handles connector/task 
lifecycle tracking. This also get called from connect REST api `config/validate`
   3. While `Connector::start` get called during the reconfiguration of the 
plugin in connect as well as at the start
   
   So we can't move the validation to `Connector::start` and we can't move the 
initialisation of `MirrorConnectorConfig` into `Connector::validate`. 
   
   If we want to use `.getBoolean` then I would suggest having a look into my 
last 
[commit](https://github.com/apache/kafka/pull/15999/commits/27c07546e6b6983e79b1c5989564b337f84e930c)
 where I temporary initialised a connect configs to use the getters from 
`AbstractConfig` which all mirror connector configs extends. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-05-28 Thread via GitHub


OmniaGM commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1617187677


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -105,6 +108,49 @@ public void stop() {
 Utils.closeQuietly(targetAdminClient, "target admin client");
 }
 
+@Override
+public Config validate(Map props) {
+List configValues = super.validate(props).configValues();
+String emitCheckpointsValue = 
Optional.ofNullable(props.get(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED)).orElse(Boolean.toString(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED_DEFAULT));
+String syncGroupOffsetsValue = 
Optional.ofNullable(props.get(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED)).orElse(Boolean.toString(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED_DEFAULT));
+String emitOffsetSyncsValue = 
Optional.ofNullable(props.get(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)).orElse(Boolean.toString(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED_DEFAULT));
+
+if ("false".equals(emitCheckpointsValue) && 
"false".equals(syncGroupOffsetsValue)) {
+ConfigValue syncGroupOffsets = configValues.stream().filter(prop 
-> MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED.equals(prop.name()))
+.findAny()
+.orElseGet(() -> {
+ConfigValue result = new 
ConfigValue(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED);
+configValues.add(result);
+return result;
+});
+
+ConfigValue emitCheckpoints = configValues.stream().filter(prop -> 
MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED.equals(prop.name()))
+.findAny()
+.orElseGet(() -> {
+ConfigValue result = new 
ConfigValue(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED);
+configValues.add(result);
+return result;
+});
+
+String errorMessage = "MirrorCheckpointConnector can't run with 
both" +
+MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED + ", " + 
MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED + "set to false";
+syncGroupOffsets.addErrorMessage(errorMessage);
+emitCheckpoints.addErrorMessage(errorMessage);
+}
+if ("false".equals(emitOffsetSyncsValue) && 
("true".equals(emitCheckpointsValue) || "true".equals(syncGroupOffsetsValue))) {
+ConfigValue emitOffsetSyncs = configValues.stream().filter(prop -> 
MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED.equals(prop.name()))
+.findAny()
+.orElseGet(() -> {
+ConfigValue result = new 
ConfigValue(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED);
+configValues.add(result);
+return result;
+});
+
+emitOffsetSyncs.addErrorMessage("MirrorCheckpointConnector can't 
run while MirrorSourceConnector configured with" +
+MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED + "set to 
false");
+}
+return new Config(configValues);

Review Comment:
   Just want to clarify some points regarding connect (to the extend of my 
understanding):
   1. `Connector::validate` get called by the runtime before `Connector::start` 
which will get us to fail faster
   2. `Connector::validate` get called in 
`AbstractHerder::validateConnectorConfig` which handles connector/task 
lifecycle tracking. This also get called from connect REST api `config/validate`
   3. While `Connector::start` get called during the reconfiguration of the 
plugin in connect as well as at the start
   
   So we can't move the validation to `Connector::start` and we can't move the 
initialisation of `MirrorConnectorConfig` into `Connector::validate`. 
   
   If we want to use `.getBoolean` then I would suggest having a look into my 
last 
[commit](https://github.com/apache/kafka/pull/15999/commits/27c07546e6b6983e79b1c5989564b337f84e930c)
 where I temporary initialised a connect config to use the getters from 
`AbstractConfig` which all mirror connector configs extends. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-28 Thread via GitHub


raminqaf commented on PR #15601:
URL: https://github.com/apache/kafka/pull/15601#issuecomment-2135147503

   > @raminqaf Could you please rebase this PR on current trunk to get the new 
build setup? We need to restart the builds because one was red.
   
   Done! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Minor: Make VoterSetHistoryTest.testAddAt make sense [kafka]

2024-05-28 Thread via GitHub


dengziming opened a new pull request, #16104:
URL: https://github.com/apache/kafka/pull/16104

   *More detailed description of your change*
   We should remove a voter to test removal of a node.
   
   *Summary of testing strategy (including rationale)*
   No.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15541: Add oldest-iterator-open-since-ms metric [kafka]

2024-05-28 Thread via GitHub


nicktelford commented on PR #16041:
URL: https://github.com/apache/kafka/pull/16041#issuecomment-2135179192

   @mjsax Not sure why the build is failing. `:streams:compileJava` works fine 
locally. Perhaps the build got confused, can we repeat it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16483: Remove preAppendErrors from createPutCacheCallback [kafka]

2024-05-28 Thread via GitHub


FrankYang0529 opened a new pull request, #16105:
URL: https://github.com/apache/kafka/pull/16105

   The method `createPutCacheCallback` has a input argument 
[`preAppendErrors`](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L387).
 It is used to keep the "error" happens before appending. However, the 
pre-append error is handled before by calling 
[`responseCallback`](https://github.com/apache/kafka/blob/4f55786a8a86fe228a0b10a2f28529f5128e5d6f/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L927C15-L927C84).
 Hence, we can remove `preAppendErrors`.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: migrate DescribeConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-28 Thread via GitHub


TaiJuWu commented on code in PR #15908:
URL: https://github.com/apache/kafka/pull/15908#discussion_r1617235824


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java:
##
@@ -16,48 +16,71 @@
  */
 package org.apache.kafka.tools.consumer.group;
 
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.junit.ClusterTestExtensions;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.clients.consumer.RoundRobinAssignor;
 import org.apache.kafka.common.ConsumerGroupState;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
-
+import java.util.stream.Stream;
+
+//import static kafka.test.annotation.Type.CO_KRAFT;

Review Comment:
   should this mark be removed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling fetches from remote storage [kafka]

2024-05-28 Thread via GitHub


kamalcph commented on code in PR #16071:
URL: https://github.com/apache/kafka/pull/16071#discussion_r1617245864


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -6597,6 +6597,79 @@ class ReplicaManagerTest {
   ))
 }
   }
+
+  @Test
+  def testRemoteReadQuotaExceeded(): Unit = {
+when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(true)
+
+val tp0 = new TopicPartition(topic, 0)
+val tpId0 = new TopicIdPartition(topicId, tp0)
+val fetch: Seq[(TopicIdPartition, LogReadResult)] = 
readFromLogWithOffsetOutOfRange(tp0)
+
+assertEquals(1, fetch.size)
+assertEquals(tpId0, fetch.head._1)
+val fetchInfo = fetch.head._2.info
+assertEquals(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, 
fetchInfo.fetchOffsetMetadata)
+assertFalse(fetchInfo.records.records().iterator().hasNext)
+assertFalse(fetchInfo.firstEntryIncomplete)
+assertFalse(fetchInfo.abortedTransactions.isPresent)
+assertFalse(fetchInfo.delayedRemoteStorageFetch.isPresent)
+  }
+
+  @Test
+  def testRemoteReadQuotaNotExceeded(): Unit = {
+when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(false)
+
+val tp0 = new TopicPartition(topic, 0)
+val tpId0 = new TopicIdPartition(topicId, tp0)
+val fetch: Seq[(TopicIdPartition, LogReadResult)] = 
readFromLogWithOffsetOutOfRange(tp0)
+
+assertEquals(1, fetch.size)
+assertEquals(tpId0, fetch.head._1)
+val fetchInfo = fetch.head._2.info
+assertEquals(1L, fetchInfo.fetchOffsetMetadata.messageOffset)
+assertEquals(UnifiedLog.UnknownOffset, 
fetchInfo.fetchOffsetMetadata.segmentBaseOffset)
+assertEquals(-1, fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
+assertEquals(MemoryRecords.EMPTY, fetchInfo.records)
+assertTrue(fetchInfo.delayedRemoteStorageFetch.isPresent)
+  }
+
+  private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): 
Seq[(TopicIdPartition, LogReadResult)] = {
+val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, 
shouldMockLog = true)
+try {
+  val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+  replicaManager.createPartition(tp).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints = offsetCheckpoints, None)
+  val partition0Replicas = Seq[Integer](0, 1).asJava
+  val topicIds = Map(tp.topic -> topicId).asJava
+  val leaderEpoch = 0
+  val leaderAndIsrRequest = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
+Seq(
+  new LeaderAndIsrPartitionState()
+.setTopicName(tp.topic)
+.setPartitionIndex(tp.partition)
+.setControllerEpoch(0)
+.setLeader(leaderEpoch)

Review Comment:
   can we hardcode (or) define a new variable for `leader` instead of reusing 
the `leaderEpoch`?



##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -6597,6 +6597,79 @@ class ReplicaManagerTest {
   ))
 }
   }
+
+  @Test
+  def testRemoteReadQuotaExceeded(): Unit = {
+when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(true)
+
+val tp0 = new TopicPartition(topic, 0)
+val tpId0 = new TopicIdPartition(topicId, tp0)
+val fetch: Seq[(TopicIdPartition, LogReadResult)] = 
readFromLogWithOffsetOutOfRange(tp0)
+
+assertEquals(1, fetch.size)
+assertEquals(tpId0, fetch.head._1)
+val fetchInfo = fetch.head._2.info
+assertEquals(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, 
fetchInfo.fetchOffsetMetadata)
+assertFalse(fetchInfo.records.records().iterator().hasNext)
+assertFalse(fetchInfo.firstEntryIncomplete)
+assertFalse(fetchInfo.abortedTransactions.isPresent)
+assertFalse(fetchInfo.delayedRemoteStorageFetch.isPresent)
+  }
+
+  @Test
+  def testRemoteReadQuotaNotExceeded(): Unit = {
+when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(false)
+
+val tp0 = new TopicPartition(topic, 0)
+val tpId0 = new TopicIdPartition(topicId, tp0)
+val fetch: Seq[(TopicIdPartition, LogReadResult)] = 
readFromLogWithOffsetOutOfRange(tp0)
+
+assertEquals(1, fetch.size)
+assertEquals(tpId0, fetch.head._1)
+val fetchInfo = fetch.head._2.info
+assertEquals(1L, fetchInfo.fetchOffsetMetadata.messageOffset)
+assertEquals(UnifiedLog.UnknownOffset, 
fetchInfo.fetchOffsetMetadata.segmentBaseOffset)
+assertEquals(-1, fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
+assertEquals(MemoryRecords.EMPTY, fetchInfo.records)
+assertTrue(fetchInfo.delayedRemoteStorageFetch.isPresent)
+  }
+
+  private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): 
Seq[(TopicIdPartition, LogReadResult)] = {
+val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enable

Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling fetches from remote storage [kafka]

2024-05-28 Thread via GitHub


kamalcph commented on code in PR #16071:
URL: https://github.com/apache/kafka/pull/16071#discussion_r1617253547


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -6597,6 +6597,79 @@ class ReplicaManagerTest {
   ))
 }
   }
+
+  @Test
+  def testRemoteReadQuotaExceeded(): Unit = {
+when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(true)
+
+val tp0 = new TopicPartition(topic, 0)
+val tpId0 = new TopicIdPartition(topicId, tp0)
+val fetch: Seq[(TopicIdPartition, LogReadResult)] = 
readFromLogWithOffsetOutOfRange(tp0)
+
+assertEquals(1, fetch.size)
+assertEquals(tpId0, fetch.head._1)
+val fetchInfo = fetch.head._2.info
+assertEquals(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, 
fetchInfo.fetchOffsetMetadata)
+assertFalse(fetchInfo.records.records().iterator().hasNext)
+assertFalse(fetchInfo.firstEntryIncomplete)
+assertFalse(fetchInfo.abortedTransactions.isPresent)
+assertFalse(fetchInfo.delayedRemoteStorageFetch.isPresent)
+  }
+
+  @Test
+  def testRemoteReadQuotaNotExceeded(): Unit = {
+when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(false)
+
+val tp0 = new TopicPartition(topic, 0)
+val tpId0 = new TopicIdPartition(topicId, tp0)
+val fetch: Seq[(TopicIdPartition, LogReadResult)] = 
readFromLogWithOffsetOutOfRange(tp0)
+
+assertEquals(1, fetch.size)
+assertEquals(tpId0, fetch.head._1)
+val fetchInfo = fetch.head._2.info
+assertEquals(1L, fetchInfo.fetchOffsetMetadata.messageOffset)
+assertEquals(UnifiedLog.UnknownOffset, 
fetchInfo.fetchOffsetMetadata.segmentBaseOffset)
+assertEquals(-1, fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
+assertEquals(MemoryRecords.EMPTY, fetchInfo.records)
+assertTrue(fetchInfo.delayedRemoteStorageFetch.isPresent)
+  }
+
+  private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): 
Seq[(TopicIdPartition, LogReadResult)] = {
+val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, 
shouldMockLog = true)
+try {
+  val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+  replicaManager.createPartition(tp).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints = offsetCheckpoints, None)
+  val partition0Replicas = Seq[Integer](0, 1).asJava
+  val topicIds = Map(tp.topic -> topicId).asJava
+  val leaderEpoch = 0
+  val leaderAndIsrRequest = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
+Seq(
+  new LeaderAndIsrPartitionState()
+.setTopicName(tp.topic)
+.setPartitionIndex(tp.partition)
+.setControllerEpoch(0)
+.setLeader(leaderEpoch)
+.setLeaderEpoch(0)
+.setIsr(partition0Replicas)
+.setPartitionEpoch(0)
+.setReplicas(partition0Replicas)
+.setIsNew(true)
+).asJava,
+topicIds,
+Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+  replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => 
())
+
+  val params = new FetchParams(ApiKeys.FETCH.latestVersion, -1, 1, 1000, 
0, 100, FetchIsolation.HIGH_WATERMARK, None.asJava)
+  replicaManager.readFromLog(
+params,
+Seq(new TopicIdPartition(topicId, 0, topic) -> new 
PartitionData(topicId, 1, 0, 10, Optional.of[Integer](leaderEpoch), 
Optional.of(leaderEpoch))),

Review Comment:
   The test exposes an issue, the `fetchOffset` is 1 and the replica-manager 
throws OffsetOutOfRangeException (since the log is empty) . Why we do have to 
check the remoteLogFetch quota in this case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR:Improve ConsumerRebalanceListener doc [kafka]

2024-05-28 Thread via GitHub


lianetm commented on code in PR #16083:
URL: https://github.com/apache/kafka/pull/16083#discussion_r1617261952


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java:
##
@@ -119,15 +119,15 @@
 public interface ConsumerRebalanceListener {
 
 /**
- * A callback method the user can implement to provide handling of offset 
commits to a customized store.
+ * A callback method the user can implement to provide handling of offset 
commits **sent to** a customized store

Review Comment:
   Agree with @AndrewJSchofield that it would be an improvement to start 
stating the general purpose of the callback (cleaning up resources being 
reassigned.. as Andrew described). Still, I would also keep the offset 
management example, seems a simple one to understand how to use the callback. I 
honestly don't know how often it's used out there, but it's one of the [Usage 
examples](https://github.com/apache/kafka/blob/699438b7f7801115cb7d66dba202b25af7ff8a1f/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L350)
 described in the `KafkaConsumer` documentation, so from a user's perspective 
it would align nicely to refer to the same example here I would say.side 
note: I may be a bit biased here, I've been recently working on this use case 
of the callbacks on the new consumer :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   3   4   >