Re: [PR] KAFKA-16894: Define group.version=2 [kafka]

2024-06-09 Thread via GitHub


omkreddy commented on PR #16212:
URL: https://github.com/apache/kafka/pull/16212#issuecomment-2157321710

   There are a few test failures.





[jira] [Commented] (KAFKA-16803) Upgrade to a version of ShadowJavaPlugin which doesn't use ConfigureUtil

2024-06-09 Thread Ksolves (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853549#comment-17853549
 ] 

Ksolves commented on KAFKA-16803:
-

Sure, [~gharris1727]. Will create a PR and update you.

> Upgrade to a version of ShadowJavaPlugin which doesn't use ConfigureUtil
> 
>
> Key: KAFKA-16803
> URL: https://issues.apache.org/jira/browse/KAFKA-16803
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Greg Harris
>Priority: Major
>  Labels: newbie
>
> The org.gradle.util.ConfigureUtil type has been deprecated.    
> This is scheduled to be removed in Gradle 9.0.    
> [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#org_gradle_util_reports_deprecations]
> 2 usages    
> Plugin:com.github.jengelman.gradle.plugins.shadow.ShadowJavaPlugin    
> Plugin:com.github.jengelman.gradle.plugins.shadow.ShadowJavaPlugin





[jira] [Assigned] (KAFKA-16803) Upgrade to a version of ShadowJavaPlugin which doesn't use ConfigureUtil

2024-06-09 Thread Ksolves (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ksolves reassigned KAFKA-16803:
---

Assignee: Ksolves

> Upgrade to a version of ShadowJavaPlugin which doesn't use ConfigureUtil
> 
>
> Key: KAFKA-16803
> URL: https://issues.apache.org/jira/browse/KAFKA-16803
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Greg Harris
>Assignee: Ksolves
>Priority: Major
>  Labels: newbie
>
> The org.gradle.util.ConfigureUtil type has been deprecated.    
> This is scheduled to be removed in Gradle 9.0.    
> [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#org_gradle_util_reports_deprecations]
> 2 usages    
> Plugin:com.github.jengelman.gradle.plugins.shadow.ShadowJavaPlugin    
> Plugin:com.github.jengelman.gradle.plugins.shadow.ShadowJavaPlugin





[jira] [Updated] (KAFKA-16924) No log output when running kafka

2024-06-09 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16924:
--
Description: 
In [https://github.com/apache/kafka/pull/12148] , we removed log4jAppender 
dependency, and add testImplementation dependency for `slf4jlog4j` lib. 
However, we need this runtime dependency in tools module to output logs. 
([ref]([https://stackoverflow.com/a/21787813])) Adding this dependency back.

 

Note: The {{slf4jlog4j}} lib was added in {{log4j-appender}} dependency. Since 
it's removed, we need to explicitly declare it.

 

Current output will be like this:
{code:java}
> ./gradlew clean jar
> bin/kafka-server-start.sh config/kraft/controller.properties
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.{code}
 

  was:
In [https://github.com/apache/kafka/pull/12148] , we removed log4jAppender 
dependency, and add testImplementation dependency for `slf4jlog4j` lib. 
However, we need this runtime dependency in tools module to output logs. 
([ref]([https://stackoverflow.com/a/21787813])) Adding this dependency back.

 

Note: The {{slf4jlog4j}} lib was added in {{log4j-appender}} dependency. Since 
it's removed, we need to explicitly declare it.

 

Current output will be like this:
{code:java}
> bin/kafka-server-start.sh config/kraft/controller.properties
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.{code}
 


> No log output when running kafka 
> -
>
> Key: KAFKA-16924
> URL: https://issues.apache.org/jira/browse/KAFKA-16924
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 4.0.0
>
>
> In [https://github.com/apache/kafka/pull/12148] , we removed log4jAppender 
> dependency, and add testImplementation dependency for `slf4jlog4j` lib. 
> However, we need this runtime dependency in tools module to output logs. 
> ([ref]([https://stackoverflow.com/a/21787813])) Adding this dependency back.
>  
> Note: The {{slf4jlog4j}} lib was added in {{log4j-appender}} dependency. 
> Since it's removed, we need to explicitly declare it.
>  
> Current output will be like this:
> {code:java}
> > ./gradlew clean jar
> > bin/kafka-server-start.sh config/kraft/controller.properties
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.{code}
>  





Re: [PR] KAFKA-16924: add slf4jlog4j dependency in tools [kafka]

2024-06-09 Thread via GitHub


chia7712 commented on code in PR #16260:
URL: https://github.com/apache/kafka/pull/16260#discussion_r1632550895


##
build.gradle:
##
@@ -2178,6 +2177,7 @@ project(':tools') {
 implementation libs.jacksonDataformatCsv
 implementation libs.jacksonJDK8Datatypes
 implementation libs.slf4jApi
+implementation libs.slf4jlog4j

Review Comment:
   ya, we do need that binding jar when running the broker/controller. But the streams examples module has declared that:
   
   https://github.com/apache/kafka/blob/trunk/build.gradle#L2613
   
   I checked the distribution, and both `reload4j` and `slf4j-reload4j` are present.
   
   ![Screenshot 2024-06-10 12-08-23](https://github.com/apache/kafka/assets/6234750/d8bdcd4f-5344-44db-b72e-4cc7710ae2cf)
   
   Also, I ran the command you described, but I don't see "Failed to load class .."
   
   ![Screenshot 2024-06-10 12-07-27](https://github.com/apache/kafka/assets/6234750/cffda7d9-9c94-48ec-a7e4-ef80746f61d7)
   
   
   
   






Re: [PR] KAFKA-16114: Fix partition not retention after cancel alter intra broke… [kafka]

2024-06-09 Thread via GitHub


github-actions[bot] commented on PR #15172:
URL: https://github.com/apache/kafka/pull/15172#issuecomment-2157136679

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-16295: Align RocksDB and in-memory store init() sequences [kafka]

2024-06-09 Thread via GitHub


github-actions[bot] commented on PR #15421:
URL: https://github.com/apache/kafka/pull/15421#issuecomment-2157136573

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] MINOR: update kraft_upgrade_test to create a new topic after metadata upgrade [kafka]

2024-06-09 Thread via GitHub


github-actions[bot] commented on PR #15451:
URL: https://github.com/apache/kafka/pull/15451#issuecomment-2157136549

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-09 Thread via GitHub


frankvicky commented on PR #16227:
URL: https://github.com/apache/kafka/pull/16227#issuecomment-2157060477

   Hi @chia7712, I have made some changes and added a test case, PTAL.





[jira] [Assigned] (KAFKA-16913) Support external schemas in JSONConverter

2024-06-09 Thread Ganesh Sadanala (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ganesh Sadanala reassigned KAFKA-16913:
---

Assignee: Ganesh Sadanala

> Support external schemas in JSONConverter
> -
>
> Key: KAFKA-16913
> URL: https://issues.apache.org/jira/browse/KAFKA-16913
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Priyanka K U
>Assignee: Ganesh Sadanala
>Priority: Minor
>
>  KIP-1054: Support external schemas in JSONConverter : 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1054%3A+Support+external+schemas+in+JSONConverter





Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-09 Thread via GitHub


frankvicky commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1632484396


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,129 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
     @Test
     public void iterator() throws Exception {
+        String topic = "topic";
+        int recordSize = 10;
+        int partitionSize = 15;
+        int emptyPartitionIndex = 3;
+        ConsumerRecords<Integer, String> records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, Collections.singleton(topic));
+        Iterator<ConsumerRecord<Integer, String>> iterator = records.iterator();
 
-        Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new LinkedHashMap<>();
+        int recordCount = 0;
+        int partitionCount = 0;
+        int currentPartition = -1;
+
+        while (iterator.hasNext()) {
+            ConsumerRecord<Integer, String> record = iterator.next();
+
+            if (record.partition() == emptyPartitionIndex) {
+                fail("Partition " + emptyPartitionIndex + " is not empty");
+            }
+
+            // Check if we have moved to a new partition
+            if (currentPartition != record.partition()) {
+                // Increment the partition count as we have encountered a new partition
+                partitionCount++;
+                // Update the current partition to the new partition
+                currentPartition = record.partition();
+            }
 
-        String topic = "topic";
-        records.put(new TopicPartition(topic, 0), new ArrayList<>());
-        ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
-        ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
-        records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2));
-        records.put(new TopicPartition(topic, 2), new ArrayList<>());
-
-        ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
-        Iterator<ConsumerRecord<Integer, String>> iter = consumerRecords.iterator();
-
-        int c = 0;
-        for (; iter.hasNext(); c++) {
-            ConsumerRecord<Integer, String> record = iter.next();
-            assertEquals(1, record.partition());
             assertEquals(topic, record.topic());
-            assertEquals(c, record.offset());
+            assertEquals(currentPartition, record.partition());
+            assertEquals(recordCount % recordSize, record.offset());
+            assertEquals(recordCount % recordSize, record.key());
+            assertEquals(String.valueOf(recordCount % recordSize), record.value());
+
+            recordCount++;
         }
-        assertEquals(2, c);
+
+        // Including empty partition
+        assertEquals(partitionSize, partitionCount + 1);
+    }
+
+    @Test
+    public void testRecordsWithNullTopic() {
+        String nullTopic = null;
+        ConsumerRecords<Integer, String> consumerRecords = ConsumerRecords.empty();
+        IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic));
+        assertEquals("Topic must be non-null.", exception.getMessage());
+    }
+
+
+    @Test
+    public void testRecords() {

Review Comment:
   I will do it  



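The new tests in the hunk above call a `buildTopicTestRecords` helper that the quoted diff does not include. Below is a minimal sketch of what such a helper could look like, inferred from how the tests use it; the class name, signature, and internals are assumptions for illustration, not the PR's actual code:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;

// Hypothetical reconstruction of the helper the new tests call.
class TestRecordsBuilder {
    static ConsumerRecords<Integer, String> buildTopicTestRecords(int recordSize,
                                                                  int partitionSize,
                                                                  int emptyPartitionIndex,
                                                                  Collection<String> topics) {
        Map<TopicPartition, List<ConsumerRecord<Integer, String>>> partitionToRecords = new LinkedHashMap<>();
        for (String topic : topics) {
            for (int partition = 0; partition < partitionSize; partition++) {
                List<ConsumerRecord<Integer, String>> records = new ArrayList<>();
                if (partition != emptyPartitionIndex) {
                    for (int offset = 0; offset < recordSize; offset++) {
                        // Key and value mirror the offset so that the assertions in the
                        // tests above (key == offset, value == String.valueOf(offset)) hold.
                        records.add(new ConsumerRecord<>(topic, partition, offset, 0L,
                            TimestampType.CREATE_TIME, 0, 0, offset, String.valueOf(offset),
                            new RecordHeaders(), Optional.empty()));
                    }
                }
                partitionToRecords.put(new TopicPartition(topic, partition), records);
            }
        }
        return new ConsumerRecords<>(partitionToRecords);
    }
}
```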



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-09 Thread via GitHub


frankvicky commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1632484236


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,129 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
     @Test
     public void iterator() throws Exception {
+        String topic = "topic";
+        int recordSize = 10;
+        int partitionSize = 15;
+        int emptyPartitionIndex = 3;
+        ConsumerRecords<Integer, String> records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, Collections.singleton(topic));
+        Iterator<ConsumerRecord<Integer, String>> iterator = records.iterator();
 
-        Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new LinkedHashMap<>();
+        int recordCount = 0;
+        int partitionCount = 0;
+        int currentPartition = -1;
+
+        while (iterator.hasNext()) {
+            ConsumerRecord<Integer, String> record = iterator.next();
+
+            if (record.partition() == emptyPartitionIndex) {
+                fail("Partition " + emptyPartitionIndex + " is not empty");
+            }
+
+            // Check if we have moved to a new partition
+            if (currentPartition != record.partition()) {
+                // Increment the partition count as we have encountered a new partition
+                partitionCount++;
+                // Update the current partition to the new partition
+                currentPartition = record.partition();
+            }
 
-        String topic = "topic";
-        records.put(new TopicPartition(topic, 0), new ArrayList<>());
-        ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
-        ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
-        records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2));
-        records.put(new TopicPartition(topic, 2), new ArrayList<>());
-
-        ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
-        Iterator<ConsumerRecord<Integer, String>> iter = consumerRecords.iterator();
-
-        int c = 0;
-        for (; iter.hasNext(); c++) {
-            ConsumerRecord<Integer, String> record = iter.next();
-            assertEquals(1, record.partition());
             assertEquals(topic, record.topic());
-            assertEquals(c, record.offset());
+            assertEquals(currentPartition, record.partition());
+            assertEquals(recordCount % recordSize, record.offset());
+            assertEquals(recordCount % recordSize, record.key());
+            assertEquals(String.valueOf(recordCount % recordSize), record.value());
+
+            recordCount++;
        }
-        assertEquals(2, c);
+
+        // Including empty partition
+        assertEquals(partitionSize, partitionCount + 1);
+    }
+
+    @Test
+    public void testRecordsWithNullTopic() {
+        String nullTopic = null;
+        ConsumerRecords<Integer, String> consumerRecords = ConsumerRecords.empty();
+        IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic));
+        assertEquals("Topic must be non-null.", exception.getMessage());
+    }
+
+
+    @Test
+    public void testRecords() {
+        List<String> topics = Arrays.asList("topic1", "topic2", "topic3", "topic4");
+        int recordSize = 3;
+        int partitionSize = 10;
+        int emptyPartitionIndex = 6;
+        int expectedTotalRecordSizeOfEachTopic = recordSize * (partitionSize - 1);
+
+        ConsumerRecords<Integer, String> consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+        for (String topic : topics) {
+            Iterable<ConsumerRecord<Integer, String>> records = consumerRecords.records(topic);
+            Iterator<ConsumerRecord<Integer, String>> iterator = records.iterator();
+            int recordCount = 0;
+            int partitionCount = 0;
+            int currentPartition = -1;
+
+            while (iterator.hasNext()) {
+                ConsumerRecord<Integer, String> record = iterator.next();
+
+                if (record.partition() == emptyPartitionIndex) {
+                    fail("Partition " + emptyPartitionIndex + " is not empty");
+                }
+
+                // Check if we have moved to a new partition
+                if (currentPartition != record.partition()) {
+                    // Increment the partition count as we have encountered a new partition
+                    partitionCount++;
+                    // Update the current partition to the new partition
+                    currentPartition = record.partition();
+                }
+
+                assertEquals(topic,

Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-09 Thread via GitHub


frankvicky commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1632481576


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,129 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
 @Test
 public void iterator() throws Exception {

Review Comment:
   Sure, I didn't even notice this case doesn't have the `test` prefix.






Re: [PR] KAFKA-16917: Align the returned Map type of KafkaAdminClient [kafka]

2024-06-09 Thread via GitHub


frankvicky commented on PR #16250:
URL: https://github.com/apache/kafka/pull/16250#issuecomment-2157010773

   Hi @chia7712, I have made a simple change based on your feedback, PTAL.





Re: [PR] KAFKA-16917: Align the returned Map type of KafkaAdminClient [kafka]

2024-06-09 Thread via GitHub


frankvicky commented on code in PR #16250:
URL: https://github.com/apache/kafka/pull/16250#discussion_r1632479332


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2760,9 +2760,12 @@ void handleFailure(Throwable throwable) {
 }, now);
 }
 
-        return new DescribeConfigsResult(new HashMap<>(nodeFutures.entrySet().stream()
-            .flatMap(x -> x.getValue().entrySet().stream())
-            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
+        Map<ConfigResource, KafkaFuture<Config>> resourceToConfigFuture = nodeFutures.entrySet()

Review Comment:
   Oh, it's just my personal coding style; I will modify it.



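For readers skimming the hunk above: the change replaces an inline stream pipeline with a named map built by flattening the per-node futures into a single resource-to-future map. A self-contained illustration of that flatten-with-streams pattern, with the Kafka types simplified to strings and integers (this is not the actual KafkaAdminClient code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class FlattenExample {
    public static void main(String[] args) {
        // Stand-in for Map<Node, Map<ConfigResource, KafkaFuture<Config>>>.
        Map<String, Map<String, Integer>> nodeFutures = new HashMap<>();
        nodeFutures.put("node1", Map.of("resourceA", 1));
        nodeFutures.put("node2", Map.of("resourceB", 2));

        // Flatten the inner maps into one map keyed by resource, as the diff does.
        // Collectors.toMap throws on duplicate keys, so this assumes each
        // resource appears under exactly one node.
        Map<String, Integer> flattened = nodeFutures.entrySet().stream()
            .flatMap(e -> e.getValue().entrySet().stream())
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        System.out.println(flattened); // {resourceA=1, resourceB=2}
    }
}
```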



Re: [PR] KAFKA-16924: add slf4jlog4j dependency in tools [kafka]

2024-06-09 Thread via GitHub


showuon commented on PR #16260:
URL: https://github.com/apache/kafka/pull/16260#issuecomment-2156996030

   @chia7712 , could you have a look?





[jira] [Updated] (KAFKA-16924) No log output when running kafka

2024-06-09 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16924:
--
Description: 
In [https://github.com/apache/kafka/pull/12148] , we removed log4jAppender 
dependency, and add testImplementation dependency for `slf4jlog4j` lib. 
However, we need this runtime dependency in tools module to output logs. 
([ref]([https://stackoverflow.com/a/21787813])) Adding this dependency back.

 

Note: The {{slf4jlog4j}} lib was added in {{log4j-appender}} dependency. Since 
it's removed, we need to explicitly declare it.

 

Current output will be like this:
{code:java}
> bin/kafka-server-start.sh config/kraft/controller.properties
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.{code}
 

  was:
In https://github.com/apache/kafka/pull/12148 , we removed log4jAppender 
dependency, and add testImplementation dependency for `slf4jlog4j` lib. 
However, we need this runtime dependency in tools module to output logs. 
([ref](https://stackoverflow.com/a/21787813)) Adding this dependency back.

 

Current output will be like this:
{code:java}
> bin/kafka-server-start.sh config/kraft/controller.properties
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.{code}
 


> No log output when running kafka 
> -
>
> Key: KAFKA-16924
> URL: https://issues.apache.org/jira/browse/KAFKA-16924
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 4.0.0
>
>
> In [https://github.com/apache/kafka/pull/12148] , we removed log4jAppender 
> dependency, and add testImplementation dependency for `slf4jlog4j` lib. 
> However, we need this runtime dependency in tools module to output logs. 
> ([ref]([https://stackoverflow.com/a/21787813])) Adding this dependency back.
>  
> Note: The {{slf4jlog4j}} lib was added in {{log4j-appender}} dependency. 
> Since it's removed, we need to explicitly declare it.
>  
> Current output will be like this:
> {code:java}
> > bin/kafka-server-start.sh config/kraft/controller.properties
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.{code}
>  





[jira] [Updated] (KAFKA-16924) No log output when running kafka

2024-06-09 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16924:
--
Description: 
In https://github.com/apache/kafka/pull/12148 , we removed log4jAppender 
dependency, and add testImplementation dependency for `slf4jlog4j` lib. 
However, we need this runtime dependency in tools module to output logs. 
([ref](https://stackoverflow.com/a/21787813)) Adding this dependency back.

 

Current output will be like this:
{code:java}
> bin/kafka-server-start.sh config/kraft/controller.properties
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.{code}
 

  was:
In [https://github.com/apache/kafka/pull/12148] , we removed log4jAppender 
dependency, and "Add {{compileOnly}} dependency from {{tools}} to {{log4j}} 
(same approach as {{{}core{}}})." . However, we need this runtime dependency in 
tools module to output logs. Adding this dependency back.

 

Current output will be like this:
{code:java}
> bin/kafka-server-start.sh config/kraft/controller.properties
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.{code}
 


> No log output when running kafka 
> -
>
> Key: KAFKA-16924
> URL: https://issues.apache.org/jira/browse/KAFKA-16924
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 4.0.0
>
>
> In https://github.com/apache/kafka/pull/12148 , we removed log4jAppender 
> dependency, and add testImplementation dependency for `slf4jlog4j` lib. 
> However, we need this runtime dependency in tools module to output logs. 
> ([ref](https://stackoverflow.com/a/21787813)) Adding this dependency back.
>  
> Current output will be like this:
> {code:java}
> > bin/kafka-server-start.sh config/kraft/controller.properties
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.{code}
>  





[PR] KAFKA-16924: add slf4jlog4j dependency in tools [kafka]

2024-06-09 Thread via GitHub


showuon opened a new pull request, #16260:
URL: https://github.com/apache/kafka/pull/16260

   In https://github.com/apache/kafka/pull/12148 , we removed log4jAppender 
dependency, and add testImplementation dependency for `slf4jlog4j` lib. 
However, we need this runtime dependency in tools module to output logs. 
([ref](https://stackoverflow.com/a/21787813)) Adding this dependency back.
   
   Current output will be like this:
   
   ```
   > bin/kafka-server-start.sh config/kraft/controller.properties
   SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
   SLF4J: Defaulting to no-operation (NOP) logger implementation
   SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.
   ```
   
   After this PR, it'll be normal:
   ```
   > bin/kafka-server-start.sh config/kraft/controller.properties 
   [2024-06-10 08:38:50,734] INFO Registered kafka:type=kafka.Log4jController 
MBean (kafka.utils.Log4jControllerRegistration$)
   [2024-06-10 08:38:50,832] INFO Setting -D 
jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS 
renegotiation (org.apache.zookeeper.common.X509Util)
   ...
   ```
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


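For background on the warning the PR fixes: SLF4J 1.x looks up an `org.slf4j.impl.StaticLoggerBinder` on the classpath at runtime, and when only `slf4j-api` is present it falls back to the no-operation logger, silently dropping all output. A minimal sketch of the behaviour (class name invented for illustration):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingSmokeTest {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingSmokeTest.class);

    public static void main(String[] args) {
        // With only slf4j-api on the classpath, this prints the
        // "Failed to load class org.slf4j.impl.StaticLoggerBinder" warning to
        // stderr and drops the INFO line; with a binding jar such as
        // slf4j-log4j12 (or slf4j-reload4j) present, the line is logged normally.
        LOG.info("Broker starting");
    }
}
```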



[jira] [Created] (KAFKA-16924) No log output when running kafka

2024-06-09 Thread Luke Chen (Jira)
Luke Chen created KAFKA-16924:
-

 Summary: No log output when running kafka 
 Key: KAFKA-16924
 URL: https://issues.apache.org/jira/browse/KAFKA-16924
 Project: Kafka
  Issue Type: Bug
Reporter: Luke Chen
Assignee: Luke Chen
 Fix For: 4.0.0


In [https://github.com/apache/kafka/pull/12148] , we removed log4jAppender 
dependency, and "Add {{compileOnly}} dependency from {{tools}} to {{log4j}} 
(same approach as {{{}core{}}})." . However, we need this runtime dependency in 
tools module to output logs. Adding this dependency back.

 

Current output will be like this:
{code:java}
> bin/kafka-server-start.sh config/kraft/controller.properties
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.{code}
 





[jira] [Commented] (KAFKA-16923) New Unit Test for stripDotPathComponents method

2024-06-09 Thread Arnav Dadarya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853543#comment-17853543
 ] 

Arnav Dadarya commented on KAFKA-16923:
---

I have submitted a pull request 
([https://github.com/apache/kafka/pull/16259/files]); can someone approve and 
merge it?

> New Unit Test for stripDotPathComponents method
> ---
>
> Key: KAFKA-16923
> URL: https://issues.apache.org/jira/browse/KAFKA-16923
> Project: Kafka
>  Issue Type: Test
>Reporter: Arnav Dadarya
>Priority: Minor
>   Original Estimate: 0h
>  Remaining Estimate: 0h
>
> I have written 1 new unit test
> Created new test: *testStripDotPathComponents*
>  * Tests the stripDotPathComponents found in CommandUtils
>  * Ensures that it works as expected
> [https://github.com/apache/kafka/pull/16259/files] 





[jira] [Updated] (KAFKA-16923) New Unit Test for stripDotPathComponents method

2024-06-09 Thread Arnav Dadarya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnav Dadarya updated KAFKA-16923:
--
Description: 
I have written 1 new unit test and modified another unit test.

Created new test: *testStripDotPathComponents*
 * Tests the stripDotPathComponents found in CommandUtils
 * Ensures that it works as expected

Modified test:
 * Created a separate string variable to avoid hardcoding the values for the 
test

 

[https://github.com/apache/kafka/pull/16259/files] 

  was:
I have written 1 new unit test and modified another unit test.

Created new test: *testStripDotPathComponents*
 * Tests the stripDotPathComponents found in CommandUtils
 * Ensures that it works as expected

Modified test:
 * Created a separate string variable to avoid hardcoding the values for the 
test

 

[https://github.com/apache/kafka/pull/16259] 


> New Unit Test for stripDotPathComponents method
> ---
>
> Key: KAFKA-16923
> URL: https://issues.apache.org/jira/browse/KAFKA-16923
> Project: Kafka
>  Issue Type: Test
>Reporter: Arnav Dadarya
>Priority: Minor
>   Original Estimate: 0h
>  Remaining Estimate: 0h
>
> I have written 1 new unit test and modified another unit test.
> Created new test: *testStripDotPathComponents*
>  * Tests the stripDotPathComponents found in CommandUtils
>  * Ensures that it works as expected
> Modified test:
>  * Created a separate string variable to avoid hardcoding the values for the 
> test
>  
> [https://github.com/apache/kafka/pull/16259/files] 





[jira] [Updated] (KAFKA-16923) New Unit Test for stripDotPathComponents method

2024-06-09 Thread Arnav Dadarya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnav Dadarya updated KAFKA-16923:
--
Description: 
I have written 1 new unit test

Created new test: *testStripDotPathComponents*
 * Tests the stripDotPathComponents found in CommandUtils
 * Ensures that it works as expected

[https://github.com/apache/kafka/pull/16259/files] 

  was:
I have written 1 new unit test and modified another unit test.

Created new test: *testStripDotPathComponents*
 * Tests the stripDotPathComponents found in CommandUtils
 * Ensures that it works as expected

Modified test:
 * Created a separate string variable to avoid hardcoding the values for the 
test

 

[https://github.com/apache/kafka/pull/16259/files] 


> New Unit Test for stripDotPathComponents method
> ---
>
> Key: KAFKA-16923
> URL: https://issues.apache.org/jira/browse/KAFKA-16923
> Project: Kafka
>  Issue Type: Test
>Reporter: Arnav Dadarya
>Priority: Minor
>   Original Estimate: 0h
>  Remaining Estimate: 0h
>
> I have written 1 new unit test
> Created new test: *testStripDotPathComponents*
>  * Tests the stripDotPathComponents found in CommandUtils
>  * Ensures that it works as expected
> [https://github.com/apache/kafka/pull/16259/files] 





[jira] (KAFKA-16923) New Unit Test for stripDotPathComponents method

2024-06-09 Thread Arnav Dadarya (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16923 ]


Arnav Dadarya deleted comment on KAFKA-16923:
---

was (Author: JIRAUSER303990):
https://github.com/apache/kafka/pull/16258

> New Unit Test for stripDotPathComponents method
> ---
>
> Key: KAFKA-16923
> URL: https://issues.apache.org/jira/browse/KAFKA-16923
> Project: Kafka
>  Issue Type: Test
>Reporter: Arnav Dadarya
>Priority: Minor
>   Original Estimate: 0h
>  Remaining Estimate: 0h
>
> I have written 1 new unit test and modified another unit test.
> Created new test: *testStripDotPathComponents*
>  * Tests the stripDotPathComponents found in CommandUtils
>  * Ensures that it works as expected
> Modified test:
>  * Created a separate string variable to avoid hardcoding the values for the 
> test
>  
> [https://github.com/apache/kafka/pull/16259] 





[jira] [Updated] (KAFKA-16923) New Unit Test for stripDotPathComponents method

2024-06-09 Thread Arnav Dadarya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnav Dadarya updated KAFKA-16923:
--
Description: 
I have written 1 new unit test and modified another unit test.

Created new test: *testStripDotPathComponents*
 * Tests the stripDotPathComponents found in CommandUtils
 * Ensures that it works as expected

Modified test:
 * Created a separate string variable to avoid hardcoding the values for the 
test

 

[https://github.com/apache/kafka/pull/16259] 

  was:
I have written 1 new unit test and modified another unit test.

Created new test: *testStripDotPathComponents*
 * Tests the stripDotPathComponents found in CommandUtils
 * Ensures that it works as expected

Modified test:
 * Created a separate string variable to avoid hardcoding the values for the 
test

 

https://github.com/apache/kafka/pull/16258


> New Unit Test for stripDotPathComponents method
> ---
>
> Key: KAFKA-16923
> URL: https://issues.apache.org/jira/browse/KAFKA-16923
> Project: Kafka
>  Issue Type: Test
>Reporter: Arnav Dadarya
>Priority: Minor
>   Original Estimate: 0h
>  Remaining Estimate: 0h
>
> I have written 1 new unit test and modified another unit test.
> Created new test: *testStripDotPathComponents*
>  * Tests the stripDotPathComponents found in CommandUtils
>  * Ensures that it works as expected
> Modified test:
>  * Created a separate string variable to avoid hardcoding the values for the 
> test
>  
> [https://github.com/apache/kafka/pull/16259] 





[jira] [Updated] (KAFKA-16923) New Unit Test for stripDotPathComponents method

2024-06-09 Thread Arnav Dadarya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnav Dadarya updated KAFKA-16923:
--
External issue URL: https://github.com/apache/kafka/pull/16259  (was: 
https://github.com/apache/kafka/pull/16258)

> New Unit Test for stripDotPathComponents method
> ---
>
> Key: KAFKA-16923
> URL: https://issues.apache.org/jira/browse/KAFKA-16923
> Project: Kafka
>  Issue Type: Test
>Reporter: Arnav Dadarya
>Priority: Minor
>   Original Estimate: 0h
>  Remaining Estimate: 0h
>
> I have written 1 new unit test and modified another unit test.
> Created new test: *testStripDotPathComponents*
>  * Tests the stripDotPathComponents found in CommandUtils
>  * Ensures that it works as expected
> Modified test:
>  * Created a separate string variable to avoid hardcoding the values for the 
> test
>  
> [https://github.com/apache/kafka/pull/16259] 





Re: [PR] Unit Test for stripDotPathComponents method in shell [kafka]

2024-06-09 Thread via GitHub


ardada2468 commented on PR #16259:
URL: https://github.com/apache/kafka/pull/16259#issuecomment-2156909921

   Only the shell/src/test/java/org/apache/kafka/shell/command/CommandUtilsTest.java 
file has been changed, to incorporate one more unit test.





[PR] Unit Test for stripDotPathComponents method in shell [kafka]

2024-06-09 Thread via GitHub


ardada2468 opened a new pull request, #16259:
URL: https://github.com/apache/kafka/pull/16259

   I have written 1 new unit test and modified another unit test.
   
   Created new test: **testStripDotPathComponents**
   
   - Tests the stripDotPathComponents found in CommandUtils
   - Ensures that the method works as expected (in case of future changes to 
the method)



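For context, `stripDotPathComponents` in the shell's `CommandUtils` normalizes a list of path components. A minimal sketch of the kind of test the PR describes; the semantics asserted here ("." components dropped, ".." removing the preceding component) are assumed from the method name and the surrounding discussion, not copied from the PR:

```java
import java.util.Arrays;

import org.apache.kafka.shell.command.CommandUtils;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class CommandUtilsStripDotTest {
    @Test
    public void testStripDotPathComponents() {
        // Assumed behaviour: "." entries are dropped.
        assertEquals(Arrays.asList("alpha", "beta"),
            CommandUtils.stripDotPathComponents(Arrays.asList(".", "alpha", "beta")));
        // Assumed behaviour: ".." removes the component before it.
        assertEquals(Arrays.asList("beta"),
            CommandUtils.stripDotPathComponents(Arrays.asList("alpha", "..", "beta")));
    }
}
```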



Re: [PR] Created a new unit test for stripDotPathComponents method and updated… [kafka]

2024-06-09 Thread via GitHub


ardada2468 closed pull request #16258: Created a new unit test for 
stripDotPathComponents method and updated…
URL: https://github.com/apache/kafka/pull/16258





[jira] [Created] (KAFKA-16923) New Unit Test for stripDotPathComponents method

2024-06-09 Thread Arnav Dadarya (Jira)
Arnav Dadarya created KAFKA-16923:
-

 Summary: New Unit Test for stripDotPathComponents method
 Key: KAFKA-16923
 URL: https://issues.apache.org/jira/browse/KAFKA-16923
 Project: Kafka
  Issue Type: Test
Reporter: Arnav Dadarya


I have written 1 new unit test and modified another unit test.

Created new test: *testStripDotPathComponents*
 * Tests the stripDotPathComponents found in CommandUtils
 * Ensures that it works as expected

Modified test:
 * Created a separate string variable to avoid hardcoding the values for the 
test

 

https://github.com/apache/kafka/pull/16258





[PR] Created a new unit test for stripDotPathComponents method and updated… [kafka]

2024-06-09 Thread via GitHub


ardada2468 opened a new pull request, #16258:
URL: https://github.com/apache/kafka/pull/16258

   
   I have written 1 new unit test and modified another unit test. 
   
   Created new test: **testStripDotPathComponents**
   
   - Tests the stripDotPathComponents found in CommandUtils
   - Ensures that it works as expected
   
   Modified test: 
   
   - Created a separate string variable to avoid hardcoding the values for the 
test
   





Re: [PR] KAFKA-15853: Move configDef out of core [kafka]

2024-06-09 Thread via GitHub


chia7712 commented on code in PR #16116:
URL: https://github.com/apache/kafka/pull/16116#discussion_r1632396978


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.compress.GzipCompression;
+import org.apache.kafka.common.compress.Lz4Compression;
+import org.apache.kafka.common.compress.ZstdCompression;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.config.SslClientAuth;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfigs;
+import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs;
+import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.raft.QuorumConfig;
+import org.apache.kafka.security.PasswordEncoderConfigs;
+import org.apache.kafka.server.common.MetadataVersionValidator;
+import org.apache.kafka.server.metrics.MetricConfigs;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
+import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD;
+import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+
+public class KafkaConfig {

Review Comment:
   > I was planning to move the rest of KafkaConfig into it and make this class 
abstract in the next PR; KafkaConfig.scala will extend it for now, until I 
move everything out.
   
   Not sure whether I catch your point. It seems we will have a Java version 
of KafkaConfig, but it is smaller?






Re: [PR] KAFKA-15853: Move configDef out of core [kafka]

2024-06-09 Thread via GitHub


OmniaGM commented on code in PR #16116:
URL: https://github.com/apache/kafka/pull/16116#discussion_r1632391480


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.compress.GzipCompression;
+import org.apache.kafka.common.compress.Lz4Compression;
+import org.apache.kafka.common.compress.ZstdCompression;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.config.SslClientAuth;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfigs;
+import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs;
+import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.raft.QuorumConfig;
+import org.apache.kafka.security.PasswordEncoderConfigs;
+import org.apache.kafka.server.common.MetadataVersionValidator;
+import org.apache.kafka.server.metrics.MetricConfigs;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
+import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD;
+import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+
+public class KafkaConfig {

Review Comment:
   I was planning to move the rest of KafkaConfig into it and make this class 
abstract in the next PR; KafkaConfig.scala will extend it for now, until I 
move everything out.



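For readers unfamiliar with the class under discussion: the Java `KafkaConfig` above centralizes broker configuration definitions built on `ConfigDef`. A miniature, self-contained sketch of that pattern; the config key, default, and class name are invented for illustration:

```java
import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;

// Illustrative only: a toy config class in the style of KafkaConfig.
public class MiniServerConfig extends AbstractConfig {
    public static final String WORKER_THREADS_CONFIG = "worker.threads"; // invented key
    private static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(WORKER_THREADS_CONFIG, INT, 8, atLeast(1), HIGH,
                "Number of worker threads (example doc string).");

    public MiniServerConfig(Map<String, ?> props) {
        super(CONFIG_DEF, props);
    }

    public int workerThreads() {
        return getInt(WORKER_THREADS_CONFIG);
    }
}
```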



[jira] [Comment Edited] (KAFKA-16855) KRaft - Wire replaying a TopicRecord

2024-06-09 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853535#comment-17853535
 ] 

Muralidhar Basani edited comment on KAFKA-16855 at 6/9/24 8:30 PM:
---

[~christo_lolov] I tried to work on this. Basically, I see the changes in two 
parts:
 * Adding the new fields to RemoteLogSegmentMetadata & TopicRecord
 * Wiring them into the new thread pools after ticket 
[16853|https://issues.apache.org/jira/browse/KAFKA-16853] is done

I have created a PR for the first part. What do you think?


was (Author: muralibasani):
[~christo_lolov] I tried to work on this. Basically, I see the changes in two 
parts:
 * Adding the new fields to RemoteLogSegmentMetadata & TopicRecord
 * Wiring them into the new thread pools after ticket 16853 is done

I have created a PR for the first part. What do you think?

> KRaft - Wire replaying a TopicRecord
> 
>
> Key: KAFKA-16855
> URL: https://issues.apache.org/jira/browse/KAFKA-16855
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Priority: Major
>
> *Summary*
> Replaying a TopicRecord containing a new TieredEpoch and TieredState needs to 
> interact with the two thread pools in the RemoteLogManager to add/remove the 
> correct tasks from each





[jira] [Commented] (KAFKA-16855) KRaft - Wire replaying a TopicRecord

2024-06-09 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853535#comment-17853535
 ] 

Muralidhar Basani commented on KAFKA-16855:
---

[~christo_lolov] I tried to work on this. Basically, I see the changes in two 
parts:
 * Adding the new fields to RemoteLogSegmentMetadata & TopicRecord
 * Wiring them into the new thread pools after ticket 16853 is done

I have created a PR for the first part. What do you think?

> KRaft - Wire replaying a TopicRecord
> 
>
> Key: KAFKA-16855
> URL: https://issues.apache.org/jira/browse/KAFKA-16855
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Priority: Major
>
> *Summary*
> Replaying a TopicRecord containing a new TieredEpoch and TieredState needs to 
> interact with the two thread pools in the RemoteLogManager to add/remove the 
> correct tasks from each





[PR] KAFKA-16855 : Part 1 - New fields tieredEpoch and tieredState [kafka]

2024-06-09 Thread via GitHub


muralibasani opened a new pull request, #16257:
URL: https://github.com/apache/kafka/pull/16257

   Resolves: https://issues.apache.org/jira/browse/KAFKA-16855
   
   - Add field tieredEpoch to RemoteLogSegmentMetadata 
   - Update relevant tests
   - Add two fields tieredEpoch and tieredState to TopicRecord.json
   
   ### Committer Checklist (excluded from commit message)
   - [X] Verify design and implementation 
   - [X] Verify test coverage and CI build status
   - [X] Verify documentation (including upgrade notes)
   





Re: [PR] MINOR: Topic command integration test migrate to new test infra [kafka]

2024-06-09 Thread via GitHub


chia7712 commented on code in PR #16127:
URL: https://github.com/apache/kafka/pull/16127#discussion_r1632376721


##
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##
@@ -188,12 +188,13 @@ private KafkaConfig createNodeConfig(TestKitNode node) {
 controllerNode.metadataDirectory());
 }
 
         props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
-            "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
+            "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT");

Review Comment:
   That is good. If you feel this PR needs some helpers, we can have another PR 
to address that.



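For context on the hunk above: `listener.security.protocol.map` is a comma-separated list of `listenerName:securityProtocol` pairs, and the change adds a `PLAINTEXT` listener alongside the existing `EXTERNAL` and `CONTROLLER` entries. A tiny self-contained illustration (class name invented):

```java
import java.util.Properties;

public class ListenerMapExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Each pair maps a listener name to the security protocol it uses.
        props.put("listener.security.protocol.map",
            "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
        System.out.println(props.getProperty("listener.security.protocol.map"));
    }
}
```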



Re: [PR] KAFKA-16407: Fix foreign key INNER join on change of FK from/to a null value [kafka]

2024-06-09 Thread via GitHub


jeremyvdw commented on PR #15615:
URL: https://github.com/apache/kafka/pull/15615#issuecomment-2156746613

   @mjsax According to the [release 
plan](https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+3.8.0), 
code freeze is on 12th June: any chance this one and 
https://github.com/apache/kafka/pull/15607 will be merged for 3.8.0?





Re: [PR] KAFKA-16922 : Adding unit tests for NewTopic [kafka]

2024-06-09 Thread via GitHub


muralibasani commented on code in PR #16255:
URL: https://github.com/apache/kafka/pull/16255#discussion_r1632371896


##
clients/src/test/java/org/apache/kafka/clients/admin/NewTopicTest.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.kafka.common.requests.CreateTopicsRequest.NO_NUM_PARTITIONS;
+import static 
org.apache.kafka.common.requests.CreateTopicsRequest.NO_REPLICATION_FACTOR;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+public class NewTopicTest {
+
+public static final String TEST_TOPIC = "testtopic";
+public static final int NUM_PARTITIONS = 3;
+public static final int REPLICATION_FACTOR = 1;

Review Comment:
   Ah yes, updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16922 : Adding unit tests for NewTopic [kafka]

2024-06-09 Thread via GitHub


muralibasani commented on code in PR #16255:
URL: https://github.com/apache/kafka/pull/16255#discussion_r1632371830


##
clients/src/test/java/org/apache/kafka/clients/admin/NewTopicTest.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.kafka.common.requests.CreateTopicsRequest.NO_NUM_PARTITIONS;
+import static 
org.apache.kafka.common.requests.CreateTopicsRequest.NO_REPLICATION_FACTOR;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+public class NewTopicTest {
+
+public static final String TEST_TOPIC = "testtopic";
+public static final int NUM_PARTITIONS = 3;
+public static final int REPLICATION_FACTOR = 1;
+public static final String CLEANUP_POLICY_CONFIG_KEY = "cleanup.policy";
+public static final String CLEANUP_POLICY_CONFIG_VALUE = "compact";
+public static final List<Integer> BROKER_IDS = Arrays.asList(1, 2);
+
+@Test
+public void testConstructorWithPartitionsAndReplicationFactor() {
+NewTopic topic = new NewTopic(TEST_TOPIC, NUM_PARTITIONS, (short) 
REPLICATION_FACTOR);
+assertEquals(TEST_TOPIC, topic.name());
+assertEquals(NUM_PARTITIONS, topic.numPartitions());
+assertEquals(REPLICATION_FACTOR, topic.replicationFactor());
+assertNull(topic.replicasAssignments());
+}
+
+@Test
+public void testConstructorWithOptionalValues() {
+Optional<Integer> numPartitions = Optional.empty();
+Optional<Integer> replicationFactor = Optional.empty();
+NewTopic topic = new NewTopic(TEST_TOPIC, numPartitions, 
replicationFactor);
+assertEquals(TEST_TOPIC, topic.name());
+assertEquals(NO_NUM_PARTITIONS, topic.numPartitions());
+assertEquals(NO_REPLICATION_FACTOR, topic.replicationFactor());
+assertNull(topic.replicasAssignments());
+}
+
+@Test
+public void testConstructorWithReplicasAssignments() {
+Map<Integer, List<Integer>> replicasAssignments = new HashMap<>();
+replicasAssignments.put(0, BROKER_IDS);
+NewTopic newTopic = new NewTopic(TEST_TOPIC, replicasAssignments);
+assertEquals(TEST_TOPIC, newTopic.name());
+assertEquals(NO_NUM_PARTITIONS, newTopic.numPartitions());
+assertEquals(NO_REPLICATION_FACTOR, newTopic.replicationFactor());
+assertEquals(replicasAssignments, newTopic.replicasAssignments());
+}
+
+@Test
+public void testConfigs() {
+NewTopic newTopic = new NewTopic(TEST_TOPIC, NUM_PARTITIONS, (short) 
REPLICATION_FACTOR);
+Map<String, String> configs = new HashMap<>();

Review Comment:
   Indeed, added one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16922 : Adding unit tests for NewTopic [kafka]

2024-06-09 Thread via GitHub


muralibasani commented on code in PR #16255:
URL: https://github.com/apache/kafka/pull/16255#discussion_r1632371782


##
clients/src/test/java/org/apache/kafka/clients/admin/NewTopicTest.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.kafka.common.requests.CreateTopicsRequest.NO_NUM_PARTITIONS;
+import static 
org.apache.kafka.common.requests.CreateTopicsRequest.NO_REPLICATION_FACTOR;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+public class NewTopicTest {
+
+public static final String TEST_TOPIC = "testtopic";
+public static final int NUM_PARTITIONS = 3;
+public static final int REPLICATION_FACTOR = 1;
+public static final String CLEANUP_POLICY_CONFIG_KEY = "cleanup.policy";
+public static final String CLEANUP_POLICY_CONFIG_VALUE = "compact";
+public static final List<Integer> BROKER_IDS = Arrays.asList(1, 2);
+
+@Test
+public void testConstructorWithPartitionsAndReplicationFactor() {
+NewTopic topic = new NewTopic(TEST_TOPIC, NUM_PARTITIONS, (short) 
REPLICATION_FACTOR);
+assertEquals(TEST_TOPIC, topic.name());
+assertEquals(NUM_PARTITIONS, topic.numPartitions());
+assertEquals(REPLICATION_FACTOR, topic.replicationFactor());
+assertNull(topic.replicasAssignments());
+}
+
+@Test
+public void testConstructorWithOptionalValues() {
+Optional<Integer> numPartitions = Optional.empty();
+Optional<Integer> replicationFactor = Optional.empty();
+NewTopic topic = new NewTopic(TEST_TOPIC, numPartitions, 
replicationFactor);
+assertEquals(TEST_TOPIC, topic.name());
+assertEquals(NO_NUM_PARTITIONS, topic.numPartitions());
+assertEquals(NO_REPLICATION_FACTOR, topic.replicationFactor());
+assertNull(topic.replicasAssignments());
+}
+
+@Test
+public void testConstructorWithReplicasAssignments() {
+Map<Integer, List<Integer>> replicasAssignments = new HashMap<>();
+replicasAssignments.put(0, BROKER_IDS);
+NewTopic newTopic = new NewTopic(TEST_TOPIC, replicasAssignments);
+assertEquals(TEST_TOPIC, newTopic.name());
+assertEquals(NO_NUM_PARTITIONS, newTopic.numPartitions());
+assertEquals(NO_REPLICATION_FACTOR, newTopic.replicationFactor());
+assertEquals(replicasAssignments, newTopic.replicasAssignments());
+}
+
+@Test
+public void testConfigs() {
+NewTopic newTopic = new NewTopic(TEST_TOPIC, NUM_PARTITIONS, (short) 
REPLICATION_FACTOR);
+Map<String, String> configs = new HashMap<>();
+configs.put(CLEANUP_POLICY_CONFIG_KEY, CLEANUP_POLICY_CONFIG_VALUE);
+newTopic.configs(configs);
+assertEquals(configs, newTopic.configs());
+}
+
+@Test
+public void testUnmodifiableReplicasAssignments() {
+Map<Integer, List<Integer>> replicasAssignments = new HashMap<>();
+replicasAssignments.put(0, BROKER_IDS);
+NewTopic newTopic = new NewTopic(TEST_TOPIC, replicasAssignments);
+Map<Integer, List<Integer>> returnedAssignments = 
newTopic.replicasAssignments();
+
+assertThrows(UnsupportedOperationException.class, () -> {
+returnedAssignments.put(1, Arrays.asList(3, 4));
+});
+}
+
+@Test
+public void testConvertToCreatableTopic() {
+int partitionIndex = 0;
+Map<Integer, List<Integer>> replicasAssignments = new HashMap<>();
+replicasAssignments.put(partitionIndex, BROKER_IDS);
+NewTopic topic = new NewTopic(TEST_TOPIC, replicasAssignments);
+Map<String, String> configs = new HashMap<>();
+configs.put(CLEANUP_POLICY_CONFIG_KEY, CLEANUP_POLICY_CONFIG_VALUE);
+topic.configs(configs);
+
+CreateTopicsRequestData.CreatableTopic 

Re: [PR] MINOR:Topic command integration test migrate to new test infra [kafka]

2024-06-09 Thread via GitHub


TaiJuWu commented on code in PR #16127:
URL: https://github.com/apache/kafka/pull/16127#discussion_r1632369365


##
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##
@@ -188,12 +188,13 @@ private KafkaConfig createNodeConfig(TestKitNode node) {
 controllerNode.metadataDirectory());
 }
 
props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
-"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
+"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT");

Review Comment:
   In this PR, I tried to keep the patch minimal, so I didn't touch any helper function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16890: Compute valid log-start-offset when deleting overlapping remote segments [kafka]

2024-06-09 Thread via GitHub


dopuskh3 commented on code in PR #16237:
URL: https://github.com/apache/kafka/pull/16237#discussion_r1632367056


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2038,6 +2038,69 @@ public void testDeletionOnRetentionBreachedSegments(long 
retentionSize,
 assertEquals(0, 
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
 }
 
+@ParameterizedTest(name = 
"testDeletionOnOverlappingRetentionBreachedSegments retentionSize={0} 
retentionMs={1}")
+@CsvSource(value = {"0, -1", "-1, 0"})
+public void testDeletionOnOverlappingRetentionBreachedSegments(long 
retentionSize,
+   long 
retentionMs)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
+Map<String, Long> logProps = new HashMap<>();
+logProps.put("retention.bytes", retentionSize);
+logProps.put("retention.ms", retentionMs);
+LogConfig mockLogConfig = new LogConfig(logProps);
+when(mockLog.config()).thenReturn(mockLogConfig);
+
+List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+checkpoint.write(epochEntries);
+LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, 
scheduler);

Review Comment:
   `scheduler` is not defined here.
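   
   For illustration, one way to define it as a fixture in the test class. This 
is a sketch only; it assumes the Java `MockScheduler`/`MockTime` test 
utilities from `org.apache.kafka.server.util` are on this module's test 
classpath, which is not something taken from this patch:
   
   ```java
   // Sketch: provide the missing scheduler as a test fixture.
   // MockScheduler/MockTime are assumed test utilities here; swap in
   // whatever scheduler the surrounding test class already uses if it differs.
   private final MockTime time = new MockTime();
   private final Scheduler scheduler = new MockScheduler(time);
   
   // The quoted line then resolves as written:
   // LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
   ```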



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16922 : Adding unit tests for NewTopic [kafka]

2024-06-09 Thread via GitHub


chia7712 commented on code in PR #16255:
URL: https://github.com/apache/kafka/pull/16255#discussion_r1632366297


##
clients/src/test/java/org/apache/kafka/clients/admin/NewTopicTest.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.kafka.common.requests.CreateTopicsRequest.NO_NUM_PARTITIONS;
+import static 
org.apache.kafka.common.requests.CreateTopicsRequest.NO_REPLICATION_FACTOR;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+public class NewTopicTest {
+
+public static final String TEST_TOPIC = "testtopic";
+public static final int NUM_PARTITIONS = 3;
+public static final int REPLICATION_FACTOR = 1;
+public static final String CLEANUP_POLICY_CONFIG_KEY = "cleanup.policy";
+public static final String CLEANUP_POLICY_CONFIG_VALUE = "compact";
+public static final List<Integer> BROKER_IDS = Arrays.asList(1, 2);
+
+@Test
+public void testConstructorWithPartitionsAndReplicationFactor() {
+NewTopic topic = new NewTopic(TEST_TOPIC, NUM_PARTITIONS, (short) 
REPLICATION_FACTOR);
+assertEquals(TEST_TOPIC, topic.name());
+assertEquals(NUM_PARTITIONS, topic.numPartitions());
+assertEquals(REPLICATION_FACTOR, topic.replicationFactor());
+assertNull(topic.replicasAssignments());
+}
+
+@Test
+public void testConstructorWithOptionalValues() {
+Optional<Integer> numPartitions = Optional.empty();
+Optional<Integer> replicationFactor = Optional.empty();
+NewTopic topic = new NewTopic(TEST_TOPIC, numPartitions, 
replicationFactor);
+assertEquals(TEST_TOPIC, topic.name());
+assertEquals(NO_NUM_PARTITIONS, topic.numPartitions());
+assertEquals(NO_REPLICATION_FACTOR, topic.replicationFactor());
+assertNull(topic.replicasAssignments());
+}
+
+@Test
+public void testConstructorWithReplicasAssignments() {
+Map<Integer, List<Integer>> replicasAssignments = new HashMap<>();
+replicasAssignments.put(0, BROKER_IDS);
+NewTopic newTopic = new NewTopic(TEST_TOPIC, replicasAssignments);
+assertEquals(TEST_TOPIC, newTopic.name());
+assertEquals(NO_NUM_PARTITIONS, newTopic.numPartitions());
+assertEquals(NO_REPLICATION_FACTOR, newTopic.replicationFactor());
+assertEquals(replicasAssignments, newTopic.replicasAssignments());
+}
+
+@Test
+public void testConfigs() {
+NewTopic newTopic = new NewTopic(TEST_TOPIC, NUM_PARTITIONS, (short) 
REPLICATION_FACTOR);
+Map<String, String> configs = new HashMap<>();

Review Comment:
   Could you verify that `newTopic.configs()` will return null if it has no 
configs?
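   
   Something along these lines would do it, assuming the default really is 
null as suggested above (a sketch; the constant names reuse the ones defined 
in this test class):
   
   ```java
   @Test
   public void testConfigsDefaultToNull() {
       // A NewTopic with no configs applied should report null from configs().
       NewTopic newTopic = new NewTopic(TEST_TOPIC, NUM_PARTITIONS, (short) REPLICATION_FACTOR);
       assertNull(newTopic.configs());
   }
   ```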



##
clients/src/test/java/org/apache/kafka/clients/admin/NewTopicTest.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import 

Re: [PR] KAFKA-16897: Move OffsetIndexTest and OffsetMapTest to storage module [kafka]

2024-06-09 Thread via GitHub


chia7712 commented on code in PR #16244:
URL: https://github.com/apache/kafka/pull/16244#discussion_r1632364409


##
storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java:
##
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.errors.InvalidOffsetException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class OffsetIndexTest {
+
+private OffsetIndex index;
+private static final long BASE_OFFSET = 45L;
+
+@BeforeEach
+public void setup() throws IOException {
+index = new OffsetIndex(nonExistentTempFile(), BASE_OFFSET, 30 * 8);
+}
+
+@AfterEach
+public void tearDown() throws IOException {
+if (index != null) {
+Files.deleteIfExists(index.file().toPath());
+}
+this.index.close();

Review Comment:
   You should call `close()` before deleting the index file.
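   
   For example, the fixed teardown could look like this (a sketch that simply 
reorders the quoted code):
   
   ```java
   @AfterEach
   public void tearDown() throws IOException {
       if (index != null) {
           // Release the file handle first, then delete the backing file.
           index.close();
           Files.deleteIfExists(index.file().toPath());
       }
   }
   ```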



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16885) Consider renaming RemoteLogManagerConfig#enableRemoteStorageSystem to RemoteLogManagerConfig#isRemoteStorageSystemEnabled

2024-06-09 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16885.

Fix Version/s: 3.9.0
   Resolution: Fixed

> Consider renaming RemoteLogManagerConfig#enableRemoteStorageSystem to 
> RemoteLogManagerConfig#isRemoteStorageSystemEnabled
> -
>
> Key: KAFKA-16885
> URL: https://issues.apache.org/jira/browse/KAFKA-16885
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia Chuan Yu
>Priority: Major
> Fix For: 3.9.0
>
>
> see the discussion: 
> https://github.com/apache/kafka/pull/16153#issuecomment-2144269279



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16885 Renamed the enableRemoteStorageSystem to isRemoteStorageSystemEnabled [kafka]

2024-06-09 Thread via GitHub


chia7712 merged PR #16256:
URL: https://github.com/apache/kafka/pull/16256


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16917: Align the returned Map type of KafkaAdminClient [kafka]

2024-06-09 Thread via GitHub


chia7712 commented on code in PR #16250:
URL: https://github.com/apache/kafka/pull/16250#discussion_r1632361233


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2760,9 +2760,12 @@ void handleFailure(Throwable throwable) {
 }, now);
 }
 
-return new DescribeConfigsResult(new 
HashMap<>(nodeFutures.entrySet().stream()
-.flatMap(x -> x.getValue().entrySet().stream())
-.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue;
+Map<ConfigResource, KafkaFutureImpl<Config>> resourceToConfigFuture = 
nodeFutures.entrySet()

Review Comment:
   Why do we need this temporary variable? The following style works on my local machine.
   ```java
   return new DescribeConfigsResult(nodeFutures.entrySet()
   .stream()
   .flatMap(x -> x.getValue().entrySet().stream())
   .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)));
   ```
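   
   (For what it's worth, the extra `new HashMap<>(...)` wrapper in the old 
code only re-copied the collected map. `Collectors.toMap` already returns a 
fresh, mutable map, in practice a `HashMap`, so passing the collected result 
straight through should behave the same.)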



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16921: Migrate all junit 4 code to junit 5 for connect module (part 1) [kafka]

2024-06-09 Thread via GitHub


chia7712 commented on PR #16253:
URL: https://github.com/apache/kafka/pull/16253#issuecomment-2156719721

   @m1a2st Could you please split this PR into small PRs (similar to #16253)? 
It is hard to complete the migration (and review) at once.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move configDef out of core [kafka]

2024-06-09 Thread via GitHub


chia7712 commented on code in PR #16116:
URL: https://github.com/apache/kafka/pull/16116#discussion_r1632360489


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.compress.GzipCompression;
+import org.apache.kafka.common.compress.Lz4Compression;
+import org.apache.kafka.common.compress.ZstdCompression;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.config.SslClientAuth;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfigs;
+import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs;
+import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.raft.QuorumConfig;
+import org.apache.kafka.security.PasswordEncoderConfigs;
+import org.apache.kafka.server.common.MetadataVersionValidator;
+import org.apache.kafka.server.metrics.MetricConfigs;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
+import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD;
+import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+
+public class KafkaConfig {
+@SuppressWarnings("deprecation")
+public final static ConfigDef CONFIG_DEF =  new ConfigDef()

Review Comment:
   Could it include `RemoteLogManagerConfig.configDef()`?
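   
   One way to fold that in, as a sketch: `ConfigDef` has a copy constructor 
that takes a base definition, and this assumes `RemoteLogManagerConfig` is 
reachable from this module:
   
   ```java
   // Sketch: seed the server-side ConfigDef with the remote-log definitions,
   // then chain the existing define(...) calls onto the result as before.
   public final static ConfigDef CONFIG_DEF =
       new ConfigDef(RemoteLogManagerConfig.configDef());
   ```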



##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.compress.GzipCompression;
+import org.apache.kafka.common.compress.Lz4Compression;

Re: [PR] KAFKA-15853: Move configDef out of core [kafka]

2024-06-09 Thread via GitHub


chia7712 commented on code in PR #16116:
URL: https://github.com/apache/kafka/pull/16116#discussion_r1632358879


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.compress.GzipCompression;
+import org.apache.kafka.common.compress.Lz4Compression;
+import org.apache.kafka.common.compress.ZstdCompression;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.config.SslClientAuth;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfigs;
+import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs;
+import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.raft.QuorumConfig;
+import org.apache.kafka.security.PasswordEncoderConfigs;
+import org.apache.kafka.server.common.MetadataVersionValidator;
+import org.apache.kafka.server.metrics.MetricConfigs;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
+import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD;
+import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+
+public class KafkaConfig {

Review Comment:
   You are right. My previous idea was over-engineering.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-12708 Rewrite org.apache.kafka.test.Microbenchmarks by JMH [kafka]

2024-06-09 Thread via GitHub


chia7712 commented on code in PR #16231:
URL: https://github.com/apache/kafka/pull/16231#discussion_r1632358803


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java:
##
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.CopyOnWriteMap;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Threads(2)
+public class ConcurrentMapBenchmark {
+private static final int TIMES = 1000_000;
+
+@Param({"100"})
+private int mapSize;
+
+@Param({"0.1"})
+private double writePercentage;
+
+private Map<Integer, Integer> concurrentHashMap;
+private Map<Integer, Integer> copyOnWriteMap;
+private int writePerLoops;
+
+@Setup
+public void setup() {
+Map<Integer, Integer> mapTemplate = IntStream.range(0, mapSize).boxed()
+.collect(Collectors.toMap(i -> i, i -> i));
+concurrentHashMap = new ConcurrentHashMap<>(mapTemplate);
+copyOnWriteMap = new CopyOnWriteMap<>(mapTemplate);
+writePerLoops = TIMES / (int) Math.round(writePercentage * TIMES);
+}
+
+@Benchmark
+@OperationsPerInvocation(TIMES)
+public void testConcurrentHashMapGet(Blackhole blackhole) {
+for (int i = 0; i < TIMES; i++) {
+if (i % writePerLoops == 0) {
+// add offset mapSize to ensure computeIfAbsent do add new 
entry
+concurrentHashMap.computeIfAbsent(i + mapSize, key -> key);
+} else {
+blackhole.consume(concurrentHashMap.get(i % mapSize));
+}
+}
+}
+
+@Benchmark
+@OperationsPerInvocation(TIMES)
+public void testConcurrentHashMapGetRandom(Blackhole blackhole) {
+for (int i = 0; i < TIMES; i++) {
+if (i % writePerLoops == 0) {
+// add offset mapSize to ensure computeIfAbsent do add new 
entry
+concurrentHashMap.computeIfAbsent(i + mapSize, key -> key);
+} else {
+
blackhole.consume(concurrentHashMap.get(ThreadLocalRandom.current().nextInt(0, 
mapSize + 1)));
+}
+}
+}
+
+@Benchmark
+@OperationsPerInvocation(TIMES)
+public void testCopyOnWriteMapGet(Blackhole blackhole) {
+for (int i = 0; i < TIMES; i++) {
+if (i % writePerLoops == 0) {
+// add offset mapSize to ensure computeIfAbsent do add new 
entry
+copyOnWriteMap.computeIfAbsent(i + mapSize, key -> key);
+} else {
+blackhole.consume(copyOnWriteMap.get(i % mapSize));
+}
+}
+}
+
+@Benchmark
+@OperationsPerInvocation(TIMES)
+public void testCopyOnWriteMapGetRandom(Blackhole blackhole) {
+for (int i = 0; i < TIMES; i++) {
+if (i % writePerLoops == 0) {
+// add offset mapSize to ensure computeIfAbsent do add new 
entry
+copyOnWriteMap.computeIfAbsent(i + mapSize, key -> key);
+} else {
+
blackhole.consume(copyOnWriteMap.get(ThreadLocalRandom.current().nextInt(0, 
mapSize + 1)));
+}
+}
+}
+
+@Benchmark
+@OperationsPerInvocation(TIMES)
+   

Re: [PR] MINOR:Topic command integration test migrate to new test infra [kafka]

2024-06-09 Thread via GitHub


chia7712 commented on code in PR #16127:
URL: https://github.com/apache/kafka/pull/16127#discussion_r1632358372


##
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##
@@ -353,6 +354,9 @@ static private void setupNodeDirectories(File baseDirectory,
 private final TestKitNodes nodes;
 private final Map<Integer, ControllerServer> controllers;
 private final Map<Integer, BrokerServer> brokers;
+
+private final Map<Integer, BrokerServer> aliveBrokers;

Review Comment:
   > if a broker already shutdown, how can we access the server and check its 
status?
   
   Both ZK and KRaft brokers have in-memory state, so maybe you can leverage that?
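   
   For instance, with the KRaft testkit the liveness check could read the 
broker's own state instead of keeping a separate map. A sketch only; the 
`brokerState()` accessor on `BrokerServer` and `BrokerState.RUNNING` are 
assumptions here, not taken from this patch:
   
   ```java
   // Sketch: treat a broker as "alive" based on its own in-memory state
   // rather than maintaining an aliveBrokers map next to the brokers map.
   private boolean isAlive(BrokerServer broker) {
       return broker.brokerState() == BrokerState.RUNNING;
   }
   ```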



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR:Topic command integration test migrate to new test infra [kafka]

2024-06-09 Thread via GitHub


chia7712 commented on code in PR #16127:
URL: https://github.com/apache/kafka/pull/16127#discussion_r1632358211


##
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##
@@ -188,12 +188,13 @@ private KafkaConfig createNodeConfig(TestKitNode node) {
 controllerNode.metadataDirectory());
 }
 
props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
-"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
+"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT");

Review Comment:
   not sure why we need to call that test helper?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-09 Thread via GitHub


chia7712 commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1632355536


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,129 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
 @Test
 public void iterator() throws Exception {

Review Comment:
   could you please rename it to `testIterator`?



##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,129 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
 @Test
 public void iterator() throws Exception {
+String topic = "topic";
+int recordSize = 10;
+int partitionSize = 15;
+int emptyPartitionIndex = 3;
+ConsumerRecords<Integer, String> records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, 
Collections.singleton(topic));
+Iterator<ConsumerRecord<Integer, String>> iterator = 
records.iterator();
 
-Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = 
new LinkedHashMap<>();
+int recordCount = 0;
+int partitionCount = 0;
+int currentPartition = -1;
+
+while (iterator.hasNext()) {
+ConsumerRecord<Integer, String> record = iterator.next();
+
+if (record.partition() == emptyPartitionIndex) {
+fail("Partition " + emptyPartitionIndex + " is not empty");
+}
+
+// Check if we have moved to a new partition
+if (currentPartition != record.partition()) {
+// Increment the partition count as we have encountered a new 
partition
+partitionCount++;
+// Update the current partition to the new partition
+currentPartition = record.partition();
+}
 
-String topic = "topic";
-records.put(new TopicPartition(topic, 0), new ArrayList<>());
-ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 
1, 0, 0L, TimestampType.CREATE_TIME,
-0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
-ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 
1, 1, 0L, TimestampType.CREATE_TIME,
-0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
-records.put(new TopicPartition(topic, 1), Arrays.asList(record1, 
record2));
-records.put(new TopicPartition(topic, 2), new ArrayList<>());
-
-ConsumerRecords<Integer, String> consumerRecords = new 
ConsumerRecords<>(records);
-Iterator<ConsumerRecord<Integer, String>> iter = 
consumerRecords.iterator();
-
-int c = 0;
-for (; iter.hasNext(); c++) {
-ConsumerRecord<Integer, String> record = iter.next();
-assertEquals(1, record.partition());
 assertEquals(topic, record.topic());
-assertEquals(c, record.offset());
+assertEquals(currentPartition, record.partition());
+assertEquals(recordCount % recordSize, record.offset());
+assertEquals(recordCount % recordSize, record.key());
+assertEquals(String.valueOf(recordCount % recordSize), 
record.value());
+
+recordCount++;
 }
-assertEquals(2, c);
+
+// Including empty partition
+assertEquals(partitionSize, partitionCount + 1);
+}
+
+@Test
+public void testRecordsWithNullTopic() {
+String nullTopic = null;
+ConsumerRecords<Integer, String> consumerRecords = 
ConsumerRecords.empty();
+IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+assertEquals("Topic must be non-null.", exception.getMessage());
+}
+
+
+@Test
+public void testRecords() {

Review Comment:
   Could you add test for `records(TopicPartition)`?
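   
   For reference, a minimal shape for such a test (a sketch; it mirrors the 
record fixture already built in the old `iterator` test above):
   
   ```java
   @Test
   public void testRecordsByTopicPartition() {
       TopicPartition tp = new TopicPartition("topic", 1);
       ConsumerRecord<Integer, String> record = new ConsumerRecord<>("topic", 1, 0, 0L,
           TimestampType.CREATE_TIME, 0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
       ConsumerRecords<Integer, String> records =
           new ConsumerRecords<>(Collections.singletonMap(tp, Collections.singletonList(record)));
       // Lookup by partition should return exactly the records stored for it.
       assertEquals(Collections.singletonList(record), records.records(tp));
   }
   ```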



##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,129 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
 @Test
 public void iterator() throws Exception {
+String topic = "topic";
+int recordSize = 10;
+int partitionSize = 15;
+int emptyPartitionIndex = 3;
+ConsumerRecords<Integer, String> records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, 

Re: [PR] KAFKA-16895: fix off-by-one bug in RemoteCopyLagSegments [kafka]

2024-06-09 Thread via GitHub


dopuskh3 commented on PR #16210:
URL: https://github.com/apache/kafka/pull/16210#issuecomment-2156701940

   @kamalcph updated test case. PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16919) Flaky test testNoCheckpointsIfNoRecordsAreMirrored() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest

2024-06-09 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853511#comment-17853511
 ] 

Muralidhar Basani commented on KAFKA-16919:
---

[~soarez] I suspect there could be resource constraints for the backup 
cluster after the primary cluster is started.

We can try increasing the timeout for starting the backup workers, or we can 
start both clusters in parallel.

> Flaky test  testNoCheckpointsIfNoRecordsAreMirrored() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest
> --
>
> Key: KAFKA-16919
> URL: https://issues.apache.org/jira/browse/KAFKA-16919
> Project: Kafka
>  Issue Type: Test
>Reporter: Igor Soarez
>Priority: Minor
>
> Source 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-16251/1/tests]
>  
> {code:java}
> java.lang.AssertionError: Workers of backup-connect-cluster did not start in 
> time.
>     at 
> org.apache.kafka.connect.util.clusters.ConnectAssertions.assertAtLeastNumWorkersAreUp(ConnectAssertions.java:74)
>     at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:230)
>     at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:150)
>     at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.startClusters(MirrorConnectorsIntegrationSSLTest.java:64)
>     at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:580)
>     at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>     at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16895: fix off-by-one bug in RemoteCopyLagSegments [kafka]

2024-06-09 Thread via GitHub


dopuskh3 commented on PR #16210:
URL: https://github.com/apache/kafka/pull/16210#issuecomment-2156696910

   > @dopuskh3
   > 
   > Any updates on this?
   
   Should be able to give it a try tomorrow. Pulling the fix on our internal 
branch right now.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15166) Add deletePartition API to the RemoteStorageManager

2024-06-09 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15166:
-
Fix Version/s: (was: 3.8.0)

> Add deletePartition API to the RemoteStorageManager
> ---
>
> Key: KAFKA-15166
> URL: https://issues.apache.org/jira/browse/KAFKA-15166
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: kip
>
> Remote Storage Manager exposes {{deleteLogSegmentData}} API to delete the 
> individual log segments. Storage providers such as HDFS have support to 
> delete a directory. Having an {{deletePartition}} API to delete the data at 
> the partition level will enhance the topic deletion.
> This task may require a KIP as it touches the user-facing APIs.
>  
> Please also remember to remove the comment on the test here: 
> https://github.com/apache/kafka/pull/13837#discussion_r1247676834



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15777) Configurable remote fetch bytes per partition from Consumer

2024-06-09 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash resolved KAFKA-15777.
--
Resolution: Won't Fix

> Configurable remote fetch bytes per partition from Consumer
> ---
>
> Key: KAFKA-15777
> URL: https://issues.apache.org/jira/browse/KAFKA-15777
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: kip
>
> A consumer can configure the amount of local bytes to read from each 
> partition in the FETCH request.
> {{max.fetch.bytes}} = 50 MB
> {{max.partition.fetch.bytes}} = 1 MB
> Similar to this, the consumer should be able to configure 
> {{max.remote.partition.fetch.bytes}} = 4 MB.
> While handling the {{FETCH}} request, if we encounter a partition to read 
> data from remote storage, then rest of the partitions in the request are 
> ignored. Essentially, we are serving only 1 MB of remote data per FETCH 
> request when all the partitions in the request are to be served from the 
> remote storage.
> Providing one more configuration to the client help the user to tune the 
> values depending on their storage plugin. The user might want to optimise the 
> number of calls to remote storage vs amount of bytes returned back to the 
> client in the FETCH response.
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1454]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15853: Move configDef out of core [kafka]

2024-06-09 Thread via GitHub


OmniaGM commented on code in PR #16116:
URL: https://github.com/apache/kafka/pull/16116#discussion_r1632251507


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.compress.GzipCompression;
+import org.apache.kafka.common.compress.Lz4Compression;
+import org.apache.kafka.common.compress.ZstdCompression;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.config.SslClientAuth;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfigs;
+import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs;
+import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.raft.QuorumConfig;
+import org.apache.kafka.security.PasswordEncoderConfigs;
+import org.apache.kafka.server.common.MetadataVersionValidator;
+import org.apache.kafka.server.metrics.MetricConfigs;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
+import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD;
+import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+
+public class KafkaConfig {

Review Comment:
   Sorry for the late response. It would be a nice separation; however, I am not sure it would be useful, as these definitions will rarely be re-used with the same structure outside of their classes. For example, `RemoteLogManagerConfig.CONFIG_DEF` will only be used in `RemoteLogManagerConfig` and is not needed outside of this context. The `ZOOKEEPER` definitions are only used in KafkaConfig and will not be used outside of it.
   
   Also, we have some configs that are common between the server and other components of Kafka, and having multiple definitions in different contexts for the same configs might be confusing going forward.
   
   We also might create confusion regarding where to define these config definitions: inside the `Config`/`Configs` classes or within `KafkaConfigDef`. For example, we can move most config definitions for things like `QUOTA`, `GROUP_COORDINATOR`, `REPLICATION`, etc. to their `Configs` classes; however, things like `SSL` and `SASL` get mostly populated from public classes, and for `LOG`, while most of the `CONFIG` fields are defined in the private class `ServerLogConfigs`, the defaults are defined in the storage package, and we can't depend on storage as it would create a circular dependency between `storage` and `server`.
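   
   For readers following the `define(...)` calls above, a single config definition entry in this style looks roughly like the following (a hedged sketch using the statically imported helpers; the key, default, and doc string are illustrative, not taken from this PR):
   
   ```java
   ConfigDef configDef = new ConfigDef()
       // define(name, type, defaultValue, validator, importance, documentation)
       .define("log.cleaner.threads", INT, 1, atLeast(0), MEDIUM,
           "The number of background threads to use for log cleaning.");
   ```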

Re: [PR] KAFKA-16897: Move OffsetIndexTest and OffsetMapTest to storage module [kafka]

2024-06-09 Thread via GitHub


m1a2st commented on code in PR #16244:
URL: https://github.com/apache/kafka/pull/16244#discussion_r1632348365


##
storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java:
##
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.errors.InvalidOffsetException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class OffsetIndexTest {
+
+    private OffsetIndex index;
+    private static final long BASE_OFFSET = 45L;
+
+    @BeforeEach
+    public void setup() throws IOException {
+        index = new OffsetIndex(nonExistentTempFile(), BASE_OFFSET, 30 * 8);
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        if (index != null) {
+            Files.deleteIfExists(index.file().toPath());

Review Comment:
   @TaiJuWu, it is a good idea. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14509: [4/4] Handle includeAuthorizedOperations [kafka]

2024-06-09 Thread via GitHub


chia7712 commented on PR #16158:
URL: https://github.com/apache/kafka/pull/16158#issuecomment-2156662574

   I prefer to check all failed tests before merging, and the last commit of this PR does not have a completed CI run. That is why I suggested rebasing the code to trigger QA again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-12708 Rewrite org.apache.kafka.test.Microbenchmarks by JMH [kafka]

2024-06-09 Thread via GitHub


brandboat commented on code in PR #16231:
URL: https://github.com/apache/kafka/pull/16231#discussion_r1632327595


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java:
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.kafka.common.utils.CopyOnWriteMap;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Threads(2)
+public class ConcurrentMapBenchmark {
+    @Param({"100"})
+    private int times;
+
+    @Param({"100"})
+    private int mapSize;
+
+    // execute one computeIfAbsent write per `writePerLoops` loop iterations
+    @Param({"1"})
+    private int writePerLoops;
+
+    private Map<Integer, Integer> concurrentHashMap;
+    private Map<Integer, Integer> copyOnWriteMap;
+
+    @Setup
+    public void setup() {
+        Map<Integer, Integer> mapTemplate = IntStream.range(0, mapSize).boxed()
+            .collect(Collectors.toMap(i -> i, i -> i));
+        concurrentHashMap = new ConcurrentHashMap<>(mapTemplate);
+        copyOnWriteMap = new CopyOnWriteMap<>(mapTemplate);
+    }
+
+    @Benchmark
+    public void testConcurrentHashMap(Blackhole blackhole) {

Review Comment:
   Thanks! The benchmark results are now as below.
   
   ```
   # OS: openSUSE Tumbleweed
   # JMH version: 1.37
   # VM version: JDK 21.0.3, OpenJDK 64-Bit Server VM, 21.0.3+9-LTS (zulu)
   # CPU: 13th Gen Intel(R) Core(TM) i9-13980HX
   
   Benchmark                                              (mapSize)  (writePercentage)  Mode  Cnt  Score   Error  Units
   ConcurrentMapBenchmark.testConcurrentHashMapEntrySet         100                0.1  avgt   15  2.368 ± 0.066  ns/op
   ConcurrentMapBenchmark.testConcurrentHashMapGet              100                0.1  avgt   15  4.385 ± 0.013  ns/op
   ConcurrentMapBenchmark.testConcurrentHashMapGetRandom        100                0.1  avgt   15  8.895 ± 0.019  ns/op
   ConcurrentMapBenchmark.testConcurrentHashMapValues           100                0.1  avgt   15  2.289 ± 0.052  ns/op
   ConcurrentMapBenchmark.testCopyOnWriteMapEntrySet            100                0.1  avgt   15  2.442 ± 0.072  ns/op
   ConcurrentMapBenchmark.testCopyOnWriteMapGet                 100                0.1  avgt   15  4.664 ± 0.014  ns/op
   ConcurrentMapBenchmark.testCopyOnWriteMapGetRandom           100                0.1  avgt   15  6.243 ± 0.033  ns/op
   ConcurrentMapBenchmark.testCopyOnWriteMapValues              100                0.1  avgt   15  2.435 ± 0.174  ns/op
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14509: [4/4] Handle includeAuthorizedOperations [kafka]

2024-06-09 Thread via GitHub


chia7712 commented on PR #16158:
URL: https://github.com/apache/kafka/pull/16158#issuecomment-2156661854

   @riedelmax #16249 fixes the blocked tests. Without that fix, the CI will time out when running your PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move configDef out of core [kafka]

2024-06-09 Thread via GitHub


OmniaGM commented on code in PR #16116:
URL: https://github.com/apache/kafka/pull/16116#discussion_r1632251507


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.compress.GzipCompression;
+import org.apache.kafka.common.compress.Lz4Compression;
+import org.apache.kafka.common.compress.ZstdCompression;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.config.SslClientAuth;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfigs;
+import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs;
+import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.raft.QuorumConfig;
+import org.apache.kafka.security.PasswordEncoderConfigs;
+import org.apache.kafka.server.common.MetadataVersionValidator;
+import org.apache.kafka.server.metrics.MetricConfigs;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
+import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD;
+import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+
+public class KafkaConfig {

Review Comment:
   Sorry for the late response. It would be a nice separation; however, I am not sure it would be useful, as these definitions will rarely be re-used with the same structure outside of their classes. For example, `RemoteLogManagerConfig.CONFIG_DEF` will only be used in `RemoteLogManagerConfig` and is not needed outside of this context. The `ZOOKEEPER` definitions are only used in KafkaConfig and will not be used outside of it.
   
   Also, we have some configs that are common between the server and other components of Kafka, and having multiple definitions in different contexts for the same configs might be confusing going forward.
   
   We also might create confusion about when to define a config definition within the `Config` class and when to create it within `KafkaConfigDef`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Dynamic broker configs for remote fetch/copy quotas [kafka]

2024-06-09 Thread via GitHub


kamalcph commented on code in PR #16078:
URL: https://github.com/apache/kafka/pull/16078#discussion_r1632316732


##
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##
@@ -817,6 +817,119 @@ class DynamicBrokerConfigTest {
 Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
   }
 
+  @Test
+  def testRemoteLogManagerCopyQuotaUpdates(): Unit = {
+    val copyQuotaProp = RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP
+
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
+    val config = KafkaConfig.fromProps(props)
+    val serverMock: KafkaServer = mock(classOf[KafkaServer])
+    val remoteLogManagerMockOpt = Option(Mockito.mock(classOf[RemoteLogManager]))
+
+    Mockito.when(serverMock.config).thenReturn(config)
+    Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt)
+
+    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
+
+    assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, config.getLong(copyQuotaProp))
+
+    // Update default config
+    props.put(copyQuotaProp, "100")
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(100, config.getLong(copyQuotaProp))
+    Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(100)
+
+    // Update per broker config
+    props.put(copyQuotaProp, "200")
+    config.dynamicConfig.updateBrokerConfig(0, props)
+    assertEquals(200, config.getLong(copyQuotaProp))
+    Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(200)
+
+    Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
+  }
+
+  @Test
+  def testRemoteLogManagerFetchQuotaUpdates(): Unit = {

Review Comment:
   This test is similar to `testRemoteLogManagerCopyQuotaUpdates`. Can we refactor the tests to extract a shared method, or use a parameterized test?
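   
   For illustration, a self-contained sketch of the parameterized shape (hypothetical key names and a toy body, not the actual Kafka test):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.CsvSource;
   
   import static org.junit.jupiter.api.Assertions.assertEquals;
   
   class QuotaUpdateShapeTest {
       // One test body drives both the copy-quota and fetch-quota cases.
       @ParameterizedTest
       @CsvSource({
           "remote.log.manager.copy.max.bytes.per.second, 100",
           "remote.log.manager.fetch.max.bytes.per.second, 200"
       })
       void quotaPropUpdates(String quotaProp, long expected) {
           // stand-in for: update the dynamic config, then read it back
           Map<String, Long> config = new HashMap<>();
           config.put(quotaProp, expected);
           assertEquals(expected, config.get(quotaProp));
       }
   }
   ```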



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Dynamic broker configs for remote fetch/copy quotas [kafka]

2024-06-09 Thread via GitHub


kamalcph commented on code in PR #16078:
URL: https://github.com/apache/kafka/pull/16078#discussion_r1632316363


##
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##
@@ -817,6 +817,119 @@ class DynamicBrokerConfigTest {
 Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
   }
 
+  @Test
+  def testRemoteLogManagerCopyQuotaUpdates(): Unit = {
+    val copyQuotaProp = RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP
+
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
+    val config = KafkaConfig.fromProps(props)
+    val serverMock: KafkaServer = mock(classOf[KafkaServer])
+    val remoteLogManagerMockOpt = Option(Mockito.mock(classOf[RemoteLogManager]))
+
+    Mockito.when(serverMock.config).thenReturn(config)
+    Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt)
+
+    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
+
+    assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, config.getLong(copyQuotaProp))
+
+    // Update default config
+    props.put(copyQuotaProp, "100")
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(100, config.getLong(copyQuotaProp))

Review Comment:
   Should we move the `getter` methods from RemoteLogManagerConfig to the KafkaConfig class?
   
   1. remoteLogManagerCopyMaxBytesPerSecond
   2. remoteLogManagerFetchMaxBytesPerSecond
   3. remoteLogIndexFileCacheTotalSizeBytes
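   
   A hedged sketch of what one such getter might look like on the KafkaConfig side (assuming KafkaConfig extends AbstractConfig, so `getLong` is available; shown as a class-body fragment, illustrative only):
   
   ```java
   // inside KafkaConfig (fragment)
   public long remoteLogManagerCopyMaxBytesPerSecond() {
       return getLong(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP);
   }
   ```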



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16895: fix off-by-one bug in RemoteCopyLagSegments [kafka]

2024-06-09 Thread via GitHub


kamalcph commented on PR #16210:
URL: https://github.com/apache/kafka/pull/16210#issuecomment-2156632862

   @dopuskh3 
   
   Any updates on this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16401) Client requests and consumer-pref test failed

2024-06-09 Thread gendong1 (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

gendong1 updated KAFKA-16401:
-
Description: 
The cluster consists of 3 nodes. When storeOffsets is delayed due to a fail-slow disk, the corresponding thread is blocked while holding a lock. The retry requests consume the IO and network threads set in server.properties. However, the retry requests are blocked since they cannot acquire the lock. The incoming client requests and consumer-perf tests fail.

Are there any comments that could help figure out this issue? I would greatly appreciate them.

  was:The cluster consists of 3 nodes. When storeOffsets is delayed due to a fail-slow disk, the corresponding thread is blocked while holding a lock. The retry requests consume the IO and network threads set in server.properties. However, the retry requests are blocked since they cannot acquire the lock. The incoming client requests and consumer-perf tests fail.


> Client requests and consumer-pref test failed
> -
>
> Key: KAFKA-16401
> URL: https://issues.apache.org/jira/browse/KAFKA-16401
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.2
>Reporter: gendong1
>Priority: Major
>
> The cluster consists of 3 nodes. When storeOffsets is delayed due to a 
> fail-slow disk, the corresponding thread is blocked while holding a lock. 
> The retry requests consume the IO and network threads set in 
> server.properties. However, the retry requests are blocked since they cannot 
> acquire the lock. The incoming client requests and consumer-perf tests fail.
> Are there any comments that could help figure out this issue? I would greatly appreciate them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] Renamed the enableRemoteStorageSystem to isRemoteStorageSystemEnabled [kafka]

2024-06-09 Thread via GitHub


chiacyu opened a new pull request, #16256:
URL: https://github.com/apache/kafka/pull/16256

   This PR is based on 
[KAFKA-16885](https://issues.apache.org/jira/browse/KAFKA-16885). See the 
[discussion](https://github.com/apache/kafka/pull/16153#issuecomment-2144269279)
 for further details.
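   
   For context, the change is a boolean-accessor rename of roughly this shape (a sketch; the backing field name is hypothetical):
   
   ```java
   // before
   public boolean enableRemoteStorageSystem() {
       return remoteStorageSystemEnabled;
   }
   
   // after
   public boolean isRemoteStorageSystemEnabled() {
       return remoteStorageSystemEnabled;
   }
   ```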
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14509: [4/4] Handle includeAuthorizedOperations [kafka]

2024-06-09 Thread via GitHub


riedelmax commented on PR #16158:
URL: https://github.com/apache/kafka/pull/16158#issuecomment-2156601592

   @chia7712 Would you mind explaining how #16249 affects this PR? If there are no conflicts with trunk, I don't need to rebase or merge manually, right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16885) Consider renaming RemoteLogManagerConfig#enableRemoteStorageSystem to RemoteLogManagerConfig#isRemoteStorageSystemEnabled

2024-06-09 Thread Chia Chuan Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853493#comment-17853493
 ] 

Chia Chuan Yu commented on KAFKA-16885:
---

Hi, [~ckamal] 

Yes, I'm working on this one.

> Consider renaming RemoteLogManagerConfig#enableRemoteStorageSystem to 
> RemoteLogManagerConfig#isRemoteStorageSystemEnabled
> -
>
> Key: KAFKA-16885
> URL: https://issues.apache.org/jira/browse/KAFKA-16885
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia Chuan Yu
>Priority: Major
>
> see the discussion: 
> https://github.com/apache/kafka/pull/16153#issuecomment-2144269279



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16885) Consider renaming RemoteLogManagerConfig#enableRemoteStorageSystem to RemoteLogManagerConfig#isRemoteStorageSystemEnabled

2024-06-09 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853492#comment-17853492
 ] 

Kamal Chandraprakash commented on KAFKA-16885:
--

[~chiacyu] 

Are you working on this?

> Consider renaming RemoteLogManagerConfig#enableRemoteStorageSystem to 
> RemoteLogManagerConfig#isRemoteStorageSystemEnabled
> -
>
> Key: KAFKA-16885
> URL: https://issues.apache.org/jira/browse/KAFKA-16885
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia Chuan Yu
>Priority: Major
>
> see the discussion: 
> https://github.com/apache/kafka/pull/16153#issuecomment-2144269279



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15853: Move configDef out of core [kafka]

2024-06-09 Thread via GitHub


OmniaGM commented on code in PR #16116:
URL: https://github.com/apache/kafka/pull/16116#discussion_r1632251507


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.compress.GzipCompression;
+import org.apache.kafka.common.compress.Lz4Compression;
+import org.apache.kafka.common.compress.ZstdCompression;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.config.SslClientAuth;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfigs;
+import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs;
+import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.raft.QuorumConfig;
+import org.apache.kafka.security.PasswordEncoderConfigs;
+import org.apache.kafka.server.common.MetadataVersionValidator;
+import org.apache.kafka.server.metrics.MetricConfigs;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
+import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD;
+import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+
+public class KafkaConfig {

Review Comment:
   Sorry for the late response. It would be a nice separation; however, I am not sure it would be useful, as these definitions will rarely be re-used with the same structure outside of their classes. For example, `RemoteLogManagerConfig.CONFIG_DEF` will only be used in `RemoteLogManagerConfig` and is not needed outside of this context. The `ZOOKEEPER` definitions are only used in KafkaConfig and will not be used outside of it. Also, we have some configs that are common between the server and other components of Kafka, and having multiple definitions in different contexts for the same configs might be confusing going forward.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move configDef out of core [kafka]

2024-06-09 Thread via GitHub


OmniaGM commented on code in PR #16116:
URL: https://github.com/apache/kafka/pull/16116#discussion_r1632251507


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.compress.GzipCompression;
+import org.apache.kafka.common.compress.Lz4Compression;
+import org.apache.kafka.common.compress.ZstdCompression;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.config.SslClientAuth;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfigs;
+import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs;
+import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.raft.QuorumConfig;
+import org.apache.kafka.security.PasswordEncoderConfigs;
+import org.apache.kafka.server.common.MetadataVersionValidator;
+import org.apache.kafka.server.metrics.MetricConfigs;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
+import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD;
+import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+
+public class KafkaConfig {

Review Comment:
   Sorry for the late response. It would be a nice separation; however, I am not sure it would be useful, as these definitions will rarely be re-used with the same structure outside of their classes. For example, `RemoteLogManagerConfig.CONFIG_DEF` will only be used in `RemoteLogManagerConfig` and is not needed outside of this context. The `ZOOKEEPER` definitions are only used in KafkaConfig and will not be used outside of it. Also, we have some configs that are common between the server and other components of Kafka, and having multiple definitions in different contexts for the same configs might be confusing going forward.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16922 : Adding unit tests for NewTopic [kafka]

2024-06-09 Thread via GitHub


muralibasani commented on PR #16255:
URL: https://github.com/apache/kafka/pull/16255#issuecomment-2156419270

   @chia7712 can you pls take a look ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Dynamic broker configs for remote fetch/copy quotas [kafka]

2024-06-09 Thread via GitHub


kamalcph commented on code in PR #16078:
URL: https://github.com/apache/kafka/pull/16078#discussion_r1632220708


##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -1165,43 +1165,66 @@ class DynamicRemoteLogConfig(server: KafkaBroker) 
extends BrokerReconfigurable w
 
   override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
     newConfig.values.forEach { (k, v) =>
-      if (reconfigurableConfigs.contains(k)) {
-        if (k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)) {
-          val newValue = v.asInstanceOf[Long]
-          val oldValue = getValue(server.config, k)
-          if (newValue != oldValue && newValue <= 0) {
-            val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v"
-            throw new ConfigException(s"$errorMsg, value should be at least 1")
-          }
+      if (k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP) ||

Review Comment:
   Can we do an inverse check, since we removed the `contains` check? What if `k` is null?
   
   ```
   if (RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP.equals(k) || ...
   ```
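   
   A standalone illustration of why the constant-first form is null-safe (hypothetical values, not the actual config path):
   
   ```java
   public class NullSafeEqualsDemo {
       public static void main(String[] args) {
           String prop = "remote.log.index.file.cache.total.size.bytes"; // illustrative key
           String k = null;                                              // a null incoming key
           System.out.println(prop.equals(k)); // prints "false" -- no NullPointerException
           // k.equals(prop) would throw a NullPointerException here instead
       }
   }
   ```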
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Dynamic broker configs for remote fetch/copy quotas [kafka]

2024-06-09 Thread via GitHub


kamalcph commented on code in PR #16078:
URL: https://github.com/apache/kafka/pull/16078#discussion_r1632220708


##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -1165,43 +1165,66 @@ class DynamicRemoteLogConfig(server: KafkaBroker) 
extends BrokerReconfigurable w
 
   override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
 newConfig.values.forEach { (k, v) =>
-  if (reconfigurableConfigs.contains(k)) {
-if 
(k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP))
 {
-  val newValue = v.asInstanceOf[Long]
-  val oldValue = getValue(server.config, k)
-  if (newValue != oldValue && newValue <= 0) {
-val errorMsg = s"Dynamic remote log manager config update 
validation failed for $k=$v"
-throw new ConfigException(s"$errorMsg, value should be at least 1")
-  }
+  if 
(k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
 ||

Review Comment:
   Can we do an inverse check? What if `k` is null?
   
   ```
   if (RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP.equals(k) || ...
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16922 : Adding unit tests for NewTopic [kafka]

2024-06-09 Thread via GitHub


muralibasani opened a new pull request, #16255:
URL: https://github.com/apache/kafka/pull/16255

   Resolves https://issues.apache.org/jira/browse/KAFKA-16922
   
   Adds unit tests for org.apache.kafka.clients.admin.NewTopic.
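   
   A minimal example of the kind of test added (a sketch; `NewTopic(String, int, short)` and its accessors are part of the public admin API):
   
   ```java
   import org.apache.kafka.clients.admin.NewTopic;
   import org.junit.jupiter.api.Test;
   
   import static org.junit.jupiter.api.Assertions.assertEquals;
   
   public class NewTopicTest {
       @Test
       public void shouldExposeNamePartitionsAndReplicationFactor() {
           NewTopic topic = new NewTopic("test-topic", 3, (short) 1);
           assertEquals("test-topic", topic.name());
           assertEquals(3, topic.numPartitions());
           assertEquals((short) 1, topic.replicationFactor());
       }
   }
   ```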
   
   ### Committer Checklist (excluded from commit message)
   - [X] Verify design and implementation 
   - [X] Verify test coverage and CI build status
   - [X] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-09 Thread via GitHub


kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1632218079


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2730,6 +2742,206 @@ public void testEpochEntriesAsByteBuffer() throws Exception {
 }
 
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testCopyQuota(boolean quotaExceeded) throws Exception {
+        RemoteLogManager.RLMTask task = setupRLMTask(quotaExceeded);
+
+        if (quotaExceeded) {
+            // Verify that the copy operation times out, since no segments can be copied due to quota being exceeded
+            assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog)));
+
+            // Verify the highest offset in remote storage is updated only once
+            ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+            verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture());
+            // Verify the highest offset in remote storage was -1L before the copy started
+            assertEquals(-1L, capture.getValue());
+        } else {
+            // Verify the copy operation completes within the timeout, since it does not need to wait for quota availability
+            assertTimeoutPreemptively(Duration.ofMillis(100), () -> task.copyLogSegmentsToRemote(mockLog));
+
+            // Verify quota check was performed
+            verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded();
+            // Verify bytes to copy was recorded with the quota manager
+            verify(rlmCopyQuotaManager, times(1)).record(10);
+
+            // Verify the highest offset in remote storage is updated
+            ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+            verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
+            List<Long> capturedValues = capture.getAllValues();
+            // Verify the highest offset in remote storage was -1L before the copy
+            assertEquals(-1L, capturedValues.get(0).longValue());
+            // Verify it was updated to 149L after the copy
+            assertEquals(149L, capturedValues.get(1).longValue());
+        }
+    }
+
+    @Test
+    public void testRLMTaskShutdownDuringQuotaExceededScenario() throws Exception {
+        RemoteLogManager.RLMTask task = setupRLMTask(true);
+
+        Thread t = new Thread(task);
+        t.start();
+        // Sleep for a while to allow the task to start and quota check to be performed once
+        Thread.sleep(100);

Review Comment:
   `sleep` can lead to flaky tests in CI and the shutdown/close behavior is not 
tested E2E. Can we rewrite the test? 
   
   ```
   @Test
   public void testRLMTaskShutdownDuringQuotaExceededScenario() throws Exception {
       remoteLogManager.startup();
       setupRLMTask(true);
       remoteLogManager.onLeadershipChange(
           Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds);
       TestUtils.waitForCondition(() -> {
           verify(rlmCopyQuotaManager, atLeast(1)).isQuotaExceeded();
           return true;
       }, "Quota exceeded check did not happen");
       assertTimeoutPreemptively(Duration.ofMillis(100), () -> remoteLogManager.close());
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15623: Migrate streams tests (part 1) module to JUnit 5 [kafka]

2024-06-09 Thread via GitHub


FrankYang0529 opened a new pull request, #16254:
URL: https://github.com/apache/kafka/pull/16254

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-09 Thread via GitHub


kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1632218079


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2730,6 +2742,206 @@ public void testEpochEntriesAsByteBuffer() throws Exception {
 }
 
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testCopyQuota(boolean quotaExceeded) throws Exception {
+        RemoteLogManager.RLMTask task = setupRLMTask(quotaExceeded);
+
+        if (quotaExceeded) {
+            // Verify that the copy operation times out, since no segments can be copied due to quota being exceeded
+            assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog)));
+
+            // Verify the highest offset in remote storage is updated only once
+            ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+            verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture());
+            // Verify the highest offset in remote storage was -1L before the copy started
+            assertEquals(-1L, capture.getValue());
+        } else {
+            // Verify the copy operation completes within the timeout, since it does not need to wait for quota availability
+            assertTimeoutPreemptively(Duration.ofMillis(100), () -> task.copyLogSegmentsToRemote(mockLog));
+
+            // Verify quota check was performed
+            verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded();
+            // Verify bytes to copy was recorded with the quota manager
+            verify(rlmCopyQuotaManager, times(1)).record(10);
+
+            // Verify the highest offset in remote storage is updated
+            ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+            verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
+            List<Long> capturedValues = capture.getAllValues();
+            // Verify the highest offset in remote storage was -1L before the copy
+            assertEquals(-1L, capturedValues.get(0).longValue());
+            // Verify it was updated to 149L after the copy
+            assertEquals(149L, capturedValues.get(1).longValue());
+        }
+    }
+
+    @Test
+    public void testRLMTaskShutdownDuringQuotaExceededScenario() throws Exception {
+        RemoteLogManager.RLMTask task = setupRLMTask(true);
+
+        Thread t = new Thread(task);
+        t.start();
+        // Sleep for a while to allow the task to start and quota check to be performed once
+        Thread.sleep(100);

Review Comment:
   `sleep` can lead to flaky tests in CI and the shutdown/close behavior is not 
tested E2E. Can we rewrite the test? Also, we have to give some buffer time in 
the close timeout (quotaTimeout + 50 ms) to avoid test flakiness.
   
   ```
   @Test
   public void testRLMTaskShutdownDuringQuotaExceededScenario() throws Exception {
       remoteLogManager.startup();
       setupRLMTask(true);
       remoteLogManager.onLeadershipChange(
           Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds);
       TestUtils.waitForCondition(() -> {
           verify(rlmCopyQuotaManager, atLeast(1)).isQuotaExceeded();
           return true;
       }, "Quota exceeded check did not happen");
       assertTimeoutPreemptively(Duration.ofMillis(150), () -> remoteLogManager.close());
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16897: Move OffsetIndexTest and OffsetMapTest to storage module [kafka]

2024-06-09 Thread via GitHub


TaiJuWu commented on code in PR #16244:
URL: https://github.com/apache/kafka/pull/16244#discussion_r1632215976


##
storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java:
##
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.errors.InvalidOffsetException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class OffsetIndexTest {
+
+    private OffsetIndex index;
+    private static final long BASE_OFFSET = 45L;
+
+    @BeforeEach
+    public void setup() throws IOException {
+        index = new OffsetIndex(nonExistentTempFile(), BASE_OFFSET, 30 * 8);
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        if (index != null) {
+            Files.deleteIfExists(index.file().toPath());

Review Comment:
   Maybe `this.index.close()` could be added here?
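   
   A minimal sketch of the suggested change (assuming `OffsetIndex` exposes a `close()` that releases the underlying resources):
   
   ```java
   @AfterEach
   public void tearDown() throws IOException {
       if (index != null) {
           index.close(); // release the memory-mapped index before deleting its file
           Files.deleteIfExists(index.file().toPath());
       }
   }
   ```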



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16897: Move OffsetIndexTest and OffsetMapTest to storage module [kafka]

2024-06-09 Thread via GitHub


TaiJuWu commented on code in PR #16244:
URL: https://github.com/apache/kafka/pull/16244#discussion_r1632215976


##
storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java:
##
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.errors.InvalidOffsetException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class OffsetIndexTest {
+
+    private OffsetIndex index;
+    private static final long BASE_OFFSET = 45L;
+
+    @BeforeEach
+    public void setup() throws IOException {
+        index = new OffsetIndex(nonExistentTempFile(), BASE_OFFSET, 30 * 8);
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        if (index != null) {
+            Files.deleteIfExists(index.file().toPath());

Review Comment:
   Maybe `this.index.close()` could be added here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16922) add unit test for NewTopic

2024-06-09 Thread Muralidhar Basani (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Muralidhar Basani reassigned KAFKA-16922:
-

Assignee: Muralidhar Basani  (was: Chia-Ping Tsai)

> add unit test for NewTopic
> --
>
> Key: KAFKA-16922
> URL: https://issues.apache.org/jira/browse/KAFKA-16922
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Muralidhar Basani
>Priority: Minor
>
> as title



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR:Topic command integration test migrate to new test infra [kafka]

2024-06-09 Thread via GitHub


TaiJuWu commented on code in PR #16127:
URL: https://github.com/apache/kafka/pull/16127#discussion_r1632204289


##
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##
@@ -353,6 +354,9 @@ static private void setupNodeDirectories(File baseDirectory,
     private final TestKitNodes nodes;
     private final Map<Integer, ControllerServer> controllers;
     private final Map<Integer, BrokerServer> brokers;
+
+    private final Map<Integer, BrokerServer> aliveBrokers;

Review Comment:
   Sounds great, but if a broker has already shut down, how can we access the server and check its status?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR:Topic command integration test migrate to new test infra [kafka]

2024-06-09 Thread via GitHub


TaiJuWu commented on code in PR #16127:
URL: https://github.com/apache/kafka/pull/16127#discussion_r1632203912


##
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##
@@ -188,12 +188,13 @@ private KafkaConfig createNodeConfig(TestKitNode node) {
 controllerNode.metadataDirectory());
 }
 
         props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
-                "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
+                "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT");

Review Comment:
   TestUtils requires the server to use the `PLAINTEXT` protocol; you can reference it [here.](https://github.com/apache/kafka/blob/d6cd83e2fb2bab4526f07e067277b34e482f6678/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L232)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] improve log description of QuorumController [kafka]

2024-06-09 Thread via GitHub


chickenchickenlove commented on code in PR #15926:
URL: https://github.com/apache/kafka/pull/15926#discussion_r1632188911


##
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java:
##
@@ -165,6 +165,8 @@ static ControllerResult recordsForNonEmptyLog(
                 throw new RuntimeException("Should not have ZK migrations enabled on a cluster that was " +
                     "created in KRaft mode.");
             }
+            logMessageBuilder
+                .append("This is expected because this is a de-novo KRaft cluster.");

Review Comment:
   @mumrah, I made a new commit to apply your comments.
   However, the CI still fails.
   The changes I made do not seem to be related to the CI failure.
   Am I wrong?
   (screenshot: https://github.com/apache/kafka/assets/90125071/d013add9-814d-4ad9-8c2e-cb2955455645)
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16921: Migrate all junit 4 code to junit 5 for connect module [kafka]

2024-06-09 Thread via GitHub


chia7712 commented on code in PR #16253:
URL: https://github.com/apache/kafka/pull/16253#discussion_r1632184801


##
build.gradle:
##
@@ -3248,7 +3248,6 @@ project(':connect:runtime') {
 testImplementation project(':group-coordinator')
 
 testImplementation libs.junitJupiterApi
-testImplementation libs.junitVintageEngine

Review Comment:
   We should remove all usage of JUnit 4 code from the connect module. If you get such an error, please rewrite those tests with JUnit 5.
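   
   For reference, the mechanical part of the migration usually looks like this (a generic sketch, not code from this PR):
   
   ```java
   // JUnit 4:
   //   import org.junit.Test;
   //   import static org.junit.Assert.assertEquals;
   // JUnit 5:
   import org.junit.jupiter.api.Test;
   
   import static org.junit.jupiter.api.Assertions.assertEquals;
   
   public class ExampleMigratedTest {
       @Test
       public void shouldStillPassAfterMigration() {
           assertEquals(4, 2 + 2); // assertions move to org.junit.jupiter.api.Assertions
       }
   }
   ```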



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16921: Migrate all junit 4 code to junit 5 for connect module [kafka]

2024-06-09 Thread via GitHub


m1a2st commented on code in PR #16253:
URL: https://github.com/apache/kafka/pull/16253#discussion_r1632184386


##
build.gradle:
##
@@ -3248,7 +3248,6 @@ project(':connect:runtime') {
 testImplementation project(':group-coordinator')
 
 testImplementation libs.junitJupiterApi
-testImplementation libs.junitVintageEngine

Review Comment:
   There is a little problem: if I remove `testImplementation libs.junitVintageEngine`, Gradle can't resolve the `org.junit.` dependencies and some annotations are flagged as errors. I will find a way to resolve it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-09 Thread via GitHub


frankvicky commented on PR #16227:
URL: https://github.com/apache/kafka/pull/16227#issuecomment-2156349665

   Hi @chia7712, I have done some refactoring. PTAL!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16921: Migrate all junit 4 code to junit 5 for connect module [kafka]

2024-06-09 Thread via GitHub


m1a2st commented on code in PR #16253:
URL: https://github.com/apache/kafka/pull/16253#discussion_r1632175364


##
build.gradle:
##
@@ -540,9 +537,6 @@ subprojects {
 
 exclude testsToExclude
 
-if (shouldUseJUnit5)
-  useJUnitPlatform()

Review Comment:
   Sorry, I misunderstood the `shouldUseJUnit5` variable logic, so I rolled it back.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org