Re: [PR] KAFKA-16894: Define group.version=2 [kafka]
omkreddy commented on PR #16212: URL: https://github.com/apache/kafka/pull/16212#issuecomment-2157321710 There are few test failures -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16803) Upgrade to a version of ShadowJavaPlugin which doesn't use ConfigureUtil
[ https://issues.apache.org/jira/browse/KAFKA-16803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853549#comment-17853549 ] Ksolves commented on KAFKA-16803: - Sure, [~gharris1727]. Will create PR and update you. > Upgrade to a version of ShadowJavaPlugin which doesn't use ConfigureUtil > > > Key: KAFKA-16803 > URL: https://issues.apache.org/jira/browse/KAFKA-16803 > Project: Kafka > Issue Type: Sub-task >Reporter: Greg Harris >Priority: Major > Labels: newbie > > The org.gradle.util.ConfigureUtil type has been deprecated. > This is scheduled to be removed in Gradle 9.0. > [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#org_gradle_util_reports_deprecations] > 2 usages > Plugin:com.github.jengelman.gradle.plugins.shadow.ShadowJavaPlugin > Plugin:com.github.jengelman.gradle.plugins.shadow.ShadowJavaPlugin -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16803) Upgrade to a version of ShadowJavaPlugin which doesn't use ConfigureUtil
[ https://issues.apache.org/jira/browse/KAFKA-16803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ksolves reassigned KAFKA-16803: --- Assignee: Ksolves > Upgrade to a version of ShadowJavaPlugin which doesn't use ConfigureUtil > > > Key: KAFKA-16803 > URL: https://issues.apache.org/jira/browse/KAFKA-16803 > Project: Kafka > Issue Type: Sub-task >Reporter: Greg Harris >Assignee: Ksolves >Priority: Major > Labels: newbie > > The org.gradle.util.ConfigureUtil type has been deprecated. > This is scheduled to be removed in Gradle 9.0. > [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#org_gradle_util_reports_deprecations] > 2 usages > Plugin:com.github.jengelman.gradle.plugins.shadow.ShadowJavaPlugin > Plugin:com.github.jengelman.gradle.plugins.shadow.ShadowJavaPlugin -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16924) No log output when running kafka
[ https://issues.apache.org/jira/browse/KAFKA-16924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-16924: -- Description: In [https://github.com/apache/kafka/pull/12148] , we removed log4jAppender dependency, and add testImplementation dependency for `slf4jlog4j` lib. However, we need this runtime dependency in tools module to output logs. ([ref]([https://stackoverflow.com/a/21787813])) Adding this dependency back. Note: The {{slf4jlog4j}} lib was added in {{log4j-appender}} dependency. Since it's removed, we need to explicitly declare it. Current output will be like this: {code:java} > ./gradlew clean jar > bin/kafka-server-start.sh config/kraft/controller.properties SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.{code} was: In [https://github.com/apache/kafka/pull/12148] , we removed log4jAppender dependency, and add testImplementation dependency for `slf4jlog4j` lib. However, we need this runtime dependency in tools module to output logs. ([ref]([https://stackoverflow.com/a/21787813])) Adding this dependency back. Note: The {{slf4jlog4j}} lib was added in {{log4j-appender}} dependency. Since it's removed, we need to explicitly declare it. Current output will be like this: {code:java} > bin/kafka-server-start.sh config/kraft/controller.properties SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.{code} > No log output when running kafka > - > > Key: KAFKA-16924 > URL: https://issues.apache.org/jira/browse/KAFKA-16924 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Fix For: 4.0.0 > > > In [https://github.com/apache/kafka/pull/12148] , we removed log4jAppender > dependency, and add testImplementation dependency for `slf4jlog4j` lib. > However, we need this runtime dependency in tools module to output logs. > ([ref]([https://stackoverflow.com/a/21787813])) Adding this dependency back. > > Note: The {{slf4jlog4j}} lib was added in {{log4j-appender}} dependency. > Since it's removed, we need to explicitly declare it. > > Current output will be like this: > {code:java} > > ./gradlew clean jar > > bin/kafka-server-start.sh config/kraft/controller.properties > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > SLF4J: Defaulting to no-operation (NOP) logger implementation > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further > details.{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16924: add slf4jlog4j dependey in tool [kafka]
chia7712 commented on code in PR #16260: URL: https://github.com/apache/kafka/pull/16260#discussion_r1632550895 ## build.gradle: ## @@ -2178,6 +2177,7 @@ project(':tools') { implementation libs.jacksonDataformatCsv implementation libs.jacksonJDK8Datatypes implementation libs.slf4jApi +implementation libs.slf4jlog4j Review Comment: ya, we do need that binding jar when running broker/controller. But stream examples module has declared that: https://github.com/apache/kafka/blob/trunk/build.gradle#L2613 I check the distribution, and both `reload4j` and `slf4j-reload4j` are existent. ![螢幕快照 2024-06-10 12-08-23](https://github.com/apache/kafka/assets/6234750/d8bdcd4f-5344-44db-b72e-4cc7710ae2cf) Also, I run the command you described but I don't see "Failed to load class .. " ![螢幕快照 2024-06-10 12-07-27](https://github.com/apache/kafka/assets/6234750/cffda7d9-9c94-48ec-a7e4-ef80746f61d7) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16114:Fix partiton not retention after cancel alter intra broke… [kafka]
github-actions[bot] commented on PR #15172: URL: https://github.com/apache/kafka/pull/15172#issuecomment-2157136679 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16295: Align RocksDB and in-memory store init() sequences [kafka]
github-actions[bot] commented on PR #15421: URL: https://github.com/apache/kafka/pull/15421#issuecomment-2157136573 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: update kraft_upgrade_test to create a new topic after metadata upgrade [kafka]
github-actions[bot] commented on PR #15451: URL: https://github.com/apache/kafka/pull/15451#issuecomment-2157136549 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on PR #16227: URL: https://github.com/apache/kafka/pull/16227#issuecomment-2157060477 Hi @chia7712 , I have do some changes and add a test case, PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16913) Support external schemas in JSONConverter
[ https://issues.apache.org/jira/browse/KAFKA-16913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ganesh Sadanala reassigned KAFKA-16913: --- Assignee: Ganesh Sadanala > Support external schemas in JSONConverter > - > > Key: KAFKA-16913 > URL: https://issues.apache.org/jira/browse/KAFKA-16913 > Project: Kafka > Issue Type: Improvement > Components: connect >Reporter: Priyanka K U >Assignee: Ganesh Sadanala >Priority: Minor > > KIP-1054: Support external schemas in JSONConverter : > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1054%3A+Support+external+schemas+in+JSONConverter -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1632484396 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -31,32 +31,129 @@ import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + public class ConsumerRecordsTest { @Test public void iterator() throws Exception { +String topic = "topic"; +int recordSize = 10; +int partitionSize = 15; +int emptyPartitionIndex = 3; +ConsumerRecords records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, Collections.singleton(topic)); +Iterator> iterator = records.iterator(); -Map>> records = new LinkedHashMap<>(); +int recordCount = 0; +int partitionCount = 0; +int currentPartition = -1; + +while (iterator.hasNext()) { +ConsumerRecord record = iterator.next(); + +if (record.partition() == emptyPartitionIndex) { +fail("Partition " + emptyPartitionIndex + " is not empty"); +} + +// Check if we have moved to a new partition +if (currentPartition != record.partition()) { +// Increment the partition count as we have encountered a new partition +partitionCount++; +// Update the current partition to the new partition +currentPartition = record.partition(); +} -String topic = "topic"; -records.put(new TopicPartition(topic, 0), new ArrayList<>()); -ConsumerRecord record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, -0, 0, 1, "value1", new RecordHeaders(), Optional.empty()); -ConsumerRecord record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, -0, 0, 2, "value2", new RecordHeaders(), Optional.empty()); -records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2)); -records.put(new TopicPartition(topic, 2), new ArrayList<>()); - -ConsumerRecords consumerRecords = new ConsumerRecords<>(records); -Iterator> iter = consumerRecords.iterator(); - -int c = 0; -for (; iter.hasNext(); c++) { -ConsumerRecord record = iter.next(); -assertEquals(1, record.partition()); assertEquals(topic, record.topic()); -assertEquals(c, record.offset()); +assertEquals(currentPartition, record.partition()); +assertEquals(recordCount % recordSize, record.offset()); +assertEquals(recordCount % recordSize, record.key()); +assertEquals(String.valueOf(recordCount % recordSize), record.value()); + +recordCount++; } -assertEquals(2, c); + +// Including empty partition +assertEquals(partitionSize, partitionCount + 1); +} + +@Test +public void testRecordsWithNullTopic() { +String nullTopic = null; +ConsumerRecords consumerRecords = ConsumerRecords.empty(); +IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); +assertEquals("Topic must be non-null.", exception.getMessage()); +} + + +@Test +public void testRecords() { Review Comment: I will do it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1632484236 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -31,32 +31,129 @@ import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + public class ConsumerRecordsTest { @Test public void iterator() throws Exception { +String topic = "topic"; +int recordSize = 10; +int partitionSize = 15; +int emptyPartitionIndex = 3; +ConsumerRecords records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, Collections.singleton(topic)); +Iterator> iterator = records.iterator(); -Map>> records = new LinkedHashMap<>(); +int recordCount = 0; +int partitionCount = 0; +int currentPartition = -1; + +while (iterator.hasNext()) { +ConsumerRecord record = iterator.next(); + +if (record.partition() == emptyPartitionIndex) { +fail("Partition " + emptyPartitionIndex + " is not empty"); +} + +// Check if we have moved to a new partition +if (currentPartition != record.partition()) { +// Increment the partition count as we have encountered a new partition +partitionCount++; +// Update the current partition to the new partition +currentPartition = record.partition(); +} -String topic = "topic"; -records.put(new TopicPartition(topic, 0), new ArrayList<>()); -ConsumerRecord record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, -0, 0, 1, "value1", new RecordHeaders(), Optional.empty()); -ConsumerRecord record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, -0, 0, 2, "value2", new RecordHeaders(), Optional.empty()); -records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2)); -records.put(new TopicPartition(topic, 2), new ArrayList<>()); - -ConsumerRecords consumerRecords = new ConsumerRecords<>(records); -Iterator> iter = consumerRecords.iterator(); - -int c = 0; -for (; iter.hasNext(); c++) { -ConsumerRecord record = iter.next(); -assertEquals(1, record.partition()); assertEquals(topic, record.topic()); -assertEquals(c, record.offset()); +assertEquals(currentPartition, record.partition()); +assertEquals(recordCount % recordSize, record.offset()); +assertEquals(recordCount % recordSize, record.key()); +assertEquals(String.valueOf(recordCount % recordSize), record.value()); + +recordCount++; } -assertEquals(2, c); + +// Including empty partition +assertEquals(partitionSize, partitionCount + 1); +} + +@Test +public void testRecordsWithNullTopic() { +String nullTopic = null; +ConsumerRecords consumerRecords = ConsumerRecords.empty(); +IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); +assertEquals("Topic must be non-null.", exception.getMessage()); +} + + +@Test +public void testRecords() { +List topics = Arrays.asList("topic1", "topic2", "topic3", "topic4"); +int recordSize = 3; +int partitionSize = 10; +int emptyPartitionIndex = 6; +int expectedTotalRecordSizeOfEachTopic = recordSize * (partitionSize - 1); + +ConsumerRecords consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics); + +for (String topic : topics) { +Iterable> records = consumerRecords.records(topic); +Iterator> iterator = records.iterator(); +int recordCount = 0; +int partitionCount = 0; +int currentPartition = -1; + +while (iterator.hasNext()) { +ConsumerRecord record = iterator.next(); + +if (record.partition() == emptyPartitionIndex) { +fail("Partition " + emptyPartitionIndex + " is not empty"); +} + +// Check if we have moved to a new partition +if (currentPartition != record.partition()) { +// Increment the partition count as we have encountered a new partition +partitionCount++; +// Update the current partition to the new partition +currentPartition = record.partition(); +} + +assertEquals(topic,
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1632481576 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -31,32 +31,129 @@ import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + public class ConsumerRecordsTest { @Test public void iterator() throws Exception { Review Comment: Sure, don't even notice this case doesn't have `test` prefix 藍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16917: Align the returned Map type of KafkaAdminClient [kafka]
frankvicky commented on PR #16250: URL: https://github.com/apache/kafka/pull/16250#issuecomment-2157010773 Hi @chia7712, I have do a simple change based on your feedback, PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16917: Align the returned Map type of KafkaAdminClient [kafka]
frankvicky commented on code in PR #16250: URL: https://github.com/apache/kafka/pull/16250#discussion_r1632479332 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2760,9 +2760,12 @@ void handleFailure(Throwable throwable) { }, now); } -return new DescribeConfigsResult(new HashMap<>(nodeFutures.entrySet().stream() -.flatMap(x -> x.getValue().entrySet().stream()) -.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue; +Map> resourceToConfigFuture = nodeFutures.entrySet() Review Comment: Oh, it just my personal coding style, I will modify it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16924: add slf4jlog4j dependey in tool [kafka]
showuon commented on PR #16260: URL: https://github.com/apache/kafka/pull/16260#issuecomment-2156996030 @chia7712 , could you have a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16924) No log output when running kafka
[ https://issues.apache.org/jira/browse/KAFKA-16924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-16924: -- Description: In [https://github.com/apache/kafka/pull/12148] , we removed log4jAppender dependency, and add testImplementation dependency for `slf4jlog4j` lib. However, we need this runtime dependency in tools module to output logs. ([ref]([https://stackoverflow.com/a/21787813])) Adding this dependency back. Note: The {{slf4jlog4j}} lib was added in {{log4j-appender}} dependency. Since it's removed, we need to explicitly declare it. Current output will be like this: {code:java} > bin/kafka-server-start.sh config/kraft/controller.properties SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.{code} was: In https://github.com/apache/kafka/pull/12148 , we removed log4jAppender dependency, and add testImplementation dependency for `slf4jlog4j` lib. However, we need this runtime dependency in tools module to output logs. ([ref](https://stackoverflow.com/a/21787813)) Adding this dependency back. Current output will be like this: {code:java} > bin/kafka-server-start.sh config/kraft/controller.properties SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.{code} > No log output when running kafka > - > > Key: KAFKA-16924 > URL: https://issues.apache.org/jira/browse/KAFKA-16924 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Fix For: 4.0.0 > > > In [https://github.com/apache/kafka/pull/12148] , we removed log4jAppender > dependency, and add testImplementation dependency for `slf4jlog4j` lib. > However, we need this runtime dependency in tools module to output logs. > ([ref]([https://stackoverflow.com/a/21787813])) Adding this dependency back. > > Note: The {{slf4jlog4j}} lib was added in {{log4j-appender}} dependency. > Since it's removed, we need to explicitly declare it. > > Current output will be like this: > {code:java} > > bin/kafka-server-start.sh config/kraft/controller.properties > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > SLF4J: Defaulting to no-operation (NOP) logger implementation > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further > details.{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16924) No log output when running kafka
[ https://issues.apache.org/jira/browse/KAFKA-16924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-16924: -- Description: In https://github.com/apache/kafka/pull/12148 , we removed log4jAppender dependency, and add testImplementation dependency for `slf4jlog4j` lib. However, we need this runtime dependency in tools module to output logs. ([ref](https://stackoverflow.com/a/21787813)) Adding this dependency back. Current output will be like this: {code:java} > bin/kafka-server-start.sh config/kraft/controller.properties SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.{code} was: In [https://github.com/apache/kafka/pull/12148] , we removed log4jAppender dependency, and "Add {{compileOnly}} dependency from {{tools}} to {{log4j}} (same approach as {{{}core{}}})." . However, we need this runtime dependency in tools module to output logs. Adding this dependency back. Current output will be like this: {code:java} > bin/kafka-server-start.sh config/kraft/controller.properties SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.{code} > No log output when running kafka > - > > Key: KAFKA-16924 > URL: https://issues.apache.org/jira/browse/KAFKA-16924 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Fix For: 4.0.0 > > > In https://github.com/apache/kafka/pull/12148 , we removed log4jAppender > dependency, and add testImplementation dependency for `slf4jlog4j` lib. > However, we need this runtime dependency in tools module to output logs. > ([ref](https://stackoverflow.com/a/21787813)) Adding this dependency back. > > Current output will be like this: > {code:java} > > bin/kafka-server-start.sh config/kraft/controller.properties > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > SLF4J: Defaulting to no-operation (NOP) logger implementation > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further > details.{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16924: add slf4jlog4j dependey in tool [kafka]
showuon opened a new pull request, #16260: URL: https://github.com/apache/kafka/pull/16260 In https://github.com/apache/kafka/pull/12148 , we removed log4jAppender dependency, and add testImplementation dependency for `slf4jlog4j` lib. However, we need this runtime dependency in tools module to output logs. ([ref](https://stackoverflow.com/a/21787813)) Adding this dependency back. Current output will be like this: ``` > bin/kafka-server-start.sh config/kraft/controller.properties SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. ``` After this PR, it'll be normal: ``` > bin/kafka-server-start.sh config/kraft/controller.properties [2024-06-10 08:38:50,734] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) [2024-06-10 08:38:50,832] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util) ... ``` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16924) No log output when running kafka
Luke Chen created KAFKA-16924: - Summary: No log output when running kafka Key: KAFKA-16924 URL: https://issues.apache.org/jira/browse/KAFKA-16924 Project: Kafka Issue Type: Bug Reporter: Luke Chen Assignee: Luke Chen Fix For: 4.0.0 In [https://github.com/apache/kafka/pull/12148] , we removed log4jAppender dependency, and "Add {{compileOnly}} dependency from {{tools}} to {{log4j}} (same approach as {{{}core{}}})." . However, we need this runtime dependency in tools module to output logs. Adding this dependency back. Current output will be like this: {code:java} > bin/kafka-server-start.sh config/kraft/controller.properties SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16923) New Unit Test for stripDotPathComponents method
[ https://issues.apache.org/jira/browse/KAFKA-16923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853543#comment-17853543 ] Arnav Dadarya commented on KAFKA-16923: --- I have submitted a pull request ([https://github.com/apache/kafka/pull/16259/files)] can someone approve and merge. > New Unit Test for stripDotPathComponents method > --- > > Key: KAFKA-16923 > URL: https://issues.apache.org/jira/browse/KAFKA-16923 > Project: Kafka > Issue Type: Test >Reporter: Arnav Dadarya >Priority: Minor > Original Estimate: 0h > Remaining Estimate: 0h > > I have written 1 new unit test > Created new test: *testStripDotPathComponents* > * Tests the stripDotPathComponents found in CommandUtils > * Ensures that it works as expected > [https://github.com/apache/kafka/pull/16259/files] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16923) New Unit Test for stripDotPathComponents method
[ https://issues.apache.org/jira/browse/KAFKA-16923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnav Dadarya updated KAFKA-16923: -- Description: I have written 1 new unit test and modified another unit test. Created new test: *testStripDotPathComponents* * Tests the stripDotPathComponents found in CommandUtils * Ensures that it works as expected Modified test: * Created a separate string variable to avoid hardcoding the values for the test [https://github.com/apache/kafka/pull/16259/files] was: I have written 1 new unit test and modified another unit test. Created new test: *testStripDotPathComponents* * Tests the stripDotPathComponents found in CommandUtils * Ensures that it works as expected Modified test: * Created a separate string variable to avoid hardcoding the values for the test [https://github.com/apache/kafka/pull/16259] > New Unit Test for stripDotPathComponents method > --- > > Key: KAFKA-16923 > URL: https://issues.apache.org/jira/browse/KAFKA-16923 > Project: Kafka > Issue Type: Test >Reporter: Arnav Dadarya >Priority: Minor > Original Estimate: 0h > Remaining Estimate: 0h > > I have written 1 new unit test and modified another unit test. > Created new test: *testStripDotPathComponents* > * Tests the stripDotPathComponents found in CommandUtils > * Ensures that it works as expected > Modified test: > * Created a separate string variable to avoid hardcoding the values for the > test > > [https://github.com/apache/kafka/pull/16259/files] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16923) New Unit Test for stripDotPathComponents method
[ https://issues.apache.org/jira/browse/KAFKA-16923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnav Dadarya updated KAFKA-16923: -- Description: I have written 1 new unit test Created new test: *testStripDotPathComponents* * Tests the stripDotPathComponents found in CommandUtils * Ensures that it works as expected [https://github.com/apache/kafka/pull/16259/files] was: I have written 1 new unit test and modified another unit test. Created new test: *testStripDotPathComponents* * Tests the stripDotPathComponents found in CommandUtils * Ensures that it works as expected Modified test: * Created a separate string variable to avoid hardcoding the values for the test [https://github.com/apache/kafka/pull/16259/files] > New Unit Test for stripDotPathComponents method > --- > > Key: KAFKA-16923 > URL: https://issues.apache.org/jira/browse/KAFKA-16923 > Project: Kafka > Issue Type: Test >Reporter: Arnav Dadarya >Priority: Minor > Original Estimate: 0h > Remaining Estimate: 0h > > I have written 1 new unit test > Created new test: *testStripDotPathComponents* > * Tests the stripDotPathComponents found in CommandUtils > * Ensures that it works as expected > [https://github.com/apache/kafka/pull/16259/files] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] (KAFKA-16923) New Unit Test for stripDotPathComponents method
[ https://issues.apache.org/jira/browse/KAFKA-16923 ] Arnav Dadarya deleted comment on KAFKA-16923: --- was (Author: JIRAUSER303990): https://github.com/apache/kafka/pull/16258 > New Unit Test for stripDotPathComponents method > --- > > Key: KAFKA-16923 > URL: https://issues.apache.org/jira/browse/KAFKA-16923 > Project: Kafka > Issue Type: Test >Reporter: Arnav Dadarya >Priority: Minor > Original Estimate: 0h > Remaining Estimate: 0h > > I have written 1 new unit test and modified another unit test. > Created new test: *testStripDotPathComponents* > * Tests the stripDotPathComponents found in CommandUtils > * Ensures that it works as expected > Modified test: > * Created a separate string variable to avoid hardcoding the values for the > test > > [https://github.com/apache/kafka/pull/16259] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16923) New Unit Test for stripDotPathComponents method
[ https://issues.apache.org/jira/browse/KAFKA-16923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnav Dadarya updated KAFKA-16923: -- Description: I have written 1 new unit test and modified another unit test. Created new test: *testStripDotPathComponents* * Tests the stripDotPathComponents found in CommandUtils * Ensures that it works as expected Modified test: * Created a separate string variable to avoid hardcoding the values for the test [https://github.com/apache/kafka/pull/16259] was: I have written 1 new unit test and modified another unit test. Created new test: *testStripDotPathComponents* * Tests the stripDotPathComponents found in CommandUtils * Ensures that it works as expected Modified test: * Created a separate string variable to avoid hardcoding the values for the test https://github.com/apache/kafka/pull/16258 > New Unit Test for stripDotPathComponents method > --- > > Key: KAFKA-16923 > URL: https://issues.apache.org/jira/browse/KAFKA-16923 > Project: Kafka > Issue Type: Test >Reporter: Arnav Dadarya >Priority: Minor > Original Estimate: 0h > Remaining Estimate: 0h > > I have written 1 new unit test and modified another unit test. > Created new test: *testStripDotPathComponents* > * Tests the stripDotPathComponents found in CommandUtils > * Ensures that it works as expected > Modified test: > * Created a separate string variable to avoid hardcoding the values for the > test > > [https://github.com/apache/kafka/pull/16259] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16923) New Unit Test for stripDotPathComponents method
[ https://issues.apache.org/jira/browse/KAFKA-16923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnav Dadarya updated KAFKA-16923: -- External issue URL: https://github.com/apache/kafka/pull/16259 (was: https://github.com/apache/kafka/pull/16258) > New Unit Test for stripDotPathComponents method > --- > > Key: KAFKA-16923 > URL: https://issues.apache.org/jira/browse/KAFKA-16923 > Project: Kafka > Issue Type: Test >Reporter: Arnav Dadarya >Priority: Minor > Original Estimate: 0h > Remaining Estimate: 0h > > I have written 1 new unit test and modified another unit test. > Created new test: *testStripDotPathComponents* > * Tests the stripDotPathComponents found in CommandUtils > * Ensures that it works as expected > Modified test: > * Created a separate string variable to avoid hardcoding the values for the > test > > [https://github.com/apache/kafka/pull/16259] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Unit Test for stripDotPathComponents method in shell [kafka]
ardada2468 commented on PR #16259: URL: https://github.com/apache/kafka/pull/16259#issuecomment-2156909921 only shell/src/test/java/org/apache/kafka/shell/command/CommandUtilsTest.java file has been changed to incorporate 1 more unit test -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Unit Test for stripDotPathComponents method in shell [kafka]
ardada2468 opened a new pull request, #16259: URL: https://github.com/apache/kafka/pull/16259 I have written 1 new unit test and modified another unit test. Created new test: **testStripDotPathComponents** - Tests the stripDotPathComponents found in CommandUtils - Ensures that the method works as expected (in case of future changes to the method) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Created a new unit test for stripDotPathComponents method and updated… [kafka]
ardada2468 closed pull request #16258: Created a new unit test for stripDotPathComponents method and updated… URL: https://github.com/apache/kafka/pull/16258 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16923) New Unit Test for stripDotPathComponents method
Arnav Dadarya created KAFKA-16923: - Summary: New Unit Test for stripDotPathComponents method Key: KAFKA-16923 URL: https://issues.apache.org/jira/browse/KAFKA-16923 Project: Kafka Issue Type: Test Reporter: Arnav Dadarya I have written 1 new unit test and modified another unit test. Created new test: *testStripDotPathComponents* * Tests the stripDotPathComponents found in CommandUtils * Ensures that it works as expected Modified test: * Created a separate string variable to avoid hardcoding the values for the test https://github.com/apache/kafka/pull/16258 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] Created a new unit test for stripDotPathComponents method and updated… [kafka]
ardada2468 opened a new pull request, #16258: URL: https://github.com/apache/kafka/pull/16258 I have written 1 new unit test and modified another unit test. Created new test: **testStripDotPathComponents** - Tests the stripDotPathComponents found in CommandUtils - Ensures that it works as expected Modified test: - Created a separate string variable to avoid hardcoding the values for the test -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move configDef out of core [kafka]
chia7712 commented on code in PR #16116: URL: https://github.com/apache/kafka/pull/16116#discussion_r1632396978 ## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.config; + +import org.apache.kafka.common.compress.GzipCompression; +import org.apache.kafka.common.compress.Lz4Compression; +import org.apache.kafka.common.compress.ZstdCompression; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SecurityConfig; +import org.apache.kafka.common.config.SslClientAuth; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy; +import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; +import org.apache.kafka.coordinator.transaction.TransactionLogConfigs; +import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs; +import org.apache.kafka.network.SocketServerConfigs; +import org.apache.kafka.raft.QuorumConfig; +import org.apache.kafka.security.PasswordEncoderConfigs; +import org.apache.kafka.server.common.MetadataVersionValidator; +import org.apache.kafka.server.metrics.MetricConfigs; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.storage.internals.log.CleanerConfig; +import org.apache.kafka.storage.internals.log.LogConfig; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; +import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; +import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import static org.apache.kafka.common.config.ConfigDef.Range.between; +import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN; +import static org.apache.kafka.common.config.ConfigDef.Type.CLASS; +import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE; +import static org.apache.kafka.common.config.ConfigDef.Type.INT; +import static org.apache.kafka.common.config.ConfigDef.Type.LIST; +import static org.apache.kafka.common.config.ConfigDef.Type.LONG; +import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD; +import static org.apache.kafka.common.config.ConfigDef.Type.SHORT; +import static org.apache.kafka.common.config.ConfigDef.Type.STRING; + +public class KafkaConfig { Review Comment: > I was planning to move the rest of KafkaConfig to it and make this class an abstract in the next PR and KafkaConfig.scala will extend it for now until I move everything out not sure whether I catch your point. it seems we will have a java version KafkaConfig but it is smaller? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move configDef out of core [kafka]
OmniaGM commented on code in PR #16116: URL: https://github.com/apache/kafka/pull/16116#discussion_r1632391480 ## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.config; + +import org.apache.kafka.common.compress.GzipCompression; +import org.apache.kafka.common.compress.Lz4Compression; +import org.apache.kafka.common.compress.ZstdCompression; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SecurityConfig; +import org.apache.kafka.common.config.SslClientAuth; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy; +import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; +import org.apache.kafka.coordinator.transaction.TransactionLogConfigs; +import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs; +import org.apache.kafka.network.SocketServerConfigs; +import org.apache.kafka.raft.QuorumConfig; +import org.apache.kafka.security.PasswordEncoderConfigs; +import org.apache.kafka.server.common.MetadataVersionValidator; +import org.apache.kafka.server.metrics.MetricConfigs; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.storage.internals.log.CleanerConfig; +import org.apache.kafka.storage.internals.log.LogConfig; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; +import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; +import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import static org.apache.kafka.common.config.ConfigDef.Range.between; +import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN; +import static org.apache.kafka.common.config.ConfigDef.Type.CLASS; +import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE; +import static org.apache.kafka.common.config.ConfigDef.Type.INT; +import static org.apache.kafka.common.config.ConfigDef.Type.LIST; +import static org.apache.kafka.common.config.ConfigDef.Type.LONG; +import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD; +import static org.apache.kafka.common.config.ConfigDef.Type.SHORT; +import static org.apache.kafka.common.config.ConfigDef.Type.STRING; + +public class KafkaConfig { Review Comment: I was planning to move the rest of KafkaConfig to it and make this class an abstract in the next PR and KafkaConfig.scala will extend it for now until I move everything out -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-16855) KRaft - Wire replaying a TopicRecord
[ https://issues.apache.org/jira/browse/KAFKA-16855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853535#comment-17853535 ] Muralidhar Basani edited comment on KAFKA-16855 at 6/9/24 8:30 PM: --- [~christo_lolov] I tried to work on this. Basically I see the changes in 2 parts. * Adding of new fields RemoteLogSegmentMetadata & TopicRecord * Wiring them with new thread pools after ticket [16853|https://issues.apache.org/jira/browse/KAFKA-16853] is done I have created first part PR for this. What do you think ? was (Author: muralibasani): [~christo_lolov] I tried to work on this. Basically I see the changes in 2 parts. * Adding of new fields RemoteLogSegmentMetadata & TopicRecord * Wiring them with new thread pools after ticket 16853 is done I have created first part PR for this. What do you think ? > KRaft - Wire replaying a TopicRecord > > > Key: KAFKA-16855 > URL: https://issues.apache.org/jira/browse/KAFKA-16855 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Priority: Major > > *Summary* > Replaying a TopicRecord containing a new TieredEpoch and TieredState needs to > interact with the two thread pools in the RemoteLogManager to add/remove the > correct tasks from each -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16855) KRaft - Wire replaying a TopicRecord
[ https://issues.apache.org/jira/browse/KAFKA-16855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853535#comment-17853535 ] Muralidhar Basani commented on KAFKA-16855: --- [~christo_lolov] I tried to work on this. Basically I see the changes in 2 parts. * Adding of new fields RemoteLogSegmentMetadata & TopicRecord * Wiring them with new thread pools after ticket 16853 is done I have created first part PR for this. What do you think ? > KRaft - Wire replaying a TopicRecord > > > Key: KAFKA-16855 > URL: https://issues.apache.org/jira/browse/KAFKA-16855 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Priority: Major > > *Summary* > Replaying a TopicRecord containing a new TieredEpoch and TieredState needs to > interact with the two thread pools in the RemoteLogManager to add/remove the > correct tasks from each -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16855 : Part 1 - New fields tieredEpoch and tieredState [kafka]
muralibasani opened a new pull request, #16257: URL: https://github.com/apache/kafka/pull/16257 Resolves : https://issues.apache.org/jira/browse/KAFKA-16855 - Add field tieredEpoch to RemoteLogSegmentMetadata - Update relevant tests - Add two fields tieredEpoch and tieredState to TopicRecord.json ### Committer Checklist (excluded from commit message) - [X] Verify design and implementation - [X] Verify test coverage and CI build status - [X] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR:Topic command integration test migrate to new test infra [kafka]
chia7712 commented on code in PR #16127: URL: https://github.com/apache/kafka/pull/16127#discussion_r1632376721 ## core/src/test/java/kafka/testkit/KafkaClusterTestKit.java: ## @@ -188,12 +188,13 @@ private KafkaConfig createNodeConfig(TestKitNode node) { controllerNode.metadataDirectory()); } props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, -"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"); + "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); Review Comment: that is good. If you feel this PR needs some helpers, we can have another PR to address that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16407: Fix foreign key INNER join on change of FK from/to a null value [kafka]
jeremyvdw commented on PR #15615: URL: https://github.com/apache/kafka/pull/15615#issuecomment-2156746613 @mjsax According to the [release plan](https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+3.8.0) code freeze nis on 12th June: any chance this one and https://github.com/apache/kafka/pull/15607 will be merged for 3.8.0? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16922 : Adding unit tests for NewTopic [kafka]
muralibasani commented on code in PR #16255: URL: https://github.com/apache/kafka/pull/16255#discussion_r1632371896 ## clients/src/test/java/org/apache/kafka/clients/admin/NewTopicTest.java: ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.kafka.common.requests.CreateTopicsRequest.NO_NUM_PARTITIONS; +import static org.apache.kafka.common.requests.CreateTopicsRequest.NO_REPLICATION_FACTOR; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + + +public class NewTopicTest { + +public static final String TEST_TOPIC = "testtopic"; +public static final int NUM_PARTITIONS = 3; +public static final int REPLICATION_FACTOR = 1; Review Comment: Ah yes, updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16922 : Adding unit tests for NewTopic [kafka]
muralibasani commented on code in PR #16255: URL: https://github.com/apache/kafka/pull/16255#discussion_r1632371830 ## clients/src/test/java/org/apache/kafka/clients/admin/NewTopicTest.java: ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.kafka.common.requests.CreateTopicsRequest.NO_NUM_PARTITIONS; +import static org.apache.kafka.common.requests.CreateTopicsRequest.NO_REPLICATION_FACTOR; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + + +public class NewTopicTest { + +public static final String TEST_TOPIC = "testtopic"; +public static final int NUM_PARTITIONS = 3; +public static final int REPLICATION_FACTOR = 1; +public static final String CLEANUP_POLICY_CONFIG_KEY = "cleanup.policy"; +public static final String CLEANUP_POLICY_CONFIG_VALUE = "compact"; +public static final List BROKER_IDS = Arrays.asList(1, 2); + +@Test +public void testConstructorWithPartitionsAndReplicationFactor() { +NewTopic topic = new NewTopic(TEST_TOPIC, NUM_PARTITIONS, (short) REPLICATION_FACTOR); +assertEquals(TEST_TOPIC, topic.name()); +assertEquals(NUM_PARTITIONS, topic.numPartitions()); +assertEquals(REPLICATION_FACTOR, topic.replicationFactor()); +assertNull(topic.replicasAssignments()); +} + +@Test +public void testConstructorWithOptionalValues() { +Optional numPartitions = Optional.empty(); +Optional replicationFactor = Optional.empty(); +NewTopic topic = new NewTopic(TEST_TOPIC, numPartitions, replicationFactor); +assertEquals(TEST_TOPIC, topic.name()); +assertEquals(NO_NUM_PARTITIONS, topic.numPartitions()); +assertEquals(NO_REPLICATION_FACTOR, topic.replicationFactor()); +assertNull(topic.replicasAssignments()); +} + +@Test +public void testConstructorWithReplicasAssignments() { +Map> replicasAssignments = new HashMap<>(); +replicasAssignments.put(0, BROKER_IDS); +NewTopic newTopic = new NewTopic(TEST_TOPIC, replicasAssignments); +assertEquals(TEST_TOPIC, newTopic.name()); +assertEquals(NO_NUM_PARTITIONS, newTopic.numPartitions()); +assertEquals(NO_REPLICATION_FACTOR, newTopic.replicationFactor()); +assertEquals(replicasAssignments, newTopic.replicasAssignments()); +} + +@Test +public void testConfigs() { +NewTopic newTopic = new NewTopic(TEST_TOPIC, NUM_PARTITIONS, (short) REPLICATION_FACTOR); +Map configs = new HashMap<>(); Review Comment: Indeed, added one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16922 : Adding unit tests for NewTopic [kafka]
muralibasani commented on code in PR #16255: URL: https://github.com/apache/kafka/pull/16255#discussion_r1632371782 ## clients/src/test/java/org/apache/kafka/clients/admin/NewTopicTest.java: ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.kafka.common.requests.CreateTopicsRequest.NO_NUM_PARTITIONS; +import static org.apache.kafka.common.requests.CreateTopicsRequest.NO_REPLICATION_FACTOR; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + + +public class NewTopicTest { + +public static final String TEST_TOPIC = "testtopic"; +public static final int NUM_PARTITIONS = 3; +public static final int REPLICATION_FACTOR = 1; +public static final String CLEANUP_POLICY_CONFIG_KEY = "cleanup.policy"; +public static final String CLEANUP_POLICY_CONFIG_VALUE = "compact"; +public static final List BROKER_IDS = Arrays.asList(1, 2); + +@Test +public void testConstructorWithPartitionsAndReplicationFactor() { +NewTopic topic = new NewTopic(TEST_TOPIC, NUM_PARTITIONS, (short) REPLICATION_FACTOR); +assertEquals(TEST_TOPIC, topic.name()); +assertEquals(NUM_PARTITIONS, topic.numPartitions()); +assertEquals(REPLICATION_FACTOR, topic.replicationFactor()); +assertNull(topic.replicasAssignments()); +} + +@Test +public void testConstructorWithOptionalValues() { +Optional numPartitions = Optional.empty(); +Optional replicationFactor = Optional.empty(); +NewTopic topic = new NewTopic(TEST_TOPIC, numPartitions, replicationFactor); +assertEquals(TEST_TOPIC, topic.name()); +assertEquals(NO_NUM_PARTITIONS, topic.numPartitions()); +assertEquals(NO_REPLICATION_FACTOR, topic.replicationFactor()); +assertNull(topic.replicasAssignments()); +} + +@Test +public void testConstructorWithReplicasAssignments() { +Map> replicasAssignments = new HashMap<>(); +replicasAssignments.put(0, BROKER_IDS); +NewTopic newTopic = new NewTopic(TEST_TOPIC, replicasAssignments); +assertEquals(TEST_TOPIC, newTopic.name()); +assertEquals(NO_NUM_PARTITIONS, newTopic.numPartitions()); +assertEquals(NO_REPLICATION_FACTOR, newTopic.replicationFactor()); +assertEquals(replicasAssignments, newTopic.replicasAssignments()); +} + +@Test +public void testConfigs() { +NewTopic newTopic = new NewTopic(TEST_TOPIC, NUM_PARTITIONS, (short) REPLICATION_FACTOR); +Map configs = new HashMap<>(); +configs.put(CLEANUP_POLICY_CONFIG_KEY, CLEANUP_POLICY_CONFIG_VALUE); +newTopic.configs(configs); +assertEquals(configs, newTopic.configs()); +} + +@Test +public void testUnmodifiableReplicasAssignments() { +Map> replicasAssignments = new HashMap<>(); +replicasAssignments.put(0, BROKER_IDS); +NewTopic newTopic = new NewTopic(TEST_TOPIC, replicasAssignments); +Map> returnedAssignments = newTopic.replicasAssignments(); + +assertThrows(UnsupportedOperationException.class, () -> { +returnedAssignments.put(1, Arrays.asList(3, 4)); +}); +} + +@Test +public void testConvertToCreatableTopic() { +int partitionIndex = 0; +Map> replicasAssignments = new HashMap<>(); +replicasAssignments.put(partitionIndex, BROKER_IDS); +NewTopic topic = new NewTopic(TEST_TOPIC, replicasAssignments); +Map configs = new HashMap<>(); +configs.put(CLEANUP_POLICY_CONFIG_KEY, CLEANUP_POLICY_CONFIG_VALUE); +topic.configs(configs); + +CreateTopicsRequestData.CreatableTopic
Re: [PR] MINOR:Topic command integration test migrate to new test infra [kafka]
TaiJuWu commented on code in PR #16127: URL: https://github.com/apache/kafka/pull/16127#discussion_r1632369365 ## core/src/test/java/kafka/testkit/KafkaClusterTestKit.java: ## @@ -188,12 +188,13 @@ private KafkaConfig createNodeConfig(TestKitNode node) { controllerNode.metadataDirectory()); } props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, -"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"); + "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); Review Comment: In this PR, I try to minimal this patch so I don't touch any helper function. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16890: Compute valid log-start-offset when deleting overlapping remote segments [kafka]
dopuskh3 commented on code in PR #16237: URL: https://github.com/apache/kafka/pull/16237#discussion_r1632367056 ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -2038,6 +2038,69 @@ public void testDeletionOnRetentionBreachedSegments(long retentionSize, assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count()); } +@ParameterizedTest(name = "testDeletionOnOverlappingRetentionBreachedSegments retentionSize={0} retentionMs={1}") +@CsvSource(value = {"0, -1", "-1, 0"}) +public void testDeletionOnOverlappingRetentionBreachedSegments(long retentionSize, + long retentionMs) +throws RemoteStorageException, ExecutionException, InterruptedException { +Map logProps = new HashMap<>(); +logProps.put("retention.bytes", retentionSize); +logProps.put("retention.ms", retentionMs); +LogConfig mockLogConfig = new LogConfig(logProps); +when(mockLog.config()).thenReturn(mockLogConfig); + +List epochEntries = Collections.singletonList(epochEntry0); +checkpoint.write(epochEntries); +LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler); Review Comment: `scheduler` is not defined here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16890: Compute valid log-start-offset when deleting overlapping remote segments [kafka]
dopuskh3 commented on code in PR #16237: URL: https://github.com/apache/kafka/pull/16237#discussion_r1632367056 ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -2038,6 +2038,69 @@ public void testDeletionOnRetentionBreachedSegments(long retentionSize, assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count()); } +@ParameterizedTest(name = "testDeletionOnOverlappingRetentionBreachedSegments retentionSize={0} retentionMs={1}") +@CsvSource(value = {"0, -1", "-1, 0"}) +public void testDeletionOnOverlappingRetentionBreachedSegments(long retentionSize, + long retentionMs) +throws RemoteStorageException, ExecutionException, InterruptedException { +Map logProps = new HashMap<>(); +logProps.put("retention.bytes", retentionSize); +logProps.put("retention.ms", retentionMs); +LogConfig mockLogConfig = new LogConfig(logProps); +when(mockLog.config()).thenReturn(mockLogConfig); + +List epochEntries = Collections.singletonList(epochEntry0); +checkpoint.write(epochEntries); +LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler); Review Comment: `scheduler` is not defined here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16922 : Adding unit tests for NewTopic [kafka]
chia7712 commented on code in PR #16255: URL: https://github.com/apache/kafka/pull/16255#discussion_r1632366297 ## clients/src/test/java/org/apache/kafka/clients/admin/NewTopicTest.java: ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.kafka.common.requests.CreateTopicsRequest.NO_NUM_PARTITIONS; +import static org.apache.kafka.common.requests.CreateTopicsRequest.NO_REPLICATION_FACTOR; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + + +public class NewTopicTest { + +public static final String TEST_TOPIC = "testtopic"; +public static final int NUM_PARTITIONS = 3; +public static final int REPLICATION_FACTOR = 1; +public static final String CLEANUP_POLICY_CONFIG_KEY = "cleanup.policy"; +public static final String CLEANUP_POLICY_CONFIG_VALUE = "compact"; +public static final List BROKER_IDS = Arrays.asList(1, 2); + +@Test +public void testConstructorWithPartitionsAndReplicationFactor() { +NewTopic topic = new NewTopic(TEST_TOPIC, NUM_PARTITIONS, (short) REPLICATION_FACTOR); +assertEquals(TEST_TOPIC, topic.name()); +assertEquals(NUM_PARTITIONS, topic.numPartitions()); +assertEquals(REPLICATION_FACTOR, topic.replicationFactor()); +assertNull(topic.replicasAssignments()); +} + +@Test +public void testConstructorWithOptionalValues() { +Optional numPartitions = Optional.empty(); +Optional replicationFactor = Optional.empty(); +NewTopic topic = new NewTopic(TEST_TOPIC, numPartitions, replicationFactor); +assertEquals(TEST_TOPIC, topic.name()); +assertEquals(NO_NUM_PARTITIONS, topic.numPartitions()); +assertEquals(NO_REPLICATION_FACTOR, topic.replicationFactor()); +assertNull(topic.replicasAssignments()); +} + +@Test +public void testConstructorWithReplicasAssignments() { +Map> replicasAssignments = new HashMap<>(); +replicasAssignments.put(0, BROKER_IDS); +NewTopic newTopic = new NewTopic(TEST_TOPIC, replicasAssignments); +assertEquals(TEST_TOPIC, newTopic.name()); +assertEquals(NO_NUM_PARTITIONS, newTopic.numPartitions()); +assertEquals(NO_REPLICATION_FACTOR, newTopic.replicationFactor()); +assertEquals(replicasAssignments, newTopic.replicasAssignments()); +} + +@Test +public void testConfigs() { +NewTopic newTopic = new NewTopic(TEST_TOPIC, NUM_PARTITIONS, (short) REPLICATION_FACTOR); +Map configs = new HashMap<>(); Review Comment: Could you verify that `newTopic.configs()` will return null if it has no configs ## clients/src/test/java/org/apache/kafka/clients/admin/NewTopicTest.java: ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import
Re: [PR] KAFKA-16897: Move OffsetIndexTest and OffsetMapTest to storage module [kafka]
chia7712 commented on code in PR #16244: URL: https://github.com/apache/kafka/pull/16244#discussion_r1632364409 ## storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java: ## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.errors.InvalidOffsetException; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.TreeMap; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class OffsetIndexTest { + +private OffsetIndex index; +private static final long BASE_OFFSET = 45L; + +@BeforeEach +public void setup() throws IOException { +index = new OffsetIndex(nonExistentTempFile(), BASE_OFFSET, 30 * 8); +} + +@AfterEach +public void tearDown() throws IOException { +if (index != null) { +Files.deleteIfExists(index.file().toPath()); +} +this.index.close(); Review Comment: you should call close before deleting the index file -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16885) Consider renaming RemoteLogManagerConfig#enableRemoteStorageSystem to RemoteLogManagerConfig#isRemoteStorageSystemEnabled
[ https://issues.apache.org/jira/browse/KAFKA-16885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16885. Fix Version/s: 3.9.0 Resolution: Fixed > Consider renaming RemoteLogManagerConfig#enableRemoteStorageSystem to > RemoteLogManagerConfig#isRemoteStorageSystemEnabled > - > > Key: KAFKA-16885 > URL: https://issues.apache.org/jira/browse/KAFKA-16885 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia Chuan Yu >Priority: Major > Fix For: 3.9.0 > > > see the discussion: > https://github.com/apache/kafka/pull/16153#issuecomment-2144269279 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16885 Renamed the enableRemoteStorageSystem to isRemoteStorageSystemEnabled [kafka]
chia7712 merged PR #16256: URL: https://github.com/apache/kafka/pull/16256 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16917: Align the returned Map type of KafkaAdminClient [kafka]
chia7712 commented on code in PR #16250: URL: https://github.com/apache/kafka/pull/16250#discussion_r1632361233 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2760,9 +2760,12 @@ void handleFailure(Throwable throwable) { }, now); } -return new DescribeConfigsResult(new HashMap<>(nodeFutures.entrySet().stream() -.flatMap(x -> x.getValue().entrySet().stream()) -.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue; +Map> resourceToConfigFuture = nodeFutures.entrySet() Review Comment: why we need this temporary variable? the following style works on my local. ```java return new DescribeConfigsResult(nodeFutures.entrySet() .stream() .flatMap(x -> x.getValue().entrySet().stream()) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16921: Migrate all junit 4 code to junit 5 for connect module (part 1) [kafka]
chia7712 commented on PR #16253: URL: https://github.com/apache/kafka/pull/16253#issuecomment-2156719721 @m1a2st Could you please split this PR into small PRs (similar to #16253)? It is hard to complete the migration (and review) at once. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move configDef out of core [kafka]
chia7712 commented on code in PR #16116: URL: https://github.com/apache/kafka/pull/16116#discussion_r1632360489 ## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.config; + +import org.apache.kafka.common.compress.GzipCompression; +import org.apache.kafka.common.compress.Lz4Compression; +import org.apache.kafka.common.compress.ZstdCompression; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SecurityConfig; +import org.apache.kafka.common.config.SslClientAuth; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy; +import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; +import org.apache.kafka.coordinator.transaction.TransactionLogConfigs; +import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs; +import org.apache.kafka.network.SocketServerConfigs; +import org.apache.kafka.raft.QuorumConfig; +import org.apache.kafka.security.PasswordEncoderConfigs; +import org.apache.kafka.server.common.MetadataVersionValidator; +import org.apache.kafka.server.metrics.MetricConfigs; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.storage.internals.log.CleanerConfig; +import org.apache.kafka.storage.internals.log.LogConfig; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; +import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; +import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import static org.apache.kafka.common.config.ConfigDef.Range.between; +import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN; +import static org.apache.kafka.common.config.ConfigDef.Type.CLASS; +import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE; +import static org.apache.kafka.common.config.ConfigDef.Type.INT; +import static org.apache.kafka.common.config.ConfigDef.Type.LIST; +import static org.apache.kafka.common.config.ConfigDef.Type.LONG; +import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD; +import static org.apache.kafka.common.config.ConfigDef.Type.SHORT; +import static org.apache.kafka.common.config.ConfigDef.Type.STRING; + +public class KafkaConfig { +@SuppressWarnings("deprecation") +public final static ConfigDef CONFIG_DEF = new ConfigDef() Review Comment: Could it include `RemoteLogManagerConfig.configDef()`? ## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.config; + +import org.apache.kafka.common.compress.GzipCompression; +import org.apache.kafka.common.compress.Lz4Compression;
Re: [PR] KAFKA-15853: Move configDef out of core [kafka]
chia7712 commented on code in PR #16116: URL: https://github.com/apache/kafka/pull/16116#discussion_r1632358879 ## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.config; + +import org.apache.kafka.common.compress.GzipCompression; +import org.apache.kafka.common.compress.Lz4Compression; +import org.apache.kafka.common.compress.ZstdCompression; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SecurityConfig; +import org.apache.kafka.common.config.SslClientAuth; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy; +import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; +import org.apache.kafka.coordinator.transaction.TransactionLogConfigs; +import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs; +import org.apache.kafka.network.SocketServerConfigs; +import org.apache.kafka.raft.QuorumConfig; +import org.apache.kafka.security.PasswordEncoderConfigs; +import org.apache.kafka.server.common.MetadataVersionValidator; +import org.apache.kafka.server.metrics.MetricConfigs; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.storage.internals.log.CleanerConfig; +import org.apache.kafka.storage.internals.log.LogConfig; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; +import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; +import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import static org.apache.kafka.common.config.ConfigDef.Range.between; +import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN; +import static org.apache.kafka.common.config.ConfigDef.Type.CLASS; +import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE; +import static org.apache.kafka.common.config.ConfigDef.Type.INT; +import static org.apache.kafka.common.config.ConfigDef.Type.LIST; +import static org.apache.kafka.common.config.ConfigDef.Type.LONG; +import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD; +import static org.apache.kafka.common.config.ConfigDef.Type.SHORT; +import static org.apache.kafka.common.config.ConfigDef.Type.STRING; + +public class KafkaConfig { Review Comment: you are right. my previous idea is over-engineering :_ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-12708 Rewrite org.apache.kafka.test.Microbenchmarks by JMH [kafka]
chia7712 commented on code in PR #16231: URL: https://github.com/apache/kafka/pull/16231#discussion_r1632358803 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java: ## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.util; + +import org.apache.kafka.common.utils.CopyOnWriteMap; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 15) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Threads(2) +public class ConcurrentMapBenchmark { +private static final int TIMES = 1000_000; + +@Param({"100"}) +private int mapSize; + +@Param({"0.1"}) +private double writePercentage; + +private Map concurrentHashMap; +private Map copyOnWriteMap; +private int writePerLoops; + +@Setup +public void setup() { +Map mapTemplate = IntStream.range(0, mapSize).boxed() +.collect(Collectors.toMap(i -> i, i -> i)); +concurrentHashMap = new ConcurrentHashMap<>(mapTemplate); +copyOnWriteMap = new CopyOnWriteMap<>(mapTemplate); +writePerLoops = TIMES / (int) Math.round(writePercentage * TIMES); +} + +@Benchmark +@OperationsPerInvocation(TIMES) +public void testConcurrentHashMapGet(Blackhole blackhole) { +for (int i = 0; i < TIMES; i++) { +if (i % writePerLoops == 0) { +// add offset mapSize to ensure computeIfAbsent do add new entry +concurrentHashMap.computeIfAbsent(i + mapSize, key -> key); +} else { +blackhole.consume(concurrentHashMap.get(i % mapSize)); +} +} +} + +@Benchmark +@OperationsPerInvocation(TIMES) +public void testConcurrentHashMapGetRandom(Blackhole blackhole) { +for (int i = 0; i < TIMES; i++) { +if (i % writePerLoops == 0) { +// add offset mapSize to ensure computeIfAbsent do add new entry +concurrentHashMap.computeIfAbsent(i + mapSize, key -> key); +} else { + blackhole.consume(concurrentHashMap.get(ThreadLocalRandom.current().nextInt(0, mapSize + 1))); +} +} +} + +@Benchmark +@OperationsPerInvocation(TIMES) +public void testCopyOnWriteMapGet(Blackhole blackhole) { +for (int i = 0; i < TIMES; i++) { +if (i % writePerLoops == 0) { +// add offset mapSize to ensure computeIfAbsent do add new entry +copyOnWriteMap.computeIfAbsent(i + mapSize, key -> key); +} else { +blackhole.consume(copyOnWriteMap.get(i % mapSize)); +} +} +} + +@Benchmark +@OperationsPerInvocation(TIMES) +public void testCopyOnWriteMapGetRandom(Blackhole blackhole) { +for (int i = 0; i < TIMES; i++) { +if (i % writePerLoops == 0) { +// add offset mapSize to ensure computeIfAbsent do add new entry +copyOnWriteMap.computeIfAbsent(i + mapSize, key -> key); +} else { + blackhole.consume(copyOnWriteMap.get(ThreadLocalRandom.current().nextInt(0, mapSize + 1))); +} +} +} + +@Benchmark +@OperationsPerInvocation(TIMES) +
Re: [PR] MINOR:Topic command integration test migrate to new test infra [kafka]
chia7712 commented on code in PR #16127: URL: https://github.com/apache/kafka/pull/16127#discussion_r1632358372 ## core/src/test/java/kafka/testkit/KafkaClusterTestKit.java: ## @@ -353,6 +354,9 @@ static private void setupNodeDirectories(File baseDirectory, private final TestKitNodes nodes; private final Map controllers; private final Map brokers; + +private final Map aliveBrokers; Review Comment: > if a broker already shutdown, how can we access the server and check its status? both zk and kraft broker has in-memory state, so maybe you can leverage them? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR:Topic command integration test migrate to new test infra [kafka]
chia7712 commented on code in PR #16127: URL: https://github.com/apache/kafka/pull/16127#discussion_r1632358211 ## core/src/test/java/kafka/testkit/KafkaClusterTestKit.java: ## @@ -188,12 +188,13 @@ private KafkaConfig createNodeConfig(TestKitNode node) { controllerNode.metadataDirectory()); } props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, -"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"); + "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); Review Comment: not sure why we need to call that test helper? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
chia7712 commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1632355536 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -31,32 +31,129 @@ import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + public class ConsumerRecordsTest { @Test public void iterator() throws Exception { Review Comment: could you please rename it to `testIterator`? ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -31,32 +31,129 @@ import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + public class ConsumerRecordsTest { @Test public void iterator() throws Exception { +String topic = "topic"; +int recordSize = 10; +int partitionSize = 15; +int emptyPartitionIndex = 3; +ConsumerRecords records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, Collections.singleton(topic)); +Iterator> iterator = records.iterator(); -Map>> records = new LinkedHashMap<>(); +int recordCount = 0; +int partitionCount = 0; +int currentPartition = -1; + +while (iterator.hasNext()) { +ConsumerRecord record = iterator.next(); + +if (record.partition() == emptyPartitionIndex) { +fail("Partition " + emptyPartitionIndex + " is not empty"); +} + +// Check if we have moved to a new partition +if (currentPartition != record.partition()) { +// Increment the partition count as we have encountered a new partition +partitionCount++; +// Update the current partition to the new partition +currentPartition = record.partition(); +} -String topic = "topic"; -records.put(new TopicPartition(topic, 0), new ArrayList<>()); -ConsumerRecord record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, -0, 0, 1, "value1", new RecordHeaders(), Optional.empty()); -ConsumerRecord record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, -0, 0, 2, "value2", new RecordHeaders(), Optional.empty()); -records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2)); -records.put(new TopicPartition(topic, 2), new ArrayList<>()); - -ConsumerRecords consumerRecords = new ConsumerRecords<>(records); -Iterator> iter = consumerRecords.iterator(); - -int c = 0; -for (; iter.hasNext(); c++) { -ConsumerRecord record = iter.next(); -assertEquals(1, record.partition()); assertEquals(topic, record.topic()); -assertEquals(c, record.offset()); +assertEquals(currentPartition, record.partition()); +assertEquals(recordCount % recordSize, record.offset()); +assertEquals(recordCount % recordSize, record.key()); +assertEquals(String.valueOf(recordCount % recordSize), record.value()); + +recordCount++; } -assertEquals(2, c); + +// Including empty partition +assertEquals(partitionSize, partitionCount + 1); +} + +@Test +public void testRecordsWithNullTopic() { +String nullTopic = null; +ConsumerRecords consumerRecords = ConsumerRecords.empty(); +IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); +assertEquals("Topic must be non-null.", exception.getMessage()); +} + + +@Test +public void testRecords() { Review Comment: Could you add test for `records(TopicPartition)`? ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -31,32 +31,129 @@ import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + public class ConsumerRecordsTest { @Test public void iterator() throws Exception { +String topic = "topic"; +int recordSize = 10; +int partitionSize = 15; +int emptyPartitionIndex = 3; +ConsumerRecords records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex,
Re: [PR] KAFKA-16895: fix off-by-one bug in RemoteCopyLagSegments [kafka]
dopuskh3 commented on PR #16210: URL: https://github.com/apache/kafka/pull/16210#issuecomment-2156701940 @kamalcph updated test case. PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16919) Flaky test testNoCheckpointsIfNoRecordsAreMirrored() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest
[ https://issues.apache.org/jira/browse/KAFKA-16919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853511#comment-17853511 ] Muralidhar Basani commented on KAFKA-16919: --- [~soarez] I suspect there could be resource constraints for back up cluster, after the primary cluster is started. We can try increasing the timeout for starting backup workers or we can start both the clusters in parallel. > Flaky test testNoCheckpointsIfNoRecordsAreMirrored() – > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest > -- > > Key: KAFKA-16919 > URL: https://issues.apache.org/jira/browse/KAFKA-16919 > Project: Kafka > Issue Type: Test >Reporter: Igor Soarez >Priority: Minor > > Source > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-16251/1/tests] > > {code:java} > java.lang.AssertionError: Workers of backup-connect-cluster did not start in > time. > at > org.apache.kafka.connect.util.clusters.ConnectAssertions.assertAtLeastNumWorkersAreUp(ConnectAssertions.java:74) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:230) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:150) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.startClusters(MirrorConnectorsIntegrationSSLTest.java:64) > at > java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) > at java.base/java.lang.reflect.Method.invoke(Method.java:580) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16895: fix off-by-one bug in RemoteCopyLagSegments [kafka]
dopuskh3 commented on PR #16210: URL: https://github.com/apache/kafka/pull/16210#issuecomment-2156696910 > @dopuskh3 > > Any updates on this? Should be able to give it a try tomorrow. Pulling the fix on our internal branch right now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15166) Add deletePartition API to the RemoteStorageManager
[ https://issues.apache.org/jira/browse/KAFKA-15166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash updated KAFKA-15166: - Fix Version/s: (was: 3.8.0) > Add deletePartition API to the RemoteStorageManager > --- > > Key: KAFKA-15166 > URL: https://issues.apache.org/jira/browse/KAFKA-15166 > Project: Kafka > Issue Type: Improvement >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major > Labels: kip > > Remote Storage Manager exposes {{deleteLogSegmentData}} API to delete the > individual log segments. Storage providers such as HDFS have support to > delete a directory. Having an {{deletePartition}} API to delete the data at > the partition level will enhance the topic deletion. > This task may require a KIP as it touches the user-facing APIs. > > Please also remember to remove the comment on the test here: > https://github.com/apache/kafka/pull/13837#discussion_r1247676834 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15777) Configurable remote fetch bytes per partition from Consumer
[ https://issues.apache.org/jira/browse/KAFKA-15777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash resolved KAFKA-15777. -- Resolution: Won't Fix > Configurable remote fetch bytes per partition from Consumer > --- > > Key: KAFKA-15777 > URL: https://issues.apache.org/jira/browse/KAFKA-15777 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major > Labels: kip > > A consumer can configure the amount of local bytes to read from each > partition in the FETCH request. > {{max.fetch.bytes}} = 50 MB > {{max.partition.fetch.bytes}} = 1 MB > Similar to this, the consumer should be able to configure > {{max.remote.partition.fetch.bytes}} = 4 MB. > While handling the {{FETCH}} request, if we encounter a partition to read > data from remote storage, then rest of the partitions in the request are > ignored. Essentially, we are serving only 1 MB of remote data per FETCH > request when all the partitions in the request are to be served from the > remote storage. > Providing one more configuration to the client help the user to tune the > values depending on their storage plugin. The user might want to optimise the > number of calls to remote storage vs amount of bytes returned back to the > client in the FETCH response. > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1454] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15853: Move configDef out of core [kafka]
OmniaGM commented on code in PR #16116: URL: https://github.com/apache/kafka/pull/16116#discussion_r1632251507 ## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.config; + +import org.apache.kafka.common.compress.GzipCompression; +import org.apache.kafka.common.compress.Lz4Compression; +import org.apache.kafka.common.compress.ZstdCompression; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SecurityConfig; +import org.apache.kafka.common.config.SslClientAuth; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy; +import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; +import org.apache.kafka.coordinator.transaction.TransactionLogConfigs; +import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs; +import org.apache.kafka.network.SocketServerConfigs; +import org.apache.kafka.raft.QuorumConfig; +import org.apache.kafka.security.PasswordEncoderConfigs; +import org.apache.kafka.server.common.MetadataVersionValidator; +import org.apache.kafka.server.metrics.MetricConfigs; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.storage.internals.log.CleanerConfig; +import org.apache.kafka.storage.internals.log.LogConfig; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; +import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; +import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import static org.apache.kafka.common.config.ConfigDef.Range.between; +import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN; +import static org.apache.kafka.common.config.ConfigDef.Type.CLASS; +import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE; +import static org.apache.kafka.common.config.ConfigDef.Type.INT; +import static org.apache.kafka.common.config.ConfigDef.Type.LIST; +import static org.apache.kafka.common.config.ConfigDef.Type.LONG; +import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD; +import static org.apache.kafka.common.config.ConfigDef.Type.SHORT; +import static org.apache.kafka.common.config.ConfigDef.Type.STRING; + +public class KafkaConfig { Review Comment: Sorry for the late response. It would be nice separation however I am not sure it would be useful as these definition rarely will be re-used with the same structure and definition outside of their classes. For example `RemoteLogManagerConfig.CONFIG_DEF` will be only used in `RemoteLogManagerConfig` and not needed outside of this context. The `ZOOKEEPER` is only used in KafkaConfig and will not be used outside of this. Also we have some configs that common between server and other components of kafka and having multiple definition for different context for the same configs might be confusing going forward. We also might create confusion regarding where to define these config definitions inside `Config/Configs` class or within `KafkaConfigDef`. For example we can move most config definition for things like `QUOTA`, `GROUP_COORDINATOR`, `REPLICATION` etc to their `Configs` classes however things like `SSL`, `SASL` get mostly populated from public classes and `LOG` while most of the `CONFIG` fields are defined in private class `ServerLogConfigs`, the defaults are defined in storage package and we can't depend on the storage as it will create circular dependency between `storage` and
Re: [PR] KAFKA-16897: Move OffsetIndexTest and OffsetMapTest to storage module [kafka]
m1a2st commented on code in PR #16244: URL: https://github.com/apache/kafka/pull/16244#discussion_r1632348365 ## storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java: ## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.errors.InvalidOffsetException; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.TreeMap; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class OffsetIndexTest { + +private OffsetIndex index; +private static final long BASE_OFFSET = 45L; + +@BeforeEach +public void setup() throws IOException { +index = new OffsetIndex(nonExistentTempFile(), BASE_OFFSET, 30 * 8); +} + +@AfterEach +public void tearDown() throws IOException { +if (index != null) { +Files.deleteIfExists(index.file().toPath()); Review Comment: @TaiJuWu, It is a good idea. Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14509: [4/4] Handle includeAuthorizedOperations [kafka]
chia7712 commented on PR #16158: URL: https://github.com/apache/kafka/pull/16158#issuecomment-2156662574 I prefer to check all failed tests before merging. And the last commit of this PR does not have completed CI. That is why I suggest to rebase code to trigger QA again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-12708 Rewrite org.apache.kafka.test.Microbenchmarks by JMH [kafka]
brandboat commented on code in PR #16231: URL: https://github.com/apache/kafka/pull/16231#discussion_r1632327595 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java: ## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.util; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.kafka.common.utils.CopyOnWriteMap; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 15) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Threads(2) +public class ConcurrentMapBenchmark { +@Param({"100"}) +private int times; + +@Param({"100"}) +private int mapSize; + +// execute 1 computeIfAbsent per 1 loops +@Param({"1"}) +private int writePerLoops; + +private Map concurrentHashMap; +private Map copyOnWriteMap; + +@Setup +public void setup() { +Map mapTemplate = IntStream.range(0, mapSize).boxed() +.collect(Collectors.toMap(i -> i, i -> i)); +concurrentHashMap = new ConcurrentHashMap<>(mapTemplate); +copyOnWriteMap = new CopyOnWriteMap<>(mapTemplate); +} + +@Benchmark +public void testConcurrentHashMap(Blackhole blackhole) { Review Comment: thanks ! now the benchmark result are as belows. ``` # OS: openSUSE Tumbleweed # JMH version: 1.37 # VM version: JDK 21.0.3, OpenJDK 64-Bit Server VM, 21.0.3+9-LTS (zulu) # CPU: 13th Gen Intel(R) Core(TM) i9-13980HX Benchmark (mapSize) (writePercentage) Mode Cnt Score Error Units ConcurrentMapBenchmark.testConcurrentHashMapEntrySet 100 0.1 avgt 15 2.368 ± 0.066 ns/op ConcurrentMapBenchmark.testConcurrentHashMapGet 100 0.1 avgt 15 4.385 ± 0.013 ns/op ConcurrentMapBenchmark.testConcurrentHashMapGetRandom100 0.1 avgt 15 8.895 ± 0.019 ns/op ConcurrentMapBenchmark.testConcurrentHashMapValues 100 0.1 avgt 15 2.289 ± 0.052 ns/op ConcurrentMapBenchmark.testCopyOnWriteMapEntrySet100 0.1 avgt 15 2.442 ± 0.072 ns/op ConcurrentMapBenchmark.testCopyOnWriteMapGet 100 0.1 avgt 15 4.664 ± 0.014 ns/op ConcurrentMapBenchmark.testCopyOnWriteMapGetRandom 100 0.1 avgt 15 6.243 ± 0.033 ns/op ConcurrentMapBenchmark.testCopyOnWriteMapValues 100 0.1 avgt 15 2.435 ± 0.174 ns/op ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14509: [4/4] Handle includeAuthorizedOperations [kafka]
chia7712 commented on PR #16158: URL: https://github.com/apache/kafka/pull/16158#issuecomment-2156661854 @riedelmax #16249 fix the blocked tests. Without that fix, the CI will get timeout when running your PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move configDef out of core [kafka]
OmniaGM commented on code in PR #16116: URL: https://github.com/apache/kafka/pull/16116#discussion_r1632251507 ## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.config; + +import org.apache.kafka.common.compress.GzipCompression; +import org.apache.kafka.common.compress.Lz4Compression; +import org.apache.kafka.common.compress.ZstdCompression; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SecurityConfig; +import org.apache.kafka.common.config.SslClientAuth; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy; +import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; +import org.apache.kafka.coordinator.transaction.TransactionLogConfigs; +import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs; +import org.apache.kafka.network.SocketServerConfigs; +import org.apache.kafka.raft.QuorumConfig; +import org.apache.kafka.security.PasswordEncoderConfigs; +import org.apache.kafka.server.common.MetadataVersionValidator; +import org.apache.kafka.server.metrics.MetricConfigs; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.storage.internals.log.CleanerConfig; +import org.apache.kafka.storage.internals.log.LogConfig; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; +import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; +import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import static org.apache.kafka.common.config.ConfigDef.Range.between; +import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN; +import static org.apache.kafka.common.config.ConfigDef.Type.CLASS; +import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE; +import static org.apache.kafka.common.config.ConfigDef.Type.INT; +import static org.apache.kafka.common.config.ConfigDef.Type.LIST; +import static org.apache.kafka.common.config.ConfigDef.Type.LONG; +import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD; +import static org.apache.kafka.common.config.ConfigDef.Type.SHORT; +import static org.apache.kafka.common.config.ConfigDef.Type.STRING; + +public class KafkaConfig { Review Comment: Sorry for the late response. It would be nice separation however I am not sure it would be useful as these definition rarely will be re-used with the same structure and definition outside of their classes. For example `RemoteLogManagerConfig.CONFIG_DEF` will be only used in `RemoteLogManagerConfig` and not needed outside of this context. The `ZOOKEEPER` is only used in KafkaConfig and will not be used outside of this. Also we have some configs that common between server and other components of kafka and having multiple definition for different context for the same configs might be confusing going forward. We also might create confusion of when to defend config definition within the `Config` class and when to create it within `KafkaConfigDef` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Dynamic broker configs for remote fetch/copy quotas [kafka]
kamalcph commented on code in PR #16078: URL: https://github.com/apache/kafka/pull/16078#discussion_r1632316732 ## core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala: ## @@ -817,6 +817,119 @@ class DynamicBrokerConfigTest { Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get) } + @Test + def testRemoteLogManagerCopyQuotaUpdates(): Unit = { +val copyQuotaProp = RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP + +val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092) +val config = KafkaConfig.fromProps(props) +val serverMock: KafkaServer = mock(classOf[KafkaServer]) +val remoteLogManagerMockOpt = Option(Mockito.mock(classOf[RemoteLogManager])) + +Mockito.when(serverMock.config).thenReturn(config) + Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt) + +config.dynamicConfig.initialize(None, None) +config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) + + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, config.getLong(copyQuotaProp)) + +// Update default config +props.put(copyQuotaProp, "100") +config.dynamicConfig.updateDefaultConfig(props) +assertEquals(100, config.getLong(copyQuotaProp)) +Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(100) + +// Update per broker config +props.put(copyQuotaProp, "200") +config.dynamicConfig.updateBrokerConfig(0, props) +assertEquals(200, config.getLong(copyQuotaProp)) +Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(200) + +Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get) + } + + @Test + def testRemoteLogManagerFetchQuotaUpdates(): Unit = { Review Comment: this test is similar to `testRemoteLogManagerCopyQuotaUpdates`. can we refactor the test to extract the method? Or, use parameterized test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Dynamic broker configs for remote fetch/copy quotas [kafka]
kamalcph commented on code in PR #16078: URL: https://github.com/apache/kafka/pull/16078#discussion_r1632316363 ## core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala: ## @@ -817,6 +817,119 @@ class DynamicBrokerConfigTest { Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get) } + @Test + def testRemoteLogManagerCopyQuotaUpdates(): Unit = { +val copyQuotaProp = RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP + +val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092) +val config = KafkaConfig.fromProps(props) +val serverMock: KafkaServer = mock(classOf[KafkaServer]) +val remoteLogManagerMockOpt = Option(Mockito.mock(classOf[RemoteLogManager])) + +Mockito.when(serverMock.config).thenReturn(config) + Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt) + +config.dynamicConfig.initialize(None, None) +config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) + + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, config.getLong(copyQuotaProp)) + +// Update default config +props.put(copyQuotaProp, "100") +config.dynamicConfig.updateDefaultConfig(props) +assertEquals(100, config.getLong(copyQuotaProp)) Review Comment: should we move the `getter` method from RemoteLogManagerConfig to KafkaConfig class? 1. remoteLogManagerCopyMaxBytesPerSecond 2. remoteLogManagerFetchMaxBytesPerSecond and 3. remoteLogIndexFileCacheTotalSizeBytes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16895: fix off-by-one bug in RemoteCopyLagSegments [kafka]
kamalcph commented on PR #16210: URL: https://github.com/apache/kafka/pull/16210#issuecomment-2156632862 @dopuskh3 Any updates on this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16401) Client requests and consumer-pref test failed
[ https://issues.apache.org/jira/browse/KAFKA-16401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] gendong1 updated KAFKA-16401: - Description: The cluster consists of 3 nodes. When the storeOffsets is delay due to fail-slow disk, the corresponding thread is blocked and holds a lock.The retry requests consume the IO thread and network threads set in server.proterty. However, the retry requests is blocked since they cannot acquire the lock. The coming client requests and cosumer-pref tests failed. Are there any comments to figure out this issues? I will very appreciate them. was:The cluster consists of 3 nodes. When the storeOffsets is delay due to fail-slow disk, the corresponding thread is blocked and holds a lock.The retry requests consume the IO thread and network threads set in server.proterty. However, the retry requests is blocked since they cannot acquire the lock. The coming client requests and cosumer-pref tests failed. > Client requests and consumer-pref test failed > - > > Key: KAFKA-16401 > URL: https://issues.apache.org/jira/browse/KAFKA-16401 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.2 >Reporter: gendong1 >Priority: Major > > The cluster consists of 3 nodes. When the storeOffsets is delay due to > fail-slow disk, the corresponding thread is blocked and holds a lock.The > retry requests consume the IO thread and network threads set in > server.proterty. However, the retry requests is blocked since they cannot > acquire the lock. The coming client requests and cosumer-pref tests failed. > Are there any comments to figure out this issues? I will very appreciate them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] Renamed the enableRemoteStorageSystem to isRemoteStorageSystemEnabled [kafka]
chiacyu opened a new pull request, #16256: URL: https://github.com/apache/kafka/pull/16256 This PR is based on [KAFKA-16885](https://issues.apache.org/jira/browse/KAFKA-16885). See the [discussion](https://github.com/apache/kafka/pull/16153#issuecomment-2144269279) for further details. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14509: [4/4] Handle includeAuthorizedOperations [kafka]
riedelmax commented on PR #16158: URL: https://github.com/apache/kafka/pull/16158#issuecomment-2156601592 @chia7712 would you mind to explain how #16249 effects this PR? if there are no conflicts with trunk i dont need to rebase or merge manually right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16885) Consider renaming RemoteLogManagerConfig#enableRemoteStorageSystem to RemoteLogManagerConfig#isRemoteStorageSystemEnabled
[ https://issues.apache.org/jira/browse/KAFKA-16885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853493#comment-17853493 ] Chia Chuan Yu commented on KAFKA-16885: --- Hi, [~ckamal] Yes, I'm working on this one. > Consider renaming RemoteLogManagerConfig#enableRemoteStorageSystem to > RemoteLogManagerConfig#isRemoteStorageSystemEnabled > - > > Key: KAFKA-16885 > URL: https://issues.apache.org/jira/browse/KAFKA-16885 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia Chuan Yu >Priority: Major > > see the discussion: > https://github.com/apache/kafka/pull/16153#issuecomment-2144269279 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16885) Consider renaming RemoteLogManagerConfig#enableRemoteStorageSystem to RemoteLogManagerConfig#isRemoteStorageSystemEnabled
[ https://issues.apache.org/jira/browse/KAFKA-16885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853492#comment-17853492 ] Kamal Chandraprakash commented on KAFKA-16885: -- [~chiacyu] Are you working on this? > Consider renaming RemoteLogManagerConfig#enableRemoteStorageSystem to > RemoteLogManagerConfig#isRemoteStorageSystemEnabled > - > > Key: KAFKA-16885 > URL: https://issues.apache.org/jira/browse/KAFKA-16885 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia Chuan Yu >Priority: Major > > see the discussion: > https://github.com/apache/kafka/pull/16153#issuecomment-2144269279 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15853: Move configDef out of core [kafka]
OmniaGM commented on code in PR #16116: URL: https://github.com/apache/kafka/pull/16116#discussion_r1632251507 ## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.config; + +import org.apache.kafka.common.compress.GzipCompression; +import org.apache.kafka.common.compress.Lz4Compression; +import org.apache.kafka.common.compress.ZstdCompression; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SecurityConfig; +import org.apache.kafka.common.config.SslClientAuth; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy; +import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; +import org.apache.kafka.coordinator.transaction.TransactionLogConfigs; +import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs; +import org.apache.kafka.network.SocketServerConfigs; +import org.apache.kafka.raft.QuorumConfig; +import org.apache.kafka.security.PasswordEncoderConfigs; +import org.apache.kafka.server.common.MetadataVersionValidator; +import org.apache.kafka.server.metrics.MetricConfigs; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.storage.internals.log.CleanerConfig; +import org.apache.kafka.storage.internals.log.LogConfig; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; +import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; +import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import static org.apache.kafka.common.config.ConfigDef.Range.between; +import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN; +import static org.apache.kafka.common.config.ConfigDef.Type.CLASS; +import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE; +import static org.apache.kafka.common.config.ConfigDef.Type.INT; +import static org.apache.kafka.common.config.ConfigDef.Type.LIST; +import static org.apache.kafka.common.config.ConfigDef.Type.LONG; +import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD; +import static org.apache.kafka.common.config.ConfigDef.Type.SHORT; +import static org.apache.kafka.common.config.ConfigDef.Type.STRING; + +public class KafkaConfig { Review Comment: Sorry for the late response. It would be nice separation however I am not sure it would be useful as these definition rarely will be re-used with the same structure and definition outside of their classes. For example `RemoteLogManagerConfig.CONFIG_DEF` will be only used in `RemoteLogManagerConfig` and not needed outside of this context. The `ZOOKEEPER` is only used in KafkaConfig and will not be used outside of this. Also we have some configs that common between server and other components of kafka and having multiple definition for different context for the same configs might be confusing going forward. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move configDef out of core [kafka]
OmniaGM commented on code in PR #16116: URL: https://github.com/apache/kafka/pull/16116#discussion_r1632251507 ## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.config; + +import org.apache.kafka.common.compress.GzipCompression; +import org.apache.kafka.common.compress.Lz4Compression; +import org.apache.kafka.common.compress.ZstdCompression; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SecurityConfig; +import org.apache.kafka.common.config.SslClientAuth; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy; +import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; +import org.apache.kafka.coordinator.transaction.TransactionLogConfigs; +import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs; +import org.apache.kafka.network.SocketServerConfigs; +import org.apache.kafka.raft.QuorumConfig; +import org.apache.kafka.security.PasswordEncoderConfigs; +import org.apache.kafka.server.common.MetadataVersionValidator; +import org.apache.kafka.server.metrics.MetricConfigs; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.storage.internals.log.CleanerConfig; +import org.apache.kafka.storage.internals.log.LogConfig; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; +import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; +import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import static org.apache.kafka.common.config.ConfigDef.Range.between; +import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN; +import static org.apache.kafka.common.config.ConfigDef.Type.CLASS; +import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE; +import static org.apache.kafka.common.config.ConfigDef.Type.INT; +import static org.apache.kafka.common.config.ConfigDef.Type.LIST; +import static org.apache.kafka.common.config.ConfigDef.Type.LONG; +import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD; +import static org.apache.kafka.common.config.ConfigDef.Type.SHORT; +import static org.apache.kafka.common.config.ConfigDef.Type.STRING; + +public class KafkaConfig { Review Comment: Sorry for the late respond. It would be nice separation however I am not sure it would be useful as these definition rarely will be re-used with the same structure and definition outside of their classes. For example `RemoteLogManagerConfig.CONFIG_DEF` will be only used in `RemoteLogManagerConfig` and not needed outside of this context. The `ZOOKEEPER` is only used in KafkaConfig and will not be used outside of this. Also we have some configs that common between server and other components of kafka and having multiple definition for different context for the same configs might be confusing going forward. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16922 : Adding unit tests for NewTopic [kafka]
muralibasani commented on PR #16255: URL: https://github.com/apache/kafka/pull/16255#issuecomment-2156419270 @chia7712 can you pls take a look ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Dynamic broker configs for remote fetch/copy quotas [kafka]
kamalcph commented on code in PR #16078: URL: https://github.com/apache/kafka/pull/16078#discussion_r1632220708 ## core/src/main/scala/kafka/server/DynamicBrokerConfig.scala: ## @@ -1165,43 +1165,66 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w override def validateReconfiguration(newConfig: KafkaConfig): Unit = { newConfig.values.forEach { (k, v) => - if (reconfigurableConfigs.contains(k)) { -if (k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)) { - val newValue = v.asInstanceOf[Long] - val oldValue = getValue(server.config, k) - if (newValue != oldValue && newValue <= 0) { -val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v" -throw new ConfigException(s"$errorMsg, value should be at least 1") - } + if (k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP) || Review Comment: can we do an inverse check since we removed the `contains` check? What if the `k` is null? ``` if (RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP.equals(k)) || ... ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Dynamic broker configs for remote fetch/copy quotas [kafka]
kamalcph commented on code in PR #16078: URL: https://github.com/apache/kafka/pull/16078#discussion_r1632220708 ## core/src/main/scala/kafka/server/DynamicBrokerConfig.scala: ## @@ -1165,43 +1165,66 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w override def validateReconfiguration(newConfig: KafkaConfig): Unit = { newConfig.values.forEach { (k, v) => - if (reconfigurableConfigs.contains(k)) { -if (k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)) { - val newValue = v.asInstanceOf[Long] - val oldValue = getValue(server.config, k) - if (newValue != oldValue && newValue <= 0) { -val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v" -throw new ConfigException(s"$errorMsg, value should be at least 1") - } + if (k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP) || Review Comment: can we do an inverse check? What if the `k` is null? ``` if (RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP.equals(k)) || ... ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16922 : Adding unit tests for NewTopic [kafka]
muralibasani opened a new pull request, #16255: URL: https://github.com/apache/kafka/pull/16255 Resolves https://issues.apache.org/jira/browse/KAFKA-16922 Adding unit tests for org.apache.kafka.clients.admin.NewTopic ### Committer Checklist (excluded from commit message) - [X] Verify design and implementation - [X] Verify test coverage and CI build status - [X] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
kamalcph commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1632218079 ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -2730,6 +2742,206 @@ public void testEpochEntriesAsByteBuffer() throws Exception { } +@ParameterizedTest +@ValueSource(booleans = {false, true}) +public void testCopyQuota(boolean quotaExceeded) throws Exception { +RemoteLogManager.RLMTask task = setupRLMTask(quotaExceeded); + +if (quotaExceeded) { +// Verify that the copy operation times out, since no segments can be copied due to quota being exceeded +assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog))); + +// Verify the highest offset in remote storage is updated only once +ArgumentCaptor capture = ArgumentCaptor.forClass(Long.class); +verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture()); +// Verify the highest offset in remote storage was -1L before the copy started +assertEquals(-1L, capture.getValue()); +} else { +// Verify the copy operation completes within the timeout, since it does not need to wait for quota availability +assertTimeoutPreemptively(Duration.ofMillis(100), () -> task.copyLogSegmentsToRemote(mockLog)); + +// Verify quota check was performed +verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded(); +// Verify bytes to copy was recorded with the quota manager +verify(rlmCopyQuotaManager, times(1)).record(10); + +// Verify the highest offset in remote storage is updated +ArgumentCaptor capture = ArgumentCaptor.forClass(Long.class); +verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture()); +List capturedValues = capture.getAllValues(); +// Verify the highest offset in remote storage was -1L before the copy +assertEquals(-1L, capturedValues.get(0).longValue()); +// Verify it was updated to 149L after the copy +assertEquals(149L, capturedValues.get(1).longValue()); +} +} + +@Test +public void testRLMTaskShutdownDuringQuotaExceededScenario() throws Exception { +RemoteLogManager.RLMTask task = setupRLMTask(true); + +Thread t = new Thread(task); +t.start(); +// Sleep for a while to allow the task to start and quota check to be performed once +Thread.sleep(100); Review Comment: `sleep` can lead to flaky tests in CI and the shutdown/close behavior is not tested E2E. Can we rewrite the test? ``` @Test public void testRLMTaskShutdownDuringQuotaExceededScenario() throws Exception { remoteLogManager.startup(); setupRLMTask(true); remoteLogManager.onLeadershipChange( Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds); TestUtils.waitForCondition(() -> { verify(rlmCopyQuotaManager, atLeast(1)).isQuotaExceeded(); return true; }, "Quota exceeded check did not happen"); assertTimeoutPreemptively(Duration.ofMillis(100), () -> remoteLogManager.close()); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15623: Migrate streams tests (part 1) module to JUnit 5 [kafka]
FrankYang0529 opened a new pull request, #16254: URL: https://github.com/apache/kafka/pull/16254 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
kamalcph commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1632218079 ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -2730,6 +2742,206 @@ public void testEpochEntriesAsByteBuffer() throws Exception { } +@ParameterizedTest +@ValueSource(booleans = {false, true}) +public void testCopyQuota(boolean quotaExceeded) throws Exception { +RemoteLogManager.RLMTask task = setupRLMTask(quotaExceeded); + +if (quotaExceeded) { +// Verify that the copy operation times out, since no segments can be copied due to quota being exceeded +assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog))); + +// Verify the highest offset in remote storage is updated only once +ArgumentCaptor capture = ArgumentCaptor.forClass(Long.class); +verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture()); +// Verify the highest offset in remote storage was -1L before the copy started +assertEquals(-1L, capture.getValue()); +} else { +// Verify the copy operation completes within the timeout, since it does not need to wait for quota availability +assertTimeoutPreemptively(Duration.ofMillis(100), () -> task.copyLogSegmentsToRemote(mockLog)); + +// Verify quota check was performed +verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded(); +// Verify bytes to copy was recorded with the quota manager +verify(rlmCopyQuotaManager, times(1)).record(10); + +// Verify the highest offset in remote storage is updated +ArgumentCaptor capture = ArgumentCaptor.forClass(Long.class); +verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture()); +List capturedValues = capture.getAllValues(); +// Verify the highest offset in remote storage was -1L before the copy +assertEquals(-1L, capturedValues.get(0).longValue()); +// Verify it was updated to 149L after the copy +assertEquals(149L, capturedValues.get(1).longValue()); +} +} + +@Test +public void testRLMTaskShutdownDuringQuotaExceededScenario() throws Exception { +RemoteLogManager.RLMTask task = setupRLMTask(true); + +Thread t = new Thread(task); +t.start(); +// Sleep for a while to allow the task to start and quota check to be performed once +Thread.sleep(100); Review Comment: `sleep` can lead to flaky tests in CI and the shutdown/close behavior is not tested E2E. Can we rewrite the test? Also, we have to give some buffer time in the close timeout (quotaTimeout + 50 ms) to avoid test flakiness. ``` @Test public void testRLMTaskShutdownDuringQuotaExceededScenario() throws Exception { remoteLogManager.startup(); setupRLMTask(true); remoteLogManager.onLeadershipChange( Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds); TestUtils.waitForCondition(() -> { verify(rlmCopyQuotaManager, atLeast(1)).isQuotaExceeded(); return true; }, "Quota exceeded check did not happen"); assertTimeoutPreemptively(Duration.ofMillis(150), () -> remoteLogManager.close()); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16897: Move OffsetIndexTest and OffsetMapTest to storage module [kafka]
TaiJuWu commented on code in PR #16244: URL: https://github.com/apache/kafka/pull/16244#discussion_r1632215976 ## storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java: ## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.errors.InvalidOffsetException; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.TreeMap; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class OffsetIndexTest { + +private OffsetIndex index; +private static final long BASE_OFFSET = 45L; + +@BeforeEach +public void setup() throws IOException { +index = new OffsetIndex(nonExistentTempFile(), BASE_OFFSET, 30 * 8); +} + +@AfterEach +public void tearDown() throws IOException { +if (index != null) { +Files.deleteIfExists(index.file().toPath()); Review Comment: Maybe it can be added `this.index.close()` here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16897: Move OffsetIndexTest and OffsetMapTest to storage module [kafka]
TaiJuWu commented on code in PR #16244: URL: https://github.com/apache/kafka/pull/16244#discussion_r1632215976 ## storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java: ## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.errors.InvalidOffsetException; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.TreeMap; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class OffsetIndexTest { + +private OffsetIndex index; +private static final long BASE_OFFSET = 45L; + +@BeforeEach +public void setup() throws IOException { +index = new OffsetIndex(nonExistentTempFile(), BASE_OFFSET, 30 * 8); +} + +@AfterEach +public void tearDown() throws IOException { +if (index != null) { +Files.deleteIfExists(index.file().toPath()); Review Comment: Maybe it can add `this.index.close()` here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16922) add unit test for NewTopic
[ https://issues.apache.org/jira/browse/KAFKA-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Muralidhar Basani reassigned KAFKA-16922: - Assignee: Muralidhar Basani (was: Chia-Ping Tsai) > add unit test for NewTopic > -- > > Key: KAFKA-16922 > URL: https://issues.apache.org/jira/browse/KAFKA-16922 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: Muralidhar Basani >Priority: Minor > > as title -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR:Topic command integration test migrate to new test infra [kafka]
TaiJuWu commented on code in PR #16127: URL: https://github.com/apache/kafka/pull/16127#discussion_r1632204289 ## core/src/test/java/kafka/testkit/KafkaClusterTestKit.java: ## @@ -353,6 +354,9 @@ static private void setupNodeDirectories(File baseDirectory, private final TestKitNodes nodes; private final Map controllers; private final Map brokers; + +private final Map aliveBrokers; Review Comment: Sounds great but if a broker already shutdown, how can we access the server and check its status? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR:Topic command integration test migrate to new test infra [kafka]
TaiJuWu commented on code in PR #16127: URL: https://github.com/apache/kafka/pull/16127#discussion_r1632203912 ## core/src/test/java/kafka/testkit/KafkaClusterTestKit.java: ## @@ -188,12 +188,13 @@ private KafkaConfig createNodeConfig(TestKitNode node) { controllerNode.metadataDirectory()); } props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, -"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"); + "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); Review Comment: The TestUtils requires sever use `PLAINTEXT` protocol, you can reference [here.](https://github.com/apache/kafka/blob/d6cd83e2fb2bab4526f07e067277b34e482f6678/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L232) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] improve log description of QuorumController [kafka]
chickenchickenlove commented on code in PR #15926: URL: https://github.com/apache/kafka/pull/15926#discussion_r1632188911 ## metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java: ## @@ -165,6 +165,8 @@ static ControllerResult recordsForNonEmptyLog( throw new RuntimeException("Should not have ZK migrations enabled on a cluster that was " + "created in KRaft mode."); } +logMessageBuilder +.append("This is expected because this is a de-novo KRaft cluster."); Review Comment: @mumrah , i make new commit to apply your comments. However, the `CI` still is failed. The changes which i made seems not to be related with failure of CI. Am i wrong? https://github.com/apache/kafka/assets/90125071/d013add9-814d-4ad9-8c2e-cb2955455645;> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16921: Migrate all junit 4 code to junit 5 for connect module [kafka]
chia7712 commented on code in PR #16253: URL: https://github.com/apache/kafka/pull/16253#discussion_r1632184801 ## build.gradle: ## @@ -3248,7 +3248,6 @@ project(':connect:runtime') { testImplementation project(':group-coordinator') testImplementation libs.junitJupiterApi -testImplementation libs.junitVintageEngine Review Comment: we should remove all usage of junit 4 code form connect module. If you get such error, please rewrite them by junit 5 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16921: Migrate all junit 4 code to junit 5 for connect module [kafka]
m1a2st commented on code in PR #16253: URL: https://github.com/apache/kafka/pull/16253#discussion_r1632184386 ## build.gradle: ## @@ -3248,7 +3248,6 @@ project(':connect:runtime') { testImplementation project(':group-coordinator') testImplementation libs.junitJupiterApi -testImplementation libs.junitVintageEngine Review Comment: There is a little problem, If I remove the `testImplementation libs.junitVintageEngine`, Gradle can't get `org.junit.` dependencies and some annotations are error. I find a way to resolve it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on PR #16227: URL: https://github.com/apache/kafka/pull/16227#issuecomment-2156349665 Hi @chia7712 , I have do some refactors, PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16921: Migrate all junit 4 code to junit 5 for connect module [kafka]
m1a2st commented on code in PR #16253: URL: https://github.com/apache/kafka/pull/16253#discussion_r1632175364 ## build.gradle: ## @@ -540,9 +537,6 @@ subprojects { exclude testsToExclude -if (shouldUseJUnit5) - useJUnitPlatform() Review Comment: Sorry, I misunderstand the `shouldUseJUnit5` variable logic, so I rollback it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org