Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]

2024-04-21 Thread via GitHub


showuon commented on PR #15732:
URL: https://github.com/apache/kafka/pull/15732#issuecomment-2068407055

   @akhileshchg @mumrah @cmccabe, could you take a look when you are available? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-21 Thread via GitHub


showuon commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1574049613


##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +123,57 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")
+    topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")
+
+    createTopic(topic, partitionNum, 1, topicProperties)
+    assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
+
+    // send enough records to trigger log rolling
+    (0 until 20).foreach { _ =>
+      TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
+    }
+    TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
+      "timed out waiting for log segment to roll")
+
+    // Wait for log segment retention. Override initialTaskDelayMs as 5 seconds.
+    // The first retention task is executed after 5 seconds, so waiting for 10 seconds should be enough.
+    TestUtils.waitUntilTrue(() => {
+      new File(logDir1, tp.toString).listFiles().count(_.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) > 0
+    }, "timed out waiting for log segment to retention", 1)

Review Comment:
   nit: I think we can leave the timeout at its default value. That is, remove the 3rd parameter directly.



##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -37,6 +40,10 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
 
   val topic = "topic"
 
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "5000")

Review Comment:
   Why do we need to wait 5 secs for it? I would say we can set it to 0 to speed up the test. WDYT?






[jira] [Comment Edited] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-21 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839452#comment-17839452
 ] 

Sal Sorrentino edited comment on KAFKA-16514 at 4/22/24 2:13 AM:
-

IMHO: I think if you have a replication factor of 0 and your application uses a persistent container (such as a k8s stateful pod), it "could" be relevant... but I think the recommended replication factor is 1. In that case, an incremental rebalance is probably preferable to a partition blackout during an application bounce, especially if you are using something like Spring for dependency injection, since application boot times are not exactly speedy. If you are using in-memory state stores at all, you would want to leave the group in every scenario, I would think.

Long story short, I think the "hack" is relevant if you have a replication factor of 0.


was (Author: JIRAUSER305028):
IMHO: I think if you have a replication factor of 0 and your application uses 
persistent container (such as a k8s stateful pod) that it "could" be 
relevant...but I think the recommended replication factor is 1, in which case 
an incremental rebalance is probably preferable to a partition blackout during 
an application bounce, especially if you are using spring for dependency 
injection as application boot times are not exactly speedy. 

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shut down a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shut down and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-21 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839452#comment-17839452
 ] 

Sal Sorrentino edited comment on KAFKA-16514 at 4/22/24 2:10 AM:
-

IMHO: I think if you have a replication factor of 0 and your application uses a persistent container (such as a k8s stateful pod), it "could" be relevant... but I think the recommended replication factor is 1, in which case an incremental rebalance is probably preferable to a partition blackout during an application bounce, especially if you are using Spring for dependency injection, since application boot times are not exactly speedy.


was (Author: JIRAUSER305028):
IMHO: I think if you have a replication factor of 0 and your application uses 
persistent container (such as a k8s stateful pod) that it "could" be 
relevant...but I think the recommended replication factor would is 1, in which 
case an incremental rebalance is probably preferable to a partition blackout 
during an application bounce, especially if you are using spring for dependency 
injection as application boot times are not exactly speedy. 






[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-21 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839452#comment-17839452
 ] 

Sal Sorrentino commented on KAFKA-16514:


IMHO: I think if you have a replication factor of 0 and your application uses a persistent container (such as a k8s stateful pod), it "could" be relevant... but I think the recommended replication factor is 1, in which case an incremental rebalance is probably preferable to a partition blackout during an application bounce, especially if you are using Spring for dependency injection, since application boot times are not exactly speedy.






[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839441#comment-17839441
 ] 

Matthias J. Sax commented on KAFKA-16514:
-

You are right that there is always a member-id etc. – I am not sure, though, whether generating a random group.instance.id would be the right way forward.

Maybe making the Consumer#close() call flexible and allowing users to pass in a CloseOption, similar to what we do in KafkaStreams, would be the cleaner approach? An alternative might be (not sure what the exact scope would be) to add a new AdminClient method that allows passing in a `member.id` to remove a consumer from the group?

Another question is: is this "hack" we put into KS to not send a leave-group request still relevant? A lot of things have improved in the rebalance protocol over the years, and it might not be necessary any longer?

Curious to hear what [~ableegoldman] and [~cadonna] think.
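
For reference, the closest existing Admin API is removeMembersFromConsumerGroup, which today identifies members by group.instance.id (static membership only); a minimal sketch, assuming a hypothetical instance id "instance-1" and the application id from this ticket's config:
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

public class KickStaticMember {
    public static void main(String[] args) throws Exception {
        Properties adminProps = new Properties();
        adminProps.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(adminProps)) {
            // Removes one static member from the group, forcing an immediate rebalance.
            admin.removeMembersFromConsumerGroup(
                    "someApplicationId",
                    new RemoveMembersFromConsumerGroupOptions(
                            Collections.singleton(new MemberToRemove("instance-1"))))
                .all()
                .get();
        }
    }
}
{code}
A member.id-based variant, as suggested above, would presumably follow the same shape.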






[jira] [Commented] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839440#comment-17839440
 ] 

Matthias J. Sax commented on KAFKA-16567:
-

I see – this raises a few questions... Given that KIP-869 is not fully implemented yet, and the new metrics are not added, I am wondering whether we can consider the other metric effectively deprecated or not.

[~cadonna] WDYT? Should we push out KAFKA-16336 to the 5.0 release?

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Assignee: Walter Hernandez
>Priority: Blocker
>  Labels: kip
> Fix For: 4.0.0
>
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate





Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-21 Thread via GitHub


FrankYang0529 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1574007862


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -173,7 +208,11 @@ private void produceRecord() {
     private void withStableConsumerGroup(Runnable body) {
         Consumer consumer = createConsumer(new Properties());
         try {
-            TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, DEFAULT_MAX_WAIT_MS);
+            consumer.subscribe(Collections.singletonList(TOPIC));
+            ConsumerRecords records = consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS));
+            if (records.isEmpty()) {

Review Comment:
   Hi @chia7712, thanks for the review. I have addressed all the following comments.
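
   For context, a self-contained sketch of the poll-based replacement shown in the hunk above; the failure branch and the byte[] key/value types are assumptions on my side, since the quoted diff is cut off right after the `isEmpty()` check:

   ```java
   import java.time.Duration;
   import java.util.Collections;
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.clients.consumer.ConsumerRecords;

   public final class PollUntilRecords {
       // Hypothetical stand-ins for the test class constants.
       private static final String TOPIC = "foo";
       private static final long DEFAULT_MAX_WAIT_MS = 15_000L;

       // Subscribe, poll once for up to the max wait, and fail fast if nothing arrived.
       public static void subscribeAndAwaitRecords(Consumer<byte[], byte[]> consumer) {
           consumer.subscribe(Collections.singletonList(TOPIC));
           ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS));
           if (records.isEmpty()) {
               throw new IllegalStateException("Timed out waiting for records on topic " + TOPIC);
           }
       }
   }
   ```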






[PR] Move KRAFT configs out of KafkaConfig [kafka]

2024-04-21 Thread via GitHub


OmniaGM opened a new pull request, #15775:
URL: https://github.com/apache/kafka/pull/15775

   - Move all KRaft configs/docs/defaults into `KRaftConfigs`. 
   - Note: we already have `RaftConfig`, but it seems to contain a limited set of configs that only configure the `controller` raft layer, and it shouldn't include configs shared by both brokers and controllers in KRaft mode. 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[PR] KAFKA-15853: Move quota configs into server-common package [kafka]

2024-04-21 Thread via GitHub


OmniaGM opened a new pull request, #15774:
URL: https://github.com/apache/kafka/pull/15774

   - Pull all quota configs from `KafkaConfig`, `DynamicConfig` and `LogConfig` into `ServerQuotaConfigs` in `server-common`, as these configs are shared between the server, storage and tools packages. 
   
   - Moved all quota configs out of `ClientQuotaManagerConfig` and `ReplicationQuotaManagerConfig`, as all quotas have the same default values for sample and window size. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]

2024-04-21 Thread via GitHub


OmniaGM commented on code in PR #15772:
URL: https://github.com/apache/kafka/pull/15772#discussion_r1573897839


##
server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.network;
+
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+public class SocketServerConfigs {
+    public static final String LISTENER_SECURITY_PROTOCOL_MAP_CONFIG = "listener.security.protocol.map";
+    public static final String LISTENER_SECURITY_PROTOCOL_MAP_DEFAULT = Arrays.stream(SecurityProtocol.values())
+        .collect(Collectors.toMap(ListenerName::forSecurityProtocol, sp -> sp))
+        .entrySet()
+        .stream()
+        .map(entry -> entry.getKey().value() + ":" + entry.getValue().name())
+        .collect(Collectors.joining(","));
+    public static final String LISTENER_SECURITY_PROTOCOL_MAP_DOC = "Map between listener names and security protocols. This must be defined for " +
+        "the same security protocol to be usable in more than one port or IP. For example, internal and " +
+        "external traffic can be separated even if SSL is required for both. Concretely, the user could define listeners " +
+        "with names INTERNAL and EXTERNAL and this property as: INTERNAL:SSL,EXTERNAL:SSL. As shown, key and value are " +
+        "separated by a colon and map entries are separated by commas. Each listener name should only appear once in the map. " +
+        "Different security (SSL and SASL) settings can be configured for each listener by adding a normalised " +
+        "prefix (the listener name is lowercased) to the config name. For example, to set a different keystore for the " +
+        "INTERNAL listener, a config with name listener.name.internal.ssl.keystore.location would be set. " +
+        "If the config for the listener name is not set, the config will fallback to the generic config (i.e. ssl.keystore.location). " +
+        "Note that in KRaft a default mapping from the listener names defined by controller.listener.names to PLAINTEXT " +
+        "is assumed if no explicit mapping is provided and no other security protocol is in use.";
+
+    public static final String LISTENERS_CONFIG = "listeners";
+    public static final String LISTENERS_DEFAULT = "PLAINTEXT://:9092";
+    public static final String LISTENERS_DOC = "Listener List - Comma-separated list of URIs we will listen on and the listener names." +
+        String.format(" If the listener name is not a security protocol, %s must also be set.%n", LISTENER_SECURITY_PROTOCOL_MAP_CONFIG) +
+        " Listener names and port numbers must be unique unless %n" +
+        " one listener is an IPv4 address and the other listener is %n" +
+        " an IPv6 address (for the same port).%n" +
+        " Specify hostname as 0.0.0.0 to bind to all interfaces.%n" +
+        " Leave hostname empty to bind to default interface.%n" +
+        " Examples of legal listener lists:%n" +
+        " PLAINTEXT://myhost:9092,SSL://:9091%n" +
+        " CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093%n" +
+        " PLAINTEXT://127.0.0.1:9092,SSL://[::1]:9092%n";
+
+    public static final String ADVERTISED_LISTENERS_CONFIG = "advertised.listeners";
+    public static final String ADVERTISED_LISTENERS_DOC = String.format(
+        "Listeners to publish to ZooKeeper for clients to use, if different than the %s config property." +
+        " In IaaS environments, this may need to be different from the interface to which the broker binds." +
+        " If this is not set, the value for %1$1s will be used." +
+        " Unlike %1$1s, it is not valid to advertise the 0.0.0.0 meta-address.%n" +
+        " Also unlike %1$1s, there can be duplicated ports in this property," +
+        " so that one listener 

Re: [PR] Fix typo [kafka]

2024-04-21 Thread via GitHub


Janmm14 commented on PR #15743:
URL: https://github.com/apache/kafka/pull/15743#issuecomment-2068177862

   @ijuma Apache CI might've run untrusted 3rd party code.





Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]

2024-04-21 Thread via GitHub


chia7712 commented on code in PR #15772:
URL: https://github.com/apache/kafka/pull/15772#discussion_r1573880096



Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-04-21 Thread via GitHub


linu-shibu commented on PR #15620:
URL: https://github.com/apache/kafka/pull/15620#issuecomment-2068156085

   > Thanks @linu-shibu this is a lot closer to what I expected.
   > 
> Can you also add the build.gradle patch I mentioned earlier? I think this is the only use of raw types in the storage project.
   
   Done. I was under the impression that it was only for debugging and fixing the warning locally.





Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-04-21 Thread via GitHub


gharris1727 commented on code in PR #15620:
URL: https://github.com/apache/kafka/pull/15620#discussion_r1573875832


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java:
##
@@ -46,69 +44,65 @@ public class RemoteLogMetadataSerde {
     private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey();
     private static final short REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY = new RemoteLogSegmentMetadataSnapshotRecord().apiKey();
 
-    private final Map remoteLogStorageClassToApiKey;
-    private final Map keyToTransform;
     private final BytesApiMessageSerde bytesApiMessageSerde;
+    private ApiMessageAndVersion apiMessageAndVersion;

Review Comment:
   I meant it should be a local variable.
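
   To illustrate, a sketch of `serialize` with `ApiMessageAndVersion` kept as a local (trimmed to two branches; the transform names are the ones quoted in this thread):

   ```java
   public byte[] serialize(RemoteLogMetadata remoteLogMetadata) {
       // Local variable rather than a field: serialize() stays stateless and
       // safe to call from multiple threads.
       ApiMessageAndVersion apiMessageAndVersion;
       if (remoteLogMetadata instanceof RemoteLogSegmentMetadata) {
           apiMessageAndVersion = new RemoteLogSegmentMetadataTransform()
                   .toApiMessageAndVersion((RemoteLogSegmentMetadata) remoteLogMetadata);
       } else if (remoteLogMetadata instanceof RemoteLogSegmentMetadataUpdate) {
           apiMessageAndVersion = new RemoteLogSegmentMetadataUpdateTransform()
                   .toApiMessageAndVersion((RemoteLogSegmentMetadataUpdate) remoteLogMetadata);
       } else {
           throw new IllegalArgumentException("Unsupported RemoteLogMetadata type: " + remoteLogMetadata.getClass());
       }
       return bytesApiMessageSerde.serialize(apiMessageAndVersion);
   }
   ```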






Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-04-21 Thread via GitHub


linu-shibu commented on code in PR #15620:
URL: https://github.com/apache/kafka/pull/15620#discussion_r1573871668


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java:
##
@@ -46,69 +44,65 @@ public class RemoteLogMetadataSerde {
     private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey();
     private static final short REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY = new RemoteLogSegmentMetadataSnapshotRecord().apiKey();
 
-    private final Map remoteLogStorageClassToApiKey;
-    private final Map keyToTransform;
     private final BytesApiMessageSerde bytesApiMessageSerde;
+    private ApiMessageAndVersion apiMessageAndVersion;
+
+    private final RemoteLogSegmentMetadataTransform remoteLogSegmentMetadataTransform;

Review Comment:
   Updated






Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-04-21 Thread via GitHub


linu-shibu commented on code in PR #15620:
URL: https://github.com/apache/kafka/pull/15620#discussion_r1573870979


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java:
##
@@ -46,69 +44,65 @@ public class RemoteLogMetadataSerde {
     private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey();
     private static final short REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY = new RemoteLogSegmentMetadataSnapshotRecord().apiKey();
 
-    private final Map remoteLogStorageClassToApiKey;
-    private final Map keyToTransform;
     private final BytesApiMessageSerde bytesApiMessageSerde;
+    private ApiMessageAndVersion apiMessageAndVersion;

Review Comment:
   Okay, updating this to a class variable






Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]

2024-04-21 Thread via GitHub


chia7712 commented on PR #15772:
URL: https://github.com/apache/kafka/pull/15772#issuecomment-2068139760

   ```
   Code Warning
   FS   Format string should use %n rather than \n in 
org.apache.kafka.network.SocketServerConfigs.()

   
   [Bug type VA_FORMAT_STRING_USES_NEWLINE (click for 
details)](file:///home/chia7712/project/kafka/server/build/reports/spotbugs/main.html#VA_FORMAT_STRING_USES_NEWLINE)
   In class org.apache.kafka.network.SocketServerConfigs
   In method org.apache.kafka.network.SocketServerConfigs.()
   Called method String.format(String, Object[])
   Format string " If the listener name is not a security protocol, 
%s must also be set.\n"
   At SocketServerConfigs.java:[line 48]
   FS   Format string should use %n rather than \n in 
org.apache.kafka.network.SocketServerConfigs.()

   
   [Bug type VA_FORMAT_STRING_USES_NEWLINE (click for 
details)](file:///home/chia7712/project/kafka/server/build/reports/spotbugs/main.html#VA_FORMAT_STRING_USES_NEWLINE)
   In class org.apache.kafka.network.SocketServerConfigs
   In method org.apache.kafka.network.SocketServerConfigs.()
   Called method String.format(String, Object[])
   Format string "Listeners to publish to ZooKeeper for clients to use, if 
different than the %s config property. In IaaS environments, this 
may need to be different from the interface to which the broker binds. If this 
is not set, the value for %1$1s will be used. Unlike 
%1$1s, it is not valid to advertise the 0.0.0.0 meta-address.\n 
Also unlike %1$1s, there can be duplicated ports in this property, 
so that one listener can be configured to advertise another listener's address. 
This can be useful in some cases where external load balancers are used."
   At SocketServerConfigs.java:[line 60]
   FS   Format string should use %n rather than \n in 
org.apache.kafka.network.SocketServerConfigs.()

   
   [Bug type VA_FORMAT_STRING_USES_NEWLINE (click for 
details)](file:///home/chia7712/project/kafka/server/build/reports/spotbugs/main.html#VA_FORMAT_STRING_USES_NEWLINE)
   In class org.apache.kafka.network.SocketServerConfigs
   In method org.apache.kafka.network.SocketServerConfigs.()
   Called method String.format(String, Object[])
   Format string "Name of listener used for communication between controller 
and brokers. A broker will use the %s to locate the endpoint in 
$ListenersProp list, to listen for connections from the controller. For 
example, if a broker's config is:\nlisteners = INTERNAL://192.1.1.8:9092, 
EXTERNAL://10.1.1.5:9093, 
CONTROLLER://192.1.1.8:9094listener.security.protocol.map = INTERNAL:PLAINTEXT, 
EXTERNAL:SSL, CONTROLLER:SSLcontrol.plane.listener.name = CONTROLLER\nOn 
startup, the broker will start listening on "192.1.1.8:9094" with security 
protocol "SSL".\nOn the controller side, when it discovers a broker's published 
endpoints through ZooKeeper, it will use the %1$1s to find the 
endpoint, which it will use to establish connection to the broker.\nFor 
example, if the broker's published endpoints on ZooKeeper are:\n 
"endpoints" : 
["INTERNAL://broker1.example.com:9092","EXTERNAL://broker1.example.com:9093","CONTROLLER://broker1.example.com:9094"]<
 /code>\n and the controller's config is:\nlistener.security.protocol.map 
= INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSLcontrol.plane.listener.name = 
CONTROLLER\nthen the controller will use "broker1.example.com:9094" with 
security protocol "SSL" to connect to the broker.\nIf not explicitly 
configured, the default value will be null and there will be no dedicated 
endpoints for controller connections.\nIf explicitly configured, the value 
cannot be the same as the value of %s."
   At SocketServerConfigs.java:[line 71]
   ```
   
   There are build errors.
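
   For reference, VA_FORMAT_STRING_USES_NEWLINE fires on a literal `\n` inside a format string passed to `String.format`; a minimal sketch of the fix, using the first flagged string:

   ```java
   // Flagged: literal \n inside a format string.
   String bad = String.format(
           " If the listener name is not a security protocol, %s must also be set.\n",
           "listener.security.protocol.map");

   // Fixed: %n emits the platform line separator and satisfies spotbugs.
   String good = String.format(
           " If the listener name is not a security protocol, %s must also be set.%n",
           "listener.security.protocol.map");
   ```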





Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-21 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1573858499


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -67,13 +69,16 @@ public class ClusterConfig {
         this.listenerName = listenerName;
         this.trustStoreFile = trustStoreFile;
         this.metadataVersion = metadataVersion;
-        this.serverProperties = copyOf(serverProperties);
-        this.producerProperties = copyOf(producerProperties);
-        this.consumerProperties = copyOf(consumerProperties);
-        this.adminClientProperties = copyOf(adminClientProperties);
-        this.saslServerProperties = copyOf(saslServerProperties);
-        this.saslClientProperties = copyOf(saslClientProperties);
-        perBrokerOverrideProperties.forEach((brokerId, props) -> this.perBrokerOverrideProperties.put(brokerId, copyOf(props)));
+        this.serverProperties = Collections.unmodifiableMap(serverProperties);
+        this.producerProperties = Collections.unmodifiableMap(producerProperties);
+        this.consumerProperties = Collections.unmodifiableMap(consumerProperties);
+        this.adminClientProperties = Collections.unmodifiableMap(adminClientProperties);
+        this.saslServerProperties = Collections.unmodifiableMap(saslServerProperties);
+        this.saslClientProperties = Collections.unmodifiableMap(saslClientProperties);
+        this.perBrokerOverrideProperties = Collections.unmodifiableMap(

Review Comment:
   Got it, thanks for the explanation.






Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-21 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1573857883


##
core/src/test/java/kafka/test/ClusterConfig.java:
##

Review Comment:
   The purpose of this PR is to make `ClusterConfig` immutable. However, `ClusterConfig` holds only an immutable "view" of `serverProperties`, `producerProperties`, etc. That means `ClusterConfig` is still a mutable object, since callers can change its inner variables by updating the `ClusterConfig#Builder`. For example, a user can call `putSaslServerProperty` to change the `saslServerProperties` of a `ClusterConfig` that was already built.
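
   To spell out why the unmodifiable wrapper alone is not enough: `Collections.unmodifiableMap` returns a live view, so whoever retains the backing map (here, the `Builder`) can still mutate what the "immutable" object sees. A minimal standalone sketch:

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;

   public final class UnmodifiableViewDemo {
       public static void main(String[] args) {
           Map<String, String> backing = new HashMap<>();
           Map<String, String> view = Collections.unmodifiableMap(backing);

           backing.put("sasl.mechanism", "PLAIN");          // mutation through the backing map...
           System.out.println(view.get("sasl.mechanism"));  // ...is visible through the view: PLAIN
           view.put("x", "y");                              // throws UnsupportedOperationException
       }
   }
   ```

   A defensive copy at construction time, e.g. `Collections.unmodifiableMap(new HashMap<>(serverProperties))`, would cut that link.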






Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]

2024-04-21 Thread via GitHub


OmniaGM commented on code in PR #15772:
URL: https://github.com/apache/kafka/pull/15772#discussion_r1573850333


##
server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.network;
+
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+public class SocketServerConfigs {

Review Comment:
   I named it `SocketServerConfigs` instead of `ServerSocketConfigs` for the following reasons:
   1. it is part of `server.network`.
   2. we have `SocketServer`, which will move to the same package at some point.
   3. Java has a similarly named built-in class, `java.net.ServerSocket`, so we may want to differentiate classes related to `SocketServer` from the built-in one.
   
   I am not very attached to the name, but it made more sense to me during the refactor. WDYT?






Re: [PR] [DRAFT] KAFKA-16593: wip [kafka]

2024-04-21 Thread via GitHub


chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1573855465


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,477 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.singleton;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteWithTopicOption(String quorum) {
-        createOffsetsTopic(listenerName(), new Properties());
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
-        assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
-    }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteCmdNonExistingGroup(String quorum) {
-        createOffsetsTopic(listenerName(), new Properties());
-        String missingGroup = "missing.group";
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3, serverProperties = {
+    @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
+    @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
+})
+public class DeleteConsumerGroupsTest {
+    private final ClusterInstance cluster;
+    private static final String TOPIC = "foo";
+    private static final String GROUP = "test.group";
 
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
-        ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
+    public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
 
-        String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
-        assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-            "The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
+    @ClusterTest
+    public void testDeleteWithTopicOption() {
+        try (Admin admin = cluster.createAdminClient()) {
+            admin.createTopics(buildSingletonTestTopic());

Review Comment:
   please call `get` to make sure the request is completed.
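
   A sketch of what that looks like here, assuming `buildSingletonTestTopic()` from the hunk above and a test method that declares `throws Exception`:

   ```java
   // Block until the controller has actually created the topic, so later
   // steps in the test don't race against asynchronous topic creation.
   admin.createTopics(buildSingletonTestTopic()).all().get();
   ```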




Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-21 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1573855899


##
core/src/test/java/kafka/test/ClusterConfig.java:
##

Review Comment:
   Pardon me, I'm not quite clear about this comment. Could you explain more? :smiley: 
   For convenience, this PR adds something like `public Builder putServerProperty(String key, String value)` for each configuration map, like `serverProperties` and `consumerProperties`, so we don't need to do a deep copy in `ClusterConfig.Builder`. 
   Or did you mean we need to use `setServerProperty(Map serverProperties)` instead of `putServerProperty(String key, String value)`?






Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-04-21 Thread via GitHub


gharris1727 commented on code in PR #15620:
URL: https://github.com/apache/kafka/pull/15620#discussion_r1573855150


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java:
##
@@ -46,69 +44,65 @@ public class RemoteLogMetadataSerde {
     private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey();
     private static final short REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY = new RemoteLogSegmentMetadataSnapshotRecord().apiKey();
 
-    private final Map remoteLogStorageClassToApiKey;
-    private final Map keyToTransform;
     private final BytesApiMessageSerde bytesApiMessageSerde;
+    private ApiMessageAndVersion apiMessageAndVersion;
+
+    private final RemoteLogSegmentMetadataTransform remoteLogSegmentMetadataTransform;

Review Comment:
   This looks pretty verbose, can you shorten the variable names? We're already 
in the `RemoteLogMetadataSerde`, so `remoteLog` and `Metadata` end up being 
visual clutter. 



##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java:
##
@@ -46,69 +44,65 @@ public class RemoteLogMetadataSerde {
     private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey();
     private static final short REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY = new RemoteLogSegmentMetadataSnapshotRecord().apiKey();
 
-    private final Map remoteLogStorageClassToApiKey;
-    private final Map keyToTransform;
     private final BytesApiMessageSerde bytesApiMessageSerde;
+    private ApiMessageAndVersion apiMessageAndVersion;

Review Comment:
   This shouldn't be an instance variable.






Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-04-21 Thread via GitHub


linu-shibu commented on code in PR #15620:
URL: https://github.com/apache/kafka/pull/15620#discussion_r1573854456


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java:
##
@@ -65,50 +59,45 @@ protected ApiMessage newApiMessage(short apiKey) {
 return MetadataRecordType.fromId(apiKey).newMetadataRecord();
 }
 
-protected final Map<Short, RemoteLogMetadataTransform> 
createRemoteLogMetadataTransforms() {
-Map<Short, RemoteLogMetadataTransform> map = new HashMap<>();
-map.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new 
RemoteLogSegmentMetadataTransform());
-map.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new 
RemoteLogSegmentMetadataUpdateTransform());
-map.put(REMOTE_PARTITION_DELETE_API_KEY, new 
RemotePartitionDeleteMetadataTransform());
-map.put(REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY, new 
RemoteLogSegmentMetadataSnapshotTransform());
-return map;
-}
-
-protected final Map<String, Short> 
createRemoteLogStorageClassToApiKeyMap() {
-Map<String, Short> map = new HashMap<>();
-map.put(RemoteLogSegmentMetadata.class.getName(), 
REMOTE_LOG_SEGMENT_METADATA_API_KEY);
-map.put(RemoteLogSegmentMetadataUpdate.class.getName(), 
REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY);
-map.put(RemotePartitionDeleteMetadata.class.getName(), 
REMOTE_PARTITION_DELETE_API_KEY);
-map.put(RemoteLogSegmentMetadataSnapshot.class.getName(), 
REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY);
-return map;
-}
-
 public byte[] serialize(RemoteLogMetadata remoteLogMetadata) {
-Short apiKey = 
remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName());
-if (apiKey == null) {
-throw new IllegalArgumentException("ApiKey for given 
RemoteStorageMetadata class: " + remoteLogMetadata.getClass()
-   + " does not exist.");
-}
 
-@SuppressWarnings("unchecked")
-ApiMessageAndVersion apiMessageAndVersion = 
remoteLogMetadataTransform(apiKey).toApiMessageAndVersion(remoteLogMetadata);
+ApiMessageAndVersion apiMessageAndVersion;
+if (remoteLogMetadata instanceof RemoteLogSegmentMetadata) {
+RemoteLogSegmentMetadataTransform metadataTransform = new 
RemoteLogSegmentMetadataTransform();

Review Comment:
   Updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]

2024-04-21 Thread via GitHub


chia7712 commented on code in PR #15772:
URL: https://github.com/apache/kafka/pull/15772#discussion_r1573853257


##
server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.network;
+
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+public class SocketServerConfigs {

Review Comment:
   > Am not very attached to the name but it made more sense for me during the 
refactor. WDYT?
   
   that makes sense to me. +1 :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]

2024-04-21 Thread via GitHub


chia7712 commented on PR #15772:
URL: https://github.com/apache/kafka/pull/15772#issuecomment-2068132025

   @OmniaGM I merged #15770 first since it is a smaller PR. Please fix the 
conflicts. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16595) Introduce template in ClusterTests

2024-04-21 Thread Kuan Po Tseng (Jira)
Kuan Po Tseng created KAFKA-16595:
-

 Summary: Introduce template in ClusterTests
 Key: KAFKA-16595
 URL: https://issues.apache.org/jira/browse/KAFKA-16595
 Project: Kafka
  Issue Type: Improvement
Reporter: Kuan Po Tseng
Assignee: Kuan Po Tseng


discussed in https://github.com/apache/kafka/pull/15761#discussion_r1573850549

Currently we can't apply any template in ClusterTests, so we have to write 
down every ClusterConfigProperty in each ClusterTest inside ClusterTests, which 
leaves a bunch of duplicate code. We need a way to reduce this duplication, and 
introducing a template in ClusterTests could be a solution.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-21 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1573852618


##
core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:
##
@@ -17,48 +17,78 @@
 
 package kafka.server
 
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.ClusterInstance
 import org.apache.kafka.common.message.ApiVersionsRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.ApiVersionsRequest
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, ClusterTests, Type}
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.extension.ExtendWith
 
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
+@ClusterTestDefaults(brokers = 1)
 class ApiVersionsRequestTest(cluster: ClusterInstance) extends 
AbstractApiVersionsRequestTest(cluster) {
 
-  @BeforeEach
-  def setup(config: ClusterConfig): Unit = {
-super.brokerPropertyOverrides(config.serverProperties())
-  }
-
-  @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties 
= Array(
-new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
-new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value 
= "true"),
+  @ClusterTests(Array(
+new ClusterTest(clusterType = Type.ZK, metadataVersion = 
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(

Review Comment:
   Filed https://issues.apache.org/jira/browse/KAFKA-16595



##
core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:
##
@@ -17,48 +17,78 @@
 
 package kafka.server
 
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.ClusterInstance
 import org.apache.kafka.common.message.ApiVersionsRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.ApiVersionsRequest
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, ClusterTests, Type}
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.extension.ExtendWith
 
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
+@ClusterTestDefaults(brokers = 1)
 class ApiVersionsRequestTest(cluster: ClusterInstance) extends 
AbstractApiVersionsRequestTest(cluster) {
 
-  @BeforeEach
-  def setup(config: ClusterConfig): Unit = {
-super.brokerPropertyOverrides(config.serverProperties())
-  }
-
-  @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties 
= Array(
-new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
-new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value 
= "true"),
+  @ClusterTests(Array(
+new ClusterTest(clusterType = Type.ZK, metadataVersion = 
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(

Review Comment:
   gentle ping @chia7712 , Filed 
https://issues.apache.org/jira/browse/KAFKA-16595



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Refactor KafkaConfig to use PasswordEncoderConfigs [kafka]

2024-04-21 Thread via GitHub


chia7712 merged PR #15770:
URL: https://github.com/apache/kafka/pull/15770


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]

2024-04-21 Thread via GitHub


OmniaGM commented on code in PR #15772:
URL: https://github.com/apache/kafka/pull/15772#discussion_r1573851135


##
server/src/main/java/org/apache/kafka/server/config/Defaults.java:
##
@@ -46,6 +42,10 @@ public class Defaults {
 public static final int METADATA_MAX_IDLE_INTERVAL_MS = 500;
 public static final int METADATA_MAX_RETENTION_BYTES = 100 * 1024 * 1024;
 public static final boolean DELETE_TOPIC_ENABLE = true;
+public static final int REQUEST_TIMEOUT_MS = 3;
+public static final long CONNECTION_SETUP_TIMEOUT_MS = 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS;

Review Comment:
   These will move soon, in the final couple of PRs for moving KafkaConfig 
out of core. There will be some refactoring to move more things into 
`CommonClientConfigs` and some into `ServerCommonConfigs`, so I would keep it for 
now until I fully finish this JIRA. `KafkaConfig.scala` will join the rest of 
the configs in `server` pretty soon.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-21 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1573851263


##
core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:
##
@@ -17,48 +17,78 @@
 
 package kafka.server
 
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.ClusterInstance
 import org.apache.kafka.common.message.ApiVersionsRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.ApiVersionsRequest
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, ClusterTests, Type}
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.extension.ExtendWith
 
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
+@ClusterTestDefaults(brokers = 1)
 class ApiVersionsRequestTest(cluster: ClusterInstance) extends 
AbstractApiVersionsRequestTest(cluster) {
 
-  @BeforeEach
-  def setup(config: ClusterConfig): Unit = {
-super.brokerPropertyOverrides(config.serverProperties())
-  }
-
-  @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties 
= Array(
-new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
-new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value 
= "true"),
+  @ClusterTests(Array(
+new ClusterTest(clusterType = Type.ZK, metadataVersion = 
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(

Review Comment:
   OK, let's talk about this in that JIRA. What I've been wondering here is how 
to simplify the duplicated ClusterConfigProperty entries. Adding `ClusterTemplate` 
to `ClusterTests` is one way, but maybe there are other solutions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]

2024-04-21 Thread via GitHub


OmniaGM commented on code in PR #15772:
URL: https://github.com/apache/kafka/pull/15772#discussion_r1573851135


##
server/src/main/java/org/apache/kafka/server/config/Defaults.java:
##
@@ -46,6 +42,10 @@ public class Defaults {
 public static final int METADATA_MAX_IDLE_INTERVAL_MS = 500;
 public static final int METADATA_MAX_RETENTION_BYTES = 100 * 1024 * 1024;
 public static final boolean DELETE_TOPIC_ENABLE = true;
+public static final int REQUEST_TIMEOUT_MS = 3;
+public static final long CONNECTION_SETUP_TIMEOUT_MS = 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS;

Review Comment:
   These will move soon, in the final couple of PRs for moving KafkaConfig 
out of core, so I would keep it for now until I fully finish this JIRA. 
`KafkaConfig.scala` will join the rest of the configs in `server` pretty soon.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]

2024-04-21 Thread via GitHub


OmniaGM commented on code in PR #15772:
URL: https://github.com/apache/kafka/pull/15772#discussion_r1573851135


##
server/src/main/java/org/apache/kafka/server/config/Defaults.java:
##
@@ -46,6 +42,10 @@ public class Defaults {
 public static final int METADATA_MAX_IDLE_INTERVAL_MS = 500;
 public static final int METADATA_MAX_RETENTION_BYTES = 100 * 1024 * 1024;
 public static final boolean DELETE_TOPIC_ENABLE = true;
+public static final int REQUEST_TIMEOUT_MS = 3;
+public static final long CONNECTION_SETUP_TIMEOUT_MS = 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS;

Review Comment:
   These will move soon, in the final couple of PRs for moving KafkaConfig 
out of core, so I would keep it for now until I fully finish this JIRA. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]

2024-04-21 Thread via GitHub


OmniaGM commented on code in PR #15772:
URL: https://github.com/apache/kafka/pull/15772#discussion_r1573850333


##
server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.network;
+
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+public class SocketServerConfigs {

Review Comment:
   I named it `SocketServerConfigs` instead of `ServerSocketConfigs` for the 
following reasons:
   1. it is part of `server.network`.
   2. we have `SocketServer`, which will move to the same package at some point. 
   3. Java has `java.net.ServerSocket`, so we may need to differentiate classes 
related to `SocketServer` from the built-in class. 
   
   I am not very attached to the name, but it made more sense to me during the 
refactor. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-21 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1573850549


##
core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:
##
@@ -17,48 +17,78 @@
 
 package kafka.server
 
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.ClusterInstance
 import org.apache.kafka.common.message.ApiVersionsRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.ApiVersionsRequest
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, ClusterTests, Type}
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.extension.ExtendWith
 
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
+@ClusterTestDefaults(brokers = 1)
 class ApiVersionsRequestTest(cluster: ClusterInstance) extends 
AbstractApiVersionsRequestTest(cluster) {
 
-  @BeforeEach
-  def setup(config: ClusterConfig): Unit = {
-super.brokerPropertyOverrides(config.serverProperties())
-  }
-
-  @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties 
= Array(
-new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
-new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value 
= "true"),
+  @ClusterTests(Array(
+new ClusterTest(clusterType = Type.ZK, metadataVersion = 
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(

Review Comment:
   > Currently we can't add ClusterTemplate in ClusterTests. Should we do that 
in this PR?
   
   Could you file a Jira for that? Also, we can leave a TODO here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-21 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1573849975


##
core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:
##
@@ -17,48 +17,78 @@
 
 package kafka.server
 
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.ClusterInstance
 import org.apache.kafka.common.message.ApiVersionsRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.ApiVersionsRequest
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, ClusterTests, Type}
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.extension.ExtendWith
 
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
+@ClusterTestDefaults(brokers = 1)
 class ApiVersionsRequestTest(cluster: ClusterInstance) extends 
AbstractApiVersionsRequestTest(cluster) {
 
-  @BeforeEach
-  def setup(config: ClusterConfig): Unit = {
-super.brokerPropertyOverrides(config.serverProperties())
-  }
-
-  @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties 
= Array(
-new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
-new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value 
= "true"),
+  @ClusterTests(Array(
+new ClusterTest(clusterType = Type.ZK, metadataVersion = 
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(

Review Comment:
   Currently we can't add `ClusterTemplate` in ClusterTests. Should we do that 
in this PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]

2024-04-21 Thread via GitHub


chia7712 commented on code in PR #15772:
URL: https://github.com/apache/kafka/pull/15772#discussion_r1573848774


##
server/src/main/java/org/apache/kafka/server/config/Defaults.java:
##
@@ -46,6 +42,10 @@ public class Defaults {
 public static final int METADATA_MAX_IDLE_INTERVAL_MS = 500;
 public static final int METADATA_MAX_RETENTION_BYTES = 100 * 1024 * 1024;
 public static final boolean DELETE_TOPIC_ENABLE = true;
+public static final int REQUEST_TIMEOUT_MS = 3;
+public static final long CONNECTION_SETUP_TIMEOUT_MS = 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS;
+public static final long CONNECTION_SETUP_TIMEOUT_MAX_MS = 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS;

Review Comment:
   ditto



##
server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.network;
+
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+public class SocketServerConfigs {
+public static final String LISTENER_SECURITY_PROTOCOL_MAP_CONFIG = 
"listener.security.protocol.map";
+public static final String LISTENER_SECURITY_PROTOCOL_MAP_DEFAULT = 
Arrays.stream(SecurityProtocol.values())
+.collect(Collectors.toMap(sp -> 
ListenerName.forSecurityProtocol(sp), sp -> sp))

Review Comment:
   `.collect(Collectors.toMap(ListenerName::forSecurityProtocol, sp -> sp))`



##
server/src/main/java/org/apache/kafka/server/config/Defaults.java:
##
@@ -46,6 +42,10 @@ public class Defaults {
 public static final int METADATA_MAX_IDLE_INTERVAL_MS = 500;
 public static final int METADATA_MAX_RETENTION_BYTES = 100 * 1024 * 1024;
 public static final boolean DELETE_TOPIC_ENABLE = true;
+public static final int REQUEST_TIMEOUT_MS = 3;
+public static final long CONNECTION_SETUP_TIMEOUT_MS = 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS;

Review Comment:
   Do we need `CONNECTION_SETUP_TIMEOUT_MS`, given that it is used only by 
`KafkaConfig.scala`?



##
server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.network;
+
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+public class SocketServerConfigs {

Review Comment:
   Should we call it `ServerSocketConfigs` so as to align it with 
`ServerLogConfigs`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-21 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1573839545


##
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##
@@ -290,7 +287,7 @@ public void waitForReadyBrokers() throws 
InterruptedException {
 }
 
 @Override
-public void rollingBrokerRestart() {
+public void rollingBrokerRestart(Optional<ClusterConfig> 
clusterConfig) {

Review Comment:
   As not all implementations support this method, we should remove it from the 
interface. The callers can use `getUnderlying` to get the ZK instance and call 
that method.
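   A possible caller-side reading of this suggestion; the concrete class name is 
an assumption, not the PR's final API:
   ```java
   // Hypothetical: only the ZK-backed implementation supports rolling restarts,
   // so callers fetch the concrete instance instead of going through the interface.
   ZkClusterInstance zkCluster = (ZkClusterInstance) clusterInstance.getUnderlying();
   zkCluster.rollingBrokerRestart();
   ```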



##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -211,13 +186,36 @@ public static class Builder {
 private String listenerName;
 private File trustStoreFile;
 private MetadataVersion metadataVersion;
-private Properties serverProperties = new Properties();
-private Properties producerProperties = new Properties();
-private Properties consumerProperties = new Properties();
-private Properties adminClientProperties = new Properties();
-private Properties saslServerProperties = new Properties();
-private Properties saslClientProperties = new Properties();
-private final Map<Integer, Properties> perBrokerOverrideProperties = 
new HashMap<>();
+private Map<String, String> serverProperties = new HashMap<>();
+private Map<String, String> producerProperties = new HashMap<>();
+private Map<String, String> consumerProperties = new HashMap<>();
+private Map<String, String> adminClientProperties = new HashMap<>();
+private Map<String, String> saslServerProperties = new HashMap<>();
+private Map<String, String> saslClientProperties = new HashMap<>();
+private Map<Integer, Map<String, String>> perBrokerOverrideProperties 
= new HashMap<>();
+
+Builder() {}
+
+Builder(ClusterConfig clusterConfig) {
+this.type = clusterConfig.type;
+this.brokers = clusterConfig.brokers;
+this.controllers = clusterConfig.controllers;
+this.name = clusterConfig.name;
+this.autoStart = clusterConfig.autoStart;
+this.securityProtocol = clusterConfig.securityProtocol;
+this.listenerName = clusterConfig.listenerName;
+this.trustStoreFile = clusterConfig.trustStoreFile;
+this.metadataVersion = clusterConfig.metadataVersion;
+this.serverProperties = new 
HashMap<>(clusterConfig.serverProperties);
+this.producerProperties = new 
HashMap<>(clusterConfig.producerProperties);
+this.consumerProperties = new 
HashMap<>(clusterConfig.consumerProperties);
+this.adminClientProperties = new 
HashMap<>(clusterConfig.adminClientProperties);
+this.saslServerProperties = new 
HashMap<>(clusterConfig.saslServerProperties);
+this.saslClientProperties = new 
HashMap<>(clusterConfig.saslClientProperties);
Map<Integer, Map<String, String>> perBrokerOverrideProps = new 
HashMap<>();
+clusterConfig.perBrokerOverrideProperties.forEach((k, v) -> 
perBrokerOverrideProps.put(k, new HashMap<>(v)));
+this.perBrokerOverrideProperties = perBrokerOverrideProps;

Review Comment:
   ```java
   this.perBrokerOverrideProperties = 
clusterConfig.perBrokerOverrideProperties.entrySet().stream()
   .collect(Collectors.toMap(Map.Entry::getKey, e -> new 
HashMap<>(e.getValue())));
   ```



##
core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala:
##
@@ -18,41 +18,58 @@ package kafka.server
 
 import java.net.Socket
 import java.util.Collections
-
 import kafka.api.{KafkaSasl, SaslSetup}
-import kafka.test.annotation.{ClusterTest, Type}
+import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, 
kafkaServerSaslMechanisms}
+import kafka.test.annotation.{ClusterTemplate, Type}
 import kafka.test.junit.ClusterTestExtensions
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
 import kafka.utils.JaasTestUtils
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.message.SaslHandshakeRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{ApiVersionsRequest, 
ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse}
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.server.config.KafkaSecurityConfigs
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.extension.ExtendWith
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 
 import scala.jdk.CollectionConverters._
 
+object SaslApiVersionsRequestTest {
+  val kafkaClientSaslMechanism = "PLAIN"
+  val kafkaServerSaslMechanisms: Seq[String] = List("PLAIN")
+  val controlPlaneListenerName = "CONTROL_PLANE"
+  val securityProtocol = 

Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-21 Thread via GitHub


chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1573834240


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -173,7 +208,11 @@ private void produceRecord() {
 private void withStableConsumerGroup(Runnable body) {
 Consumer consumer = createConsumer(new Properties());
 try {
-TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, 
DEFAULT_MAX_WAIT_MS);
+consumer.subscribe(Collections.singletonList(TOPIC));
+ConsumerRecords records = 
consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS));
+if (records.isEmpty()) {

Review Comment:
   Could we rewrite it as `assertNotEquals(0, records.count())`?



##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -184,7 +223,11 @@ private void withStableConsumerGroup(Runnable body) {
 private void withEmptyConsumerGroup(Runnable body) {
 Consumer consumer = createConsumer(new Properties());

Review Comment:
   please use  try-with-resources



##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -173,7 +208,11 @@ private void produceRecord() {
 private void withStableConsumerGroup(Runnable body) {
 Consumer consumer = createConsumer(new Properties());

Review Comment:
   please use  try-with-resources
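   For reference, the shape of the try-with-resources version; the generic types 
and body are illustrative:
   ```java
   // Consumer implements Closeable, so it is closed even if the body throws.
   try (Consumer<byte[], byte[]> consumer = createConsumer(new Properties())) {
       consumer.subscribe(Collections.singletonList(TOPIC));
       body.run();
   }
   ```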



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-21 Thread via GitHub


chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1573832025


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -16,7 +16,15 @@
  */
 package org.apache.kafka.tools.consumer.group;
 
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
 import kafka.utils.TestUtils;

Review Comment:
   > I will create another minor PR to refactor it. Thanks.
   
   nice!!!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16592: Add a new constructor which invokes the existing constructor with default value for alternativeString [kafka]

2024-04-21 Thread via GitHub


chia7712 commented on code in PR #15762:
URL: https://github.com/apache/kafka/pull/15762#discussion_r1573826054


##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -1256,6 +1256,16 @@ public static class ConfigKey {
 public final boolean internalConfig;
 public final String alternativeString;
 
+// This constructor is present for backward compatibility reasons.
+public ConfigKey(String name, Type type, Object defaultValue, 
Validator validator,
+ Importance importance, String documentation, String 
group,
+ int orderInGroup, Width width, String displayName,
+ List dependents, Recommender recommender,
+ boolean internalConfig) {
+this(name, type, defaultValue, validator, importance, 
documentation, group, orderInGroup, width, displayName,
+dependents, recommender, internalConfig, null);
+}
+
 public ConfigKey(String name, Type type, Object defaultValue, 
Validator validator,

Review Comment:
   As #13909 is not in any release, we can change this constructor from 
`public` to `private`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14585: Move StorageTool to tools [kafka]

2024-04-21 Thread via GitHub


fvaleri commented on PR #14847:
URL: https://github.com/apache/kafka/pull/14847#issuecomment-2068086501

   @showuon @mimaison I think this is now ready for review.
   
   I think the changes are now well isolated. There is no code refactoring and 
there are no Kafka configuration changes, so comparison with the original code 
should be straightforward. I'm using `LogConfig` to get all required 
configurations. Most keys are taken from the configuration objects migrated from 
`KafkaConfig`; only 3 of them are missing, which I'm defining as constants in 
`StorageTool`, and they will be removed as soon as they are migrated.
   
   I tried all commands and options, comparing the output with the old 
implementation. I also ran a local 3-node Kafka cluster in KRaft mode. 
Finally, I ran all unit and integration tests.
   
   Please have a look. Thanks.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16592: Add a new constructor which invokes the existing constructor with default value for alternativeString [kafka]

2024-04-21 Thread via GitHub


vamossagar12 commented on PR #15762:
URL: https://github.com/apache/kafka/pull/15762#issuecomment-2068083995

   Thanks @chia7712 , I am thinking we can make the constructors private and let 
users use only the `define` method. Even in the AK codebase, I don't see the 
constructor being used widely, and all declarations are done using `define`. I 
can file a ticket for this if we agree.
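   For reference, a declaration that goes through `define` instead of the 
constructor; the config name and values are illustrative:
   ```java
   // define() builds the ConfigKey internally, so callers never touch the constructor.
   ConfigDef configDef = new ConfigDef()
       .define("example.timeout.ms", ConfigDef.Type.INT, 30000,
               ConfigDef.Importance.MEDIUM, "An illustrative config entry.");
   ```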


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16592: Add a new constructor which invokes the existing constructor with default value for alternativeString [kafka]

2024-04-21 Thread via GitHub


vamossagar12 commented on code in PR #15762:
URL: https://github.com/apache/kafka/pull/15762#discussion_r1573818929


##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -1256,6 +1256,15 @@ public static class ConfigKey {
 public final boolean internalConfig;
 public final String alternativeString;
 
+public ConfigKey(String name, Type type, Object defaultValue, 
Validator validator,

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16466: add details of an internal exception to the failure message [kafka]

2024-04-21 Thread via GitHub


ilyazr commented on PR #15701:
URL: https://github.com/apache/kafka/pull/15701#issuecomment-2068079457

   @soarez 
   Hi! I've added some changes to fix those tests, but now it shows one failing 
check. Could you tell me what has gone wrong now?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-21 Thread via GitHub


FrankYang0529 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1573795818


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -16,7 +16,15 @@
  */
 package org.apache.kafka.tools.consumer.group;
 
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
 import kafka.utils.TestUtils;

Review Comment:
   It looks like the only place using `subscribeAndWaitForRecords` is 
`PlaintextAdminIntegrationTest` after this PR is merged. I will create another 
minor PR to refactor it. Thanks. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16588: broker shutdown hangs when file.delete.delay.ms is zero [kafka]

2024-04-21 Thread via GitHub


FrankYang0529 opened a new pull request, #15773:
URL: https://github.com/apache/kafka/pull/15773

   If `file.delete.delay.ms` is zero, we call `take` even though 
`logsToBeDeleted` is empty, and `KafkaScheduler#shutdown` calls `shutdown` 
rather than `shutdownNow` 
(https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java#L134).
   
   Hence, the thread never completes, and this blocks the shutdown of the 
broker.
   
   We should replace `take` with `poll`, since we have already checked that an 
element is present.
   
   Zero is a valid value: 
https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java#L258
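   The gist of the fix, sketched; the queue element type here is illustrative, 
not the actual queue's element type:
   ```java
   // take() parks the thread until an element arrives, which is what hangs shutdown.
   // poll() returns null immediately on an empty queue; that is safe here because
   // the caller has already checked that an element is present.
   BlockingQueue<Runnable> logsToBeDeleted = new LinkedBlockingQueue<>();
   Runnable entry = logsToBeDeleted.poll();   // instead of logsToBeDeleted.take()
   if (entry != null) {
       entry.run();
   }
   ```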
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-04-21 Thread via GitHub


gharris1727 commented on code in PR #15620:
URL: https://github.com/apache/kafka/pull/15620#discussion_r1573784112


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java:
##
@@ -65,50 +59,45 @@ protected ApiMessage newApiMessage(short apiKey) {
 return MetadataRecordType.fromId(apiKey).newMetadataRecord();
 }
 
-protected final Map<Short, RemoteLogMetadataTransform> 
createRemoteLogMetadataTransforms() {
-Map<Short, RemoteLogMetadataTransform> map = new HashMap<>();
-map.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new 
RemoteLogSegmentMetadataTransform());
-map.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new 
RemoteLogSegmentMetadataUpdateTransform());
-map.put(REMOTE_PARTITION_DELETE_API_KEY, new 
RemotePartitionDeleteMetadataTransform());
-map.put(REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY, new 
RemoteLogSegmentMetadataSnapshotTransform());
-return map;
-}
-
-protected final Map<String, Short> 
createRemoteLogStorageClassToApiKeyMap() {
-Map<String, Short> map = new HashMap<>();
-map.put(RemoteLogSegmentMetadata.class.getName(), 
REMOTE_LOG_SEGMENT_METADATA_API_KEY);
-map.put(RemoteLogSegmentMetadataUpdate.class.getName(), 
REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY);
-map.put(RemotePartitionDeleteMetadata.class.getName(), 
REMOTE_PARTITION_DELETE_API_KEY);
-map.put(RemoteLogSegmentMetadataSnapshot.class.getName(), 
REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY);
-return map;
-}
-
 public byte[] serialize(RemoteLogMetadata remoteLogMetadata) {
-Short apiKey = 
remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName());
-if (apiKey == null) {
-throw new IllegalArgumentException("ApiKey for given 
RemoteStorageMetadata class: " + remoteLogMetadata.getClass()
-   + " does not exist.");
-}
 
-@SuppressWarnings("unchecked")
-ApiMessageAndVersion apiMessageAndVersion = 
remoteLogMetadataTransform(apiKey).toApiMessageAndVersion(remoteLogMetadata);
+ApiMessageAndVersion apiMessageAndVersion;
+if (remoteLogMetadata instanceof RemoteLogSegmentMetadata) {
+RemoteLogSegmentMetadataTransform metadataTransform = new 
RemoteLogSegmentMetadataTransform();

Review Comment:
   Please move these variables to fields, and the `new` calls to the serde 
constructor. We can reuse the same fields for the deserialization.
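   One way to read this, sketched (field names are illustrative):
   ```java
   // Construct each transform once and reuse it for both serialize and deserialize.
   private final RemoteLogSegmentMetadataTransform segmentTransform;
   
   public RemoteLogMetadataSerde() {
       this.segmentTransform = new RemoteLogSegmentMetadataTransform();
       // ... likewise for the update, delete, and snapshot transforms ...
   }
   ```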



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: use assertInstanceOf to replace assertTrue [kafka]

2024-04-21 Thread via GitHub


evalaiyc98 commented on PR #15769:
URL: https://github.com/apache/kafka/pull/15769#issuecomment-2068055223

   @chia7712 Could you help review this?
   If there is a need for any further improvement, please let me know. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-04-21 Thread via GitHub


linu-shibu commented on code in PR #15620:
URL: https://github.com/apache/kafka/pull/15620#discussion_r1573755412


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java:
##
@@ -74,25 +72,24 @@ protected final Map 
createRemoteLogMetadataTr
 return map;
 }
 
-protected final Map<String, Short> 
createRemoteLogStorageClassToApiKeyMap() {
-Map<String, Short> map = new HashMap<>();
-map.put(RemoteLogSegmentMetadata.class.getName(), 
REMOTE_LOG_SEGMENT_METADATA_API_KEY);
-map.put(RemoteLogSegmentMetadataUpdate.class.getName(), 
REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY);
-map.put(RemotePartitionDeleteMetadata.class.getName(), 
REMOTE_PARTITION_DELETE_API_KEY);
-map.put(RemoteLogSegmentMetadataSnapshot.class.getName(), 
REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY);
-return map;
-}
-
 public byte[] serialize(RemoteLogMetadata remoteLogMetadata) {
-Short apiKey = 
remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName());
-if (apiKey == null) {
-throw new IllegalArgumentException("ApiKey for given 
RemoteStorageMetadata class: " + remoteLogMetadata.getClass()
-   + " does not exist.");
-}
 
-@SuppressWarnings("unchecked")
-ApiMessageAndVersion apiMessageAndVersion = 
remoteLogMetadataTransform(apiKey).toApiMessageAndVersion(remoteLogMetadata);
+RemoteLogMetadataTransform metadataTransform;
+
+if(remoteLogMetadata.getClass() == RemoteLogSegmentMetadata.class) {

Review Comment:
   Okay, thanks for the clarification! I will change the comparison to use the 
`instanceof` operator since it is already used in the project.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]

2024-04-21 Thread via GitHub


OmniaGM opened a new pull request, #15772:
URL: https://github.com/apache/kafka/pull/15772

   - Move socket configs and docs out of core and into 
`org.apache.kafka.network.SocketServerConfigs`
   - Move default values for socket configs into same class
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-04-21 Thread via GitHub


gharris1727 commented on code in PR #15620:
URL: https://github.com/apache/kafka/pull/15620#discussion_r1573729186


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java:
##
@@ -74,25 +72,24 @@ protected final Map 
createRemoteLogMetadataTr
 return map;
 }
 
-protected final Map<String, Short> 
createRemoteLogStorageClassToApiKeyMap() {
-Map<String, Short> map = new HashMap<>();
-map.put(RemoteLogSegmentMetadata.class.getName(), 
REMOTE_LOG_SEGMENT_METADATA_API_KEY);
-map.put(RemoteLogSegmentMetadataUpdate.class.getName(), 
REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY);
-map.put(RemotePartitionDeleteMetadata.class.getName(), 
REMOTE_PARTITION_DELETE_API_KEY);
-map.put(RemoteLogSegmentMetadataSnapshot.class.getName(), 
REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY);
-return map;
-}
-
 public byte[] serialize(RemoteLogMetadata remoteLogMetadata) {
-Short apiKey = 
remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName());
-if (apiKey == null) {
-throw new IllegalArgumentException("ApiKey for given 
RemoteStorageMetadata class: " + remoteLogMetadata.getClass()
-   + " does not exist.");
-}
 
-@SuppressWarnings("unchecked")
-ApiMessageAndVersion apiMessageAndVersion = 
remoteLogMetadataTransform(apiKey).toApiMessageAndVersion(remoteLogMetadata);
+RemoteLogMetadataTransform metadataTransform;
+
+if(remoteLogMetadata.getClass() == RemoteLogSegmentMetadata.class) {

Review Comment:
   I don't think it makes a correctness difference, since this is a flat 
hierarchy and there aren't any subclasses of the serialized types.
   
   There might be a slight performance difference, but not a significant one 
when the code is overall unoptimized, and we can't know without measuring it.
   
   I would have reached for `instanceof` because it is more null-safe, but if 
there's an explicit null guard somewhere else then there is again no difference 
in correctness, just style.
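   A two-line illustration of the null-safety point:
   ```java
   RemoteLogMetadata metadata = null;
   boolean ok = metadata instanceof RemoteLogSegmentMetadata;  // false, no NPE
   // metadata.getClass() == RemoteLogSegmentMetadata.class;   // would throw NPE
   ```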



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-04-21 Thread via GitHub


linu-shibu commented on code in PR #15620:
URL: https://github.com/apache/kafka/pull/15620#discussion_r1573721007


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java:
##
@@ -74,25 +72,24 @@ protected final Map 
createRemoteLogMetadataTr
 return map;
 }
 
-protected final Map<String, Short> 
createRemoteLogStorageClassToApiKeyMap() {
-Map<String, Short> map = new HashMap<>();
-map.put(RemoteLogSegmentMetadata.class.getName(), 
REMOTE_LOG_SEGMENT_METADATA_API_KEY);
-map.put(RemoteLogSegmentMetadataUpdate.class.getName(), 
REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY);
-map.put(RemotePartitionDeleteMetadata.class.getName(), 
REMOTE_PARTITION_DELETE_API_KEY);
-map.put(RemoteLogSegmentMetadataSnapshot.class.getName(), 
REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY);
-return map;
-}
-
 public byte[] serialize(RemoteLogMetadata remoteLogMetadata) {
-Short apiKey = 
remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName());
-if (apiKey == null) {
-throw new IllegalArgumentException("ApiKey for given 
RemoteStorageMetadata class: " + remoteLogMetadata.getClass()
-   + " does not exist.");
-}
 
-@SuppressWarnings("unchecked")
-ApiMessageAndVersion apiMessageAndVersion = 
remoteLogMetadataTransform(apiKey).toApiMessageAndVersion(remoteLogMetadata);
+RemoteLogMetadataTransform metadataTransform;
+
+if(remoteLogMetadata.getClass() == RemoteLogSegmentMetadata.class) {

Review Comment:
   Is there an advantage to using `instanceof` instead of `getClass()` in 
this scenario?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16594) Add a test to detect CDS errors

2024-04-21 Thread Vedarth Sharma (Jira)
Vedarth Sharma created KAFKA-16594:
--

 Summary: Add a test to detect CDS errors
 Key: KAFKA-16594
 URL: https://issues.apache.org/jira/browse/KAFKA-16594
 Project: Kafka
  Issue Type: Sub-task
Reporter: Vedarth Sharma
Assignee: Vedarth Sharma


Currently pipeline cannot detect whether CDS is working as expected or not. A 
test for this will help.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] Minor: Fix CDS in docker image [kafka]

2024-04-21 Thread via GitHub


VedarthConfluent opened a new pull request, #15771:
URL: https://github.com/apache/kafka/pull/15771

   Fix CDS in docker image.
   
   Due to a difference in the packages present when the JSA files were generated 
and when the Docker image is built, a warning is logged on starting the Docker 
image:
   
   `[0.001s][warning][cds] The shared archive file has a bad magic number: 0`
   
   This PR fixes the warning.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15853: Refactor KafkaConfig to use PasswordEncoderConfigs [kafka]

2024-04-21 Thread via GitHub


OmniaGM opened a new pull request, #15770:
URL: https://github.com/apache/kafka/pull/15770

   - Move docs to PasswordEncoderConfigs
   - Renamed the configs in `PasswordEncoderConfigs` to match `_CONFIG` suffix 
pattern
   - Move default values to `PasswordEncoderConfigs`
   - Replace `KafkaConfig.PasswordEncoder*Prop` with 
`PasswordEncoderConfigs.*_CONFIG`
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: use assertInstanceOf to replace assertTrue [kafka]

2024-04-21 Thread via GitHub


evalaiyc98 opened a new pull request, #15769:
URL: https://github.com/apache/kafka/pull/15769

   I am a newbie, and this is a minor change that uses `assertInstanceOf` to replace `assertTrue`.
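   
   For context, a minimal sketch of the swap, assuming JUnit 5.8+ where `assertInstanceOf` was added (the value and types are illustrative):
   
   ```java
   import static org.junit.jupiter.api.Assertions.assertInstanceOf;
   import static org.junit.jupiter.api.Assertions.assertTrue;
   
   public class AssertInstanceOfDemo {
       public static void main(String[] args) {
           Object result = "hello";
   
           // Before: a failure only reports "expected: <true> but was: <false>".
           assertTrue(result instanceof String);
   
           // After: a failure reports the expected and actual types,
           // and the assertion returns the value already cast.
           String s = assertInstanceOf(String.class, result);
           System.out.println(s.length());
       }
   }
   ```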
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16550) add integration test for LogDirsCommand

2024-04-21 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16550:
--

Assignee: JiaChi Wang  (was: Chia-Ping Tsai)

> add integration test for LogDirsCommand
> ---
>
> Key: KAFKA-16550
> URL: https://issues.apache.org/jira/browse/KAFKA-16550
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: JiaChi Wang
>Priority: Minor
>
> Currently LogDirsCommand has only unit tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16550) add integration test for LogDirsCommand

2024-04-21 Thread JiaChi Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839369#comment-17839369
 ] 

JiaChi Wang commented on KAFKA-16550:
-

Hi [~chia7712], I'm interested in this issue. Can I take this one? Thanks.

> add integration test for LogDirsCommand
> ---
>
> Key: KAFKA-16550
> URL: https://issues.apache.org/jira/browse/KAFKA-16550
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> Currently LogDirsCommand has only unit tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-1655: new test for ClusterTool, integration with ClusterTest [kafka]

2024-04-21 Thread via GitHub


dboyliao commented on PR #15768:
URL: https://github.com/apache/kafka/pull/15768#issuecomment-2067968427

   The blockers here are `testClusterTooOldToHaveId` and `testUnregisterBroker`.
   For `testClusterTooOldToHaveId`, I'm still working out which metadata version is old enough to be identified as too old.
   For `testUnregisterBroker`, the error message is `WARN [BrokerLifecycleManager id=0] Broker 0 sent a heartbeat request but received error STALE_BROKER_EPOCH. (kafka.server.BrokerLifecycleManager:70)`, and the test seems to be blocked on this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16551) add integration test for ClusterTool

2024-04-21 Thread Yin Chen Liao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839367#comment-17839367
 ] 

Yin Chen Liao commented on KAFKA-16551:
---

I want to take over this issue.

> add integration test for ClusterTool
> 
>
> Key: KAFKA-16551
> URL: https://issues.apache.org/jira/browse/KAFKA-16551
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> As the title says.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-21 Thread via GitHub


chia7712 commented on PR #15616:
URL: https://github.com/apache/kafka/pull/15616#issuecomment-2067966296

   > Thanks for the great suggestion. I took a look at LogSegment#deleteIfExists and LogSegment#deleteTypeIfExists. If we want to handle fallback deletion in LocalLog, we may need to return true/false from those two functions. However, LogSegment#deleteIfExists uses Utils.tryAll to handle 4 try/catch blocks. If we want to return true/false, we need to refactor Utils.tryAll as well. Finally, LogSegment#deleteIfExists is used not only by LocalLog.deleteSegmentFiles but also by LocalLog.splitOverflowedSegment, so we need to handle the LocalLog.splitOverflowedSegment path, too. I think we can use another Jira to track the change. Thanks.
   
   Agree. Let's ship it first.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-04-21 Thread via GitHub


The-Gamer-01 commented on code in PR #15620:
URL: https://github.com/apache/kafka/pull/15620#discussion_r1573677249


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java:
##
@@ -74,25 +72,24 @@ protected final Map createRemoteLogMetadataTr
 return map;
 }
 
-protected final Map createRemoteLogStorageClassToApiKeyMap() {
-Map map = new HashMap<>();
-map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY);
-map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY);
-map.put(RemotePartitionDeleteMetadata.class.getName(), REMOTE_PARTITION_DELETE_API_KEY);
-map.put(RemoteLogSegmentMetadataSnapshot.class.getName(), REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY);
-return map;
-}
-
 public byte[] serialize(RemoteLogMetadata remoteLogMetadata) {
-Short apiKey = remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName());
-if (apiKey == null) {
-throw new IllegalArgumentException("ApiKey for given RemoteStorageMetadata class: " + remoteLogMetadata.getClass()
-   + " does not exist.");
-}
 
-@SuppressWarnings("unchecked")
-ApiMessageAndVersion apiMessageAndVersion = remoteLogMetadataTransform(apiKey).toApiMessageAndVersion(remoteLogMetadata);
+RemoteLogMetadataTransform metadataTransform;
+
+if(remoteLogMetadata.getClass() == RemoteLogSegmentMetadata.class) {

Review Comment:
   Why doesn't this PR use **instanceof**?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: fix javadoc argument name in KRaftMetadataCache#getTopicMetadataForDescribeTopicResponse [kafka]

2024-04-21 Thread via GitHub


brandboat commented on code in PR #15764:
URL: https://github.com/apache/kafka/pull/15764#discussion_r1573667382


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -267,7 +267,7 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
    *
    * @param topics                        The iterator of topics and their corresponding first partition id to fetch.
    * @param listenerName                  The listener name.
-   * @param firstTopicPartitionStartIndex The start partition index for the first topic
+   * @param topicPartitionStartIndex      The start partition index for the first topic

Review Comment:
   Looks like the description here isn't correct; could you help modify it? Huge thanks :smiley:
   Maybe `The function that returns the start partition index for the topic`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16549: suppress the warnings from RemoteLogManager [kafka]

2024-04-21 Thread via GitHub


charliecheng630 opened a new pull request, #15767:
URL: https://github.com/apache/kafka/pull/15767

   - suppress the warnings from RemoteLogManager
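   
   For reference, a minimal sketch of the usual convention: keep `@SuppressWarnings` on the narrowest element that triggers the warning (the deprecated method here is illustrative):
   
   ```java
   public class SuppressDemo {
       @Deprecated
       static void legacy() { }
   
       // Annotate the smallest scope that produces the warning,
       // rather than the whole class.
       @SuppressWarnings("deprecation")
       public static void main(String[] args) {
           legacy(); // would emit a deprecation warning without the annotation
       }
   }
   ```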
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-21 Thread via GitHub


FrankYang0529 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1573652117


##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +118,57 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
 assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+val partitionNum = 1
+
+// Alter replica dir before topic creation
+val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+// The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+val tp = new TopicPartition(topic, 0)
+assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+val topicProperties = new Properties()
+topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")
+topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")
+
+createTopic(topic, partitionNum, 1, topicProperties)
+assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
+
+// send enough records to trigger log rolling
+(0 until 20).foreach { _ =>
+  TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
+}
+TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
+  "timed out waiting for log segment to roll")
+
+// Wait for log segment retention. LogManager#InitialTaskDelayMs is 30 seconds.
+// The first retention task is executed after 30 seconds, so waiting for 35 seconds should be enough.
+TestUtils.waitUntilTrue(() => {
+  new File(logDir1, tp.toString).listFiles().count(_.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) > 0
+}, "timed out waiting for log segment to retention", 35000)

Review Comment:
   Thank you. I updated `LOG_INITIAL_TASK_DELAY_MS_CONFIG` to 5 seconds.
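   
   For readers unfamiliar with `TestUtils.waitUntilTrue`, a rough Java sketch of the same poll-until-condition pattern (the helper name, poll interval, and failure behavior are illustrative, not Kafka's actual implementation):
   
   ```java
   import java.util.function.BooleanSupplier;
   
   public final class WaitUtil {
       // Poll `condition` until it holds or `timeoutMs` elapses, then fail.
       public static void waitUntilTrue(BooleanSupplier condition, String message, long timeoutMs)
               throws InterruptedException {
           long deadline = System.currentTimeMillis() + timeoutMs;
           while (!condition.getAsBoolean()) {
               if (System.currentTimeMillis() > deadline)
                   throw new AssertionError(message);
               Thread.sleep(100L); // avoid busy-waiting between polls
           }
       }
   
       public static void main(String[] args) throws InterruptedException {
           long start = System.currentTimeMillis();
           waitUntilTrue(() -> System.currentTimeMillis() - start > 500,
                   "timed out waiting for condition", 5_000L);
           System.out.println("condition met");
       }
   }
   ```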



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-21 Thread via GitHub


FrankYang0529 commented on PR #15616:
URL: https://github.com/apache/kafka/pull/15616#issuecomment-2067936507

   > This PR is good, but it seems to me `LogSegment` should NOT guess the directory structure managed by an upper class (i.e. `LogManager`).
   > 
   > It seems the root cause comes from the following steps:
   > 
   > 1. the segments to be deleted are removed from `LocalLog`
   > 2. `LocalLog#renameDir` moves the whole folder
   > 3. `LocalLog#renameDir` updates the parent folder for all segments. However, the segments to be deleted have already been removed from the inner collection, so some `Segment#log` holds a stale file.
   > 
   > If I understand correctly, another solution is to pass a function that returns the latest dir when calling `deleteSegmentFiles` (
   > 
   > https://github.com/apache/kafka/blob/2d4abb85bf4a3afb1e3359a05786ab8f3fda127e/core/src/main/scala/kafka/log/LocalLog.scala#L904
   > 
   > ). If deleting a segment hits a not-found error, we call `updateParentDir` and delete it again.
   > WDYT?
   
   Hi @chia7712, thanks for the great suggestion. I took a look at `LogSegment#deleteIfExists` and `LogSegment#deleteTypeIfExists`. If we want to handle fallback deletion in `LocalLog`, we may need to return true/false from those two functions. However, `LogSegment#deleteIfExists` uses `Utils.tryAll` to handle 4 try/catch blocks. If we want to return true/false, we need to refactor `Utils.tryAll` as well. Finally, `LogSegment#deleteIfExists` is used not only by `LocalLog.deleteSegmentFiles` but also by `LocalLog.splitOverflowedSegment`, so we need to handle the `LocalLog.splitOverflowedSegment` path, too. I think we can use another Jira to track the change. Thanks.
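   
   To make the fallback idea above concrete, here is a minimal sketch using plain `java.io.File` (all names are illustrative; this is not the actual `LocalLog`/`LogSegment` code): try the delete, and if the handle is stale because the log dir was renamed, re-resolve against the latest dir and retry once.
   
   ```java
   import java.io.File;
   import java.nio.file.Files;
   import java.util.function.Supplier;
   
   public class FallbackDeleteSketch {
       // If the first delete misses (stale parent dir after a rename),
       // re-resolve the file against the latest directory and retry once.
       static boolean deleteWithFallback(File segmentFile, Supplier<File> latestDir) {
           if (segmentFile.delete())
               return true;
           File relocated = new File(latestDir.get(), segmentFile.getName());
           return relocated.delete();
       }
   
       public static void main(String[] args) throws Exception {
           File dir = Files.createTempDirectory("seg").toFile();
           File real = new File(dir, "00000000000000000000.log");
           real.createNewFile();
           // Simulate a handle that still points at the pre-rename directory.
           File stale = new File(new File(dir, "moved"), real.getName());
           System.out.println(deleteWithFallback(stale, () -> dir)); // true via the fallback
       }
   }
   ```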


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-21 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1573635829


##
core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:
##
@@ -17,48 +17,78 @@
 
 package kafka.server
 
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.ClusterInstance
 import org.apache.kafka.common.message.ApiVersionsRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.ApiVersionsRequest
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, ClusterTests, Type}
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.extension.ExtendWith
 
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
+@ClusterTestDefaults(brokers = 1)
 class ApiVersionsRequestTest(cluster: ClusterInstance) extends 
AbstractApiVersionsRequestTest(cluster) {
 
-  @BeforeEach
-  def setup(config: ClusterConfig): Unit = {
-super.brokerPropertyOverrides(config.serverProperties())
-  }
-
-  @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
-new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
-new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
+  @ClusterTests(Array(
+new ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(

Review Comment:
   Due to `super.brokerPropertyOverrides(config.serverProperties())` in `BeforeEach`:
   ```scala
   // Configure control plane listener to make sure we have separate listeners for testing.
   def brokerPropertyOverrides(properties: Properties): Unit = {
     if (!cluster.isKRaftTest) {
       val controlPlaneListenerName = "CONTROL_PLANE"
       val securityProtocol = cluster.config().securityProtocol()
       properties.setProperty(KafkaConfig.ControlPlaneListenerNameProp, controlPlaneListenerName)
       properties.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"$controlPlaneListenerName:$securityProtocol,$securityProtocol:$securityProtocol")
       properties.setProperty("listeners", s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
       properties.setProperty(KafkaConfig.AdvertisedListenersProp, s"$securityProtocol://localhost:0,${controlPlaneListenerName}://localhost:0")
     }
   }
   ```
   This adds extra properties when the cluster runs in ZK mode. Now that serverProperties is immutable, I have to list the full set of test configs for each cluster type here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [DRAFT] KAFKA-16593: wip [kafka]

2024-04-21 Thread via GitHub


frankvicky opened a new pull request, #15766:
URL: https://github.com/apache/kafka/pull/15766

   Rewrite DeleteConsumerGroupsTest with ClusterTestExtensions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16548: Change to use `streamingIterator` [kafka]

2024-04-21 Thread via GitHub


chia7712 commented on code in PR #15765:
URL: https://github.com/apache/kafka/pull/15765#discussion_r1573633727


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -463,7 +464,9 @@ private Optional lookupTimestamp(RemoteLogSegmen
 RecordBatch batch = remoteLogInputStream.nextBatch();
 if (batch == null) break;
 if (batch.maxTimestamp() >= timestamp && batch.lastOffset() >= startingOffset) {
-for (Record record : batch) {
+Iterator recordStreamingIterator = batch.streamingIterator(BufferSupplier.NO_CACHING);

Review Comment:
   Could you please close the `recordStreamingIterator` when returning?
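   
   For illustration, one way to guarantee that is try-with-resources, since `streamingIterator` returns Kafka's `CloseableIterator` (a sketch, assuming its `close()` declares no checked exception):
   
   ```java
   import org.apache.kafka.common.record.Record;
   import org.apache.kafka.common.record.RecordBatch;
   import org.apache.kafka.common.utils.BufferSupplier;
   import org.apache.kafka.common.utils.CloseableIterator;
   
   final class LookupSketch {
       // Returns the offset of the first matching record, or null; the iterator
       // is closed automatically whichever branch returns.
       static Long firstMatchingOffset(RecordBatch batch, long timestamp, long startingOffset) {
           try (CloseableIterator<Record> it = batch.streamingIterator(BufferSupplier.NO_CACHING)) {
               while (it.hasNext()) {
                   Record record = it.next();
                   if (record.timestamp() >= timestamp && record.offset() >= startingOffset)
                       return record.offset();
               }
           }
           return null;
       }
   }
   ```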



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16548: Change to use `streamingIterator` [kafka]

2024-04-21 Thread via GitHub


m1a2st commented on PR #15765:
URL: https://github.com/apache/kafka/pull/15765#issuecomment-2067922075

   @chia7712 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16547) add test for DescribeConfigsOptions#includeDocumentation

2024-04-21 Thread PoAn Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PoAn Yang reassigned KAFKA-16547:
-

Assignee: Ming-Yen Chung  (was: Yu-Lin Chen)

> add test for DescribeConfigsOptions#includeDocumentation
> 
>
> Key: KAFKA-16547
> URL: https://issues.apache.org/jira/browse/KAFKA-16547
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Ming-Yen Chung
>Priority: Major
>
> As the title says, we have no tests for this query option.
> If the option is configured to false, `ConfigEntry#documentation` should be
> null. Otherwise, it should return the config documentation.
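
A minimal sketch of what such a test could verify through the public Admin API (the bootstrap address and broker id below are placeholders):

```java
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.common.config.ConfigResource;

public class IncludeDocumentationDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");

            // With the option enabled, documentation() should be populated.
            Config withDocs = admin
                .describeConfigs(Set.of(broker), new DescribeConfigsOptions().includeDocumentation(true))
                .all().get().get(broker);

            // With it disabled, documentation() should be null.
            Config withoutDocs = admin
                .describeConfigs(Set.of(broker), new DescribeConfigsOptions().includeDocumentation(false))
                .all().get().get(broker);

            withDocs.entries().stream().limit(3).forEach(e ->
                System.out.println(e.name() + " -> " + e.documentation()));
            withoutDocs.entries().stream().limit(3).forEach(e ->
                System.out.println(e.name() + " -> " + e.documentation()));
        }
    }
}
```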



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16547) add test for DescribeConfigsOptions#includeDocumentation

2024-04-21 Thread Ming-Yen Chung (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839323#comment-17839323
 ] 

Ming-Yen Chung commented on KAFKA-16547:


Hi [~chia7712], 

I would like to do it as my first issue. Could you assign this ticket to me?

> add test for DescribeConfigsOptions#includeDocumentation
> 
>
> Key: KAFKA-16547
> URL: https://issues.apache.org/jira/browse/KAFKA-16547
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Yu-Lin Chen
>Priority: Major
>
> As the title says, we have no tests for this query option.
> If the option is configured to false, `ConfigEntry#documentation` should be
> null. Otherwise, it should return the config documentation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)