Re: [PR] KAFKA-15739: KRaft support in ResetConsumerGroupOffsetTest [kafka]

2024-05-19 Thread via GitHub


linzihao1999 closed pull request #14686: KAFKA-15739: KRaft support in 
ResetConsumerGroupOffsetTest
URL: https://github.com/apache/kafka/pull/14686





[jira] [Commented] (KAFKA-16708) Allow dynamic port for advertised listeners in Docker image

2024-05-19 Thread Chris Bono (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847767#comment-17847767
 ] 

Chris Bono commented on KAFKA-16708:


Hi [~vedarth] 
{quote}Thanks a lot for raising the ticket and sharing the WIP commit as well. I am 
really happy to see your interest in improving the Apache Kafka Docker image. 
{quote}
You are more than welcome.
{quote}I am interested in understanding the benefit of having dynamic port.
One clear benefit is that static mapping isn't needed. But are there other 
benefits? 
{quote}
We have a large set of smoke tests that each use docker-compose, and when using 
static port mappings we get port collisions from time to time. Using dynamic 
port mappings alleviates this issue.
{quote}
Given that docker in docker has security concerns, I think it's important to 
make this exclusive for testing purposes. Since apache/kafka docker image is 
meant for production usage, I think it might be better to consider adding this 
to just KIP-974 apache/kafka-native image (to be released in 3.8.0) as it's 
meant for testing purposes and local usage. This will also remove the risk of 
users enabling this by accident in production.
{quote}
Nice! I was unaware of KIP-974; this "kafka-native" (non-production use) image 
does sound like an excellent delivery vehicle w/o the risks of running this in 
PROD.

> Allow dynamic port for advertised listeners in Docker image
> ---
>
> Key: KAFKA-16708
> URL: https://issues.apache.org/jira/browse/KAFKA-16708
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chris Bono
>Priority: Major
>
> First of all, thank you all for adding the official Kafka Docker image (I 
> know it is a big responsibility and adds to the team workload).
> I am migrating from {{wurstmeister/kafka}} to the official {{apache/kafka}} 
> image. 
> My advertised port is not static, and I was relying on [the PORT_COMMAND 
> feature|https://github.com/wurstmeister/kafka-docker/commit/c66375fc3b94e98dbecd603c5d2b44c06e927e88]
>  in the {{wurstmeister/kafka}} image to determine the port programmatically. 
> This would let me define a docker-compose as follows:
> {code:java}
> services:
>   kafka:
>     image: apache/kafka:latest
>     hostname: kafka
>     ports:
>       - "9092"
>     volumes:
>       - '/var/run/docker.sock.raw:/var/run/docker.sock'
>     environment:
>       KAFKA_NODE_ID: 1
>       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 
> 'CONTROLLER:PLAINTEXT,PLAINTEXT_DOCKER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
>       KAFKA_LISTENERS: 
> 'CONTROLLER://kafka:29093,PLAINTEXT_DOCKER://kafka:29092,PLAINTEXT_HOST://0.0.0.0:9092'
>       KAFKA_ADVERTISED_LISTENERS: 
> 'PLAINTEXT_DOCKER://kafka:29092,PLAINTEXT_HOST://localhost:_{PORT_COMMAND}'
>       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>       KAFKA_PROCESS_ROLES: 'broker,controller'
>       KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
>       KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT_DOCKER'
>       KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
>       PORT_COMMAND: "docker ps | egrep 'kafka' | cut -d: -f 3 | cut -d- -f 
> 1"{code}
> Notice how the "ports" are dynamically mapped (i.e. not *"port:port"* syntax) 
> - the actual port will *not* be "9092".
> Do you have a suggestion for an alternative approach on how to obtain a 
> non-static port for advertised listeners? If not, would adding conditional 
> support for this feature be welcomed? 
> I am aware of the complication/concern of this request as it is Docker in 
> Docker for non-root users (described 
> [here|https://jonfriesen.ca/articles/docker-in-docker-non-root-user/]) and as 
> such we could make it inactive by default and users would have to opt-in 
> explicitly.
> I have created a [rough 
> WIP|https://github.com/onobc/kafka/commit/6556c4adbf08155b89c9804c2c5d1a988f8371f2]
>  that illustrates the concept (there is no conditionality in it currently). 
> Note that the container is not run as {*}root{*}, but rather the *appuser* is 
> added to whatever group owns the docker.sock (which on my machine is 
> root).
>  
> P.S.
>  * This is my first time filing an issue w/ Kafka, so if I missed anything 
> please let me know and I will gladly add any other info. 
>  * I am not sure what "Component" this should be under (the other Kafka 
> Docker-related issues had differing values here)
>  
>  





Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]

2024-05-19 Thread via GitHub


gongxuanzhang commented on PR #15946:
URL: https://github.com/apache/kafka/pull/15946#issuecomment-2119568301

   > Could you please fix the build error?
   
   Fixed it, I'm so sorry.
   
   
   
   
   





Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]

2024-05-19 Thread via GitHub


gongxuanzhang commented on PR #15946:
URL: https://github.com/apache/kafka/pull/15946#issuecomment-2119506863

   > Could you please fix the build error?
   I can, I will submit it later
   
   





[jira] [Commented] (KAFKA-16796) Introduce new org.apache.kafka.tools.api.MessageParser to replace kafka.tools.DumpLogSegments.MessageParser

2024-05-19 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847744#comment-17847744
 ] 

PoAn Yang commented on KAFKA-16796:
---

Hi [~chia7712], I'm interested in this. May I take it? Thank you.

> Introduce new org.apache.kafka.tools.api.MessageParser to replace 
> kafka.tools.DumpLogSegments.MessageParser
> ---
>
> Key: KAFKA-16796
> URL: https://issues.apache.org/jira/browse/KAFKA-16796
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>  Labels: need-kip
>
> We need a replacement in order to complete 
> https://issues.apache.org/jira/browse/KAFKA-14579 in Kafka 4.0





[PR] KAFKA-16797: A bit cleanup of FeatureControlManager [kafka]

2024-05-19 Thread via GitHub


m1a2st opened a new pull request, #15997:
URL: https://github.com/apache/kafka/pull/15997

   Clean up the `brokerSupported` and `controllerSupported` methods by using 
`Collections.emptyIterator()`.
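
   For illustration, a minimal sketch of this cleanup pattern (hypothetical class and entry type, not the actual `FeatureControlManager` code):

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

class EmptyIteratorSketch {
    // Before: a hand-rolled anonymous empty iterator.
    static Iterator<Map.Entry<String, Short>> before() {
        return new Iterator<Map.Entry<String, Short>>() {
            @Override
            public boolean hasNext() {
                return false;
            }

            @Override
            public Map.Entry<String, Short> next() {
                throw new NoSuchElementException();
            }
        };
    }

    // After: the JDK already ships an immutable empty iterator.
    static Iterator<Map.Entry<String, Short>> after() {
        return Collections.emptyIterator();
    }
}
```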





[PR] MINOR: ensure KafkaServerTestHarness::tearDown is always invoked [kafka]

2024-05-19 Thread via GitHub


gaurav-narula opened a new pull request, #15996:
URL: https://github.com/apache/kafka/pull/15996

   An exception thrown while closing the client instances in 
`IntegrationTestHarness::tearDown` may result in
   `KafkaServerTestHarness::tearDown` not being invoked, leaking the 
broker and controller threads spawned in the failing test.
   
   An example of this is the [CI 
run](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15994/1/tests) 
for #15994 where
   `Build / JDK 8 and Scala 2.12 / testCoordinatorFailover(String, 
String).quorum=kraft+kip848.groupProtocol=consumer – 
kafka.api.PlaintextConsumerTest` failing results in 
`consumers.foreach(_.close(Duration.ZERO))` in 
`IntegrationTestHarness::tearDown` throwing an exception.
   
   A side effect of this is that it poisons the Gradle test runner JVM and prevents 
tests in other unrelated classes from executing, as the `@BeforeAll` check in 
QuorumTestHarness would cause them to fail immediately.
   
   This PR wraps the client close calls in a try-finally block to ensure 
`KafkaServerTestHarness::tearDown` is always invoked, as sketched below.
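
   A minimal sketch of the shape of the fix (class and method names are illustrative, not the exact Kafka test code):

```java
// Client close failures must not skip the broker/controller teardown.
abstract class ServerHarnessSketch {
    void tearDown() { /* stop broker and controller threads */ }
}

class IntegrationHarnessSketch extends ServerHarnessSketch {
    private void closeClients() { /* may throw, like consumers.foreach(_.close(Duration.ZERO)) */ }

    @Override
    void tearDown() {
        try {
            closeClients();
        } finally {
            super.tearDown(); // always reached, so spawned threads are reaped
        }
    }
}
```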
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[PR] KAFKA-16469: Metadata schema checker [kafka]

2024-05-19 Thread via GitHub


cmccabe opened a new pull request, #15995:
URL: https://github.com/apache/kafka/pull/15995

   Create a schema checker that can validate that later versions of a KRPC 
schema are compatible with earlier ones.
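
   As a rough illustration of the kind of rule such a checker can enforce (a hypothetical sketch, not the code in this PR): removing a field, or adding one that older readers cannot ignore, breaks compatibility.

```java
import java.util.Map;

class SchemaCompatSketch {
    // Hypothetical field model: a field is ignorable if it is tagged or has a default.
    record Field(String name, boolean ignorableByOlderReaders) { }

    static void checkCompatible(Map<String, Field> older, Map<String, Field> newer) {
        for (String name : older.keySet()) {
            if (!newer.containsKey(name))
                throw new IllegalStateException("Field removed: " + name);
        }
        for (Field field : newer.values()) {
            if (!older.containsKey(field.name()) && !field.ignorableByOlderReaders())
                throw new IllegalStateException("New field not ignorable by older readers: " + field.name());
        }
    }
}
```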





[jira] [Assigned] (KAFKA-16469) Metadata Schema Checker

2024-05-19 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe reassigned KAFKA-16469:


Assignee: Colin McCabe

> Metadata Schema Checker
> ---
>
> Key: KAFKA-16469
> URL: https://issues.apache.org/jira/browse/KAFKA-16469
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>






[jira] [Updated] (KAFKA-16529) Response handling and request sending for voters RPCs

2024-05-19 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-16529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-16529:
---
Description: 
Implement response handling and request sending for the following RPCs:
 # Vote
 # BeginQuorumEpoch
 # Fetch

When loading the leader from the quorum-state file don't assume that the leader 
is in the voter set.

  was:
Implement response handling and request sending for the following RPCs:
 # Vote
 # BeginQuorumEpoch
 # Fetch


> Response handling and request sending for voters RPCs
> -
>
> Key: KAFKA-16529
> URL: https://issues.apache.org/jira/browse/KAFKA-16529
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.8.0
>
>
> Implement response handling and request sending for the following RPCs:
>  # Vote
>  # BeginQuorumEpoch
>  # Fetch
> When loading the leader from the quorum-state file don't assume that the 
> leader is in the voter set.





[jira] [Created] (KAFKA-16797) A bit cleanup of FeatureControlManager

2024-05-19 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16797:
--

 Summary: A bit cleanup of FeatureControlManager
 Key: KAFKA-16797
 URL: https://issues.apache.org/jira/browse/KAFKA-16797
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai


[https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java#L62]

 

that can be replaced by `Collections.emptyIterator()`





[jira] [Updated] (KAFKA-16653) Remove delayed initialization because of static voter set

2024-05-19 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-16653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-16653:
---
Description: 
Once KRaft supports the AddVoter RPC, the QuorumTestHarness and 
KRaftClusterTestKit can be reimplemented to use dynamic voters instead of the 
static voter set.

This should allow us to remove KRaft's support for delayed static voter set 
initialization.

This should also include updates to KRaftClusterTestKit to take advantage of 
the controller bootstrap servers.

  was:
Once KRaft supports the AddVoter RPC, the QuorumTestHarness and 
KRaftClusterTestKit can be reimplemented to use dynamic voters instead of the 
static voter set.

This should allow us to remove KRaft's support for delayed static voter set 
initialization.


> Remove delayed initialization because of static voter set
> -
>
> Key: KAFKA-16653
> URL: https://issues.apache.org/jira/browse/KAFKA-16653
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: José Armando García Sancio
>Priority: Major
>
> Once KRaft supports the AddVoter RPC, the QuorumTestHarness and 
> KRaftClusterTestKit can be reimplemented to use dynamic voters instead of the 
> static voter set.
> This should allow us to remove KRaft's support for delayed static voter set 
> initialization.
> This should also include updates to KRaftClusterTestKit to take advantage of 
> the controller bootstrap servers.





Re: [PR] KAFKA-16629: Add broker-related tests to ConfigCommandIntegrationTest [kafka]

2024-05-19 Thread via GitHub


chia7712 commented on code in PR #15840:
URL: https://github.com/apache/kafka/pull/15840#discussion_r1606088555


##
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##
@@ -65,219 +74,479 @@
 @ExtendWith(value = ClusterTestExtensions.class)
 @Tag("integration")
 public class ConfigCommandIntegrationTest {
-AdminZkClient adminZkClient;
-List<String> alterOpts;
 
+private List<String> alterOpts;
+private final String defaultBrokerId = "0";
 private final ClusterInstance cluster;
 
+private static Runnable run(Stream<String> command) {
+return () -> {
+try {
+ConfigCommand.main(command.toArray(String[]::new));
+} catch (RuntimeException e) {
+// do nothing.
+} finally {
+Exit.resetExitProcedure();
+}
+};
+}
+
 public ConfigCommandIntegrationTest(ClusterInstance cluster) {
 this.cluster = cluster;
 }
 
-@ClusterTest(types = {Type.ZK, Type.KRAFT})
+@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT})
 public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() {
 assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
 "--entity-name", cluster.isKRaftTest() ? "0" : "1",
 "--entity-type", "brokers",
 "--alter",
 "--add-config", "security.inter.broker.protocol=PLAINTEXT")),
-errOut ->
-assertTrue(errOut.contains("Cannot update these configs 
dynamically: Set(security.inter.broker.protocol)"), errOut));
+errOut -> assertTrue(errOut.contains("Cannot update these configs 
dynamically: Set(security.inter.broker.protocol)"), errOut));
 }
 
-
 @ClusterTest(types = {Type.ZK})
 public void testExitWithNonZeroStatusOnZkCommandAlterUserQuota() {
 assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
 "--entity-type", "users",
 "--entity-name", "admin",
 "--alter", "--add-config", "consumer_byte_rate=2")),
-errOut ->
-assertTrue(errOut.contains("User configuration updates using 
ZooKeeper are only supported for SCRAM credential updates."), errOut));
-}
-
-public static void assertNonZeroStatusExit(Stream<String> args, 
Consumer<String> checkErrOut) {
-AtomicReference<Integer> exitStatus = new AtomicReference<>();
-Exit.setExitProcedure((status, __) -> {
-exitStatus.set(status);
-throw new RuntimeException();
-});
-
-String errOut = captureStandardErr(() -> {
-try {
-ConfigCommand.main(args.toArray(String[]::new));
-} catch (RuntimeException e) {
-// do nothing.
-} finally {
-Exit.resetExitProcedure();
-}
-});
-
-checkErrOut.accept(errOut);
-assertNotNull(exitStatus.get());
-assertEquals(1, exitStatus.get());
-}
-
-private Stream<String> quorumArgs() {
-return cluster.isKRaftTest()
-? Stream.of("--bootstrap-server", cluster.bootstrapServers())
-: Stream.of("--zookeeper", 
((ZkClusterInvocationContext.ZkClusterInstance) 
cluster).getUnderlying().zkConnect());
-}
-
-public List<String> entityOp(Optional<String> brokerId) {
-return brokerId.map(id -> Arrays.asList("--entity-name", 
id)).orElse(Collections.singletonList("--entity-default"));
-}
-
-public void alterConfigWithZk(KafkaZkClient zkClient, Map<String, String> 
configs, Optional<String> brokerId) throws Exception {
-alterConfigWithZk(zkClient, configs, brokerId, Collections.emptyMap());
+errOut -> assertTrue(errOut.contains("User configuration updates 
using ZooKeeper are only supported for SCRAM credential updates."), errOut));
 }
 
-public void alterConfigWithZk(KafkaZkClient zkClient, Map<String, String> 
configs, Optional<String> brokerId, Map<String, String> encoderConfigs) {
-String configStr = Stream.of(configs.entrySet(), 
encoderConfigs.entrySet())
-.flatMap(Set::stream)
-.map(e -> e.getKey() + "=" + e.getValue())
-.collect(Collectors.joining(","));
-ConfigCommand.ConfigCommandOptions addOpts = new 
ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), 
Arrays.asList("--add-config", configStr)));
-ConfigCommand.alterConfigWithZk(zkClient, addOpts, adminZkClient);
-}
-
-void verifyConfig(KafkaZkClient zkClient, Map<String, String> configs, 
Optional<String> brokerId) {
-Properties entityConfigs = zkClient.getEntityConfigs("brokers", 
brokerId.orElse(ZooKeeperInternals.DEFAULT_STRING));
-assertEquals(configs, entityConfigs);
-}
-
-void alterAndVerifyConfig(KafkaZkClient zkClient, Map<String, String> 
configs, Optional<String> brokerId) throws Exception {
-alterConfigWithZk(zkClient, configs, brokerId);
-verifyConfig(zkClient, configs, brokerId);
-}
+@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+public void 

Re: [PR] KAFKA-16629: Add broker-related tests to ConfigCommandIntegrationTest [kafka]

2024-05-19 Thread via GitHub


chia7712 commented on code in PR #15840:
URL: https://github.com/apache/kafka/pull/15840#discussion_r1606088402



Re: [PR] KAFKA-16629: Add broker-related tests to ConfigCommandIntegrationTest [kafka]

2024-05-19 Thread via GitHub


chia7712 commented on code in PR #15840:
URL: https://github.com/apache/kafka/pull/15840#discussion_r1606087930



Re: [PR] Kafka-15444: Native docker image for Apache Kafka(KIP-974) [kafka]

2024-05-19 Thread via GitHub


kagarwal06 commented on PR #15927:
URL: https://github.com/apache/kafka/pull/15927#issuecomment-2119331957

   > Also will the current Github workflows work with current PR changes ?
   
   The GitHub workflows use the build, test, and release scripts.
   I have integrated the native Docker image with these scripts and tested them 
locally. They are working fine. 
   I will raise a separate PR to integrate this image into the GitHub Actions 
workflows.
   
   >Can we add README to `docker/native` and add few details about the image 
and native-image-configs
   
   Sure. I will add this README in the documentation PR.





[PR] MINOR: Add a constructor to accept cause in ConfigException [kafka]

2024-05-19 Thread via GitHub


gaurav-narula opened a new pull request, #15994:
URL: https://github.com/apache/kafka/pull/15994

   ConfigException's constructor doesn't follow the usual convention of 
accepting a `Throwable` as a cause. Instead, the constructor accepts an Object 
type for the config's value and crafts a message using it.
   
   This makes it difficult to debug cases where the cause would prove useful. 
As an example, consider the flaky test in 
`DynamicBrokerReconfigurationTest::testTrustStoreAlter` [0] which masks the 
cause of the certificate validation failure.
   
   Since the existing constructor is public, this change introduces another 
constructor which accepts a `Throwable` as the second argument. JLS §15.12.2.5 
[1] ensures the more specific constructor is chosen. Caution must be taken when 
passing `null` as the second argument, since the constructor with `Throwable` 
as the second parameter is more specific and is therefore the one invoked [2].
   
   [[0]]: 
https://ge.apache.org/s/bhdpknkevz37g/tests/task/:core:test/details/kafka.server.DynamicBrokerReconfigurationTest/testTrustStoreAlter(String)%5B2%5D?expanded-stacktrace=WyIwIl0=1
   [[1]]: 
https://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.12.2.5
   [[2]]: https://stackoverflow.com/a/1572499
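
   A small self-contained sketch of the overload resolution described above (the class body is hypothetical; only the two-constructor shape matters):

```java
class ConfigExceptionSketch extends RuntimeException {
    // Existing convention: the second argument is the config's value (any Object).
    ConfigExceptionSketch(String name, Object value) {
        super("Invalid value " + value + " for configuration " + name);
    }

    // New constructor: the second argument is the cause.
    ConfigExceptionSketch(String message, Throwable cause) {
        super(message, cause);
    }

    public static void main(String[] args) {
        new ConfigExceptionSketch("key", "some value");    // binds to (String, Object)
        new ConfigExceptionSketch("key", new Exception()); // binds to (String, Throwable), the more specific overload
        new ConfigExceptionSketch("key", null);            // also binds to (String, Throwable), per JLS §15.12.2.5
    }
}
```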
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-19 Thread via GitHub


chia7712 commented on code in PR #15779:
URL: https://github.com/apache/kafka/pull/15779#discussion_r1606053454


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java:
##
@@ -62,506 +86,764 @@
  * - scope=topics+partitions, scenario=to-earliest
  * - export/import
  */
-public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
-private String[] basicArgs() {
+@ExtendWith(value = ClusterTestExtensions.class)
+public class ResetConsumerGroupOffsetTest {
+
+private static final String TOPIC_PREFIX = "foo-";
+private static final String GROUP_PREFIX = "test.group-";
+
+private static void generator(ClusterGenerator clusterGenerator) {
+ConsumerGroupCommandTestUtils.generator(clusterGenerator);
+}
+
+private String[] basicArgs(ClusterInstance cluster) {
 return new String[]{"--reset-offsets",
-"--bootstrap-server", bootstrapServers(listenerName()),
+"--bootstrap-server", cluster.bootstrapServers(),
 "--timeout", Long.toString(DEFAULT_MAX_WAIT_MS)};
 }
 
-private String[] buildArgsForGroups(List<String> groups, String...args) {
-List<String> res = new ArrayList<>(Arrays.asList(basicArgs()));
+private String[] buildArgsForGroups(ClusterInstance cluster, List<String> 
groups, String... args) {
+List<String> res = new ArrayList<>(asList(basicArgs(cluster)));
 for (String group : groups) {
 res.add("--group");
 res.add(group);
 }
-res.addAll(Arrays.asList(args));
+res.addAll(asList(args));
 return res.toArray(new String[0]);
 }
 
-private String[] buildArgsForGroup(String group, String...args) {
-return buildArgsForGroups(Collections.singletonList(group), args);
+private String[] buildArgsForGroup(ClusterInstance cluster, String group, 
String... args) {
+return buildArgsForGroups(cluster, singletonList(group), args);
 }
 
-private String[] buildArgsForAllGroups(String...args) {
-List<String> res = new ArrayList<>(Arrays.asList(basicArgs()));
+private String[] buildArgsForAllGroups(ClusterInstance cluster, String... 
args) {
+List<String> res = new ArrayList<>(asList(basicArgs(cluster)));
 res.add("--all-groups");
-res.addAll(Arrays.asList(args));
+res.addAll(asList(args));
 return res.toArray(new String[0]);
 }
 
-@Test
-public void testResetOffsetsNotExistingGroup() throws Exception {
+@ClusterTemplate("generator")
+public void testResetOffsetsNotExistingGroup(ClusterInstance cluster) 
throws Exception {
+String topic = generateRandomTopic();
 String group = "missing.group";
-String[] args = buildArgsForGroup(group, "--all-topics", 
"--to-current", "--execute");
-ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = 
getConsumerGroupService(args);
-// Make sure we got a coordinator
-TestUtils.waitForCondition(
-() -> 
Objects.equals(consumerGroupCommand.collectGroupState(group).coordinator.host(),
 "localhost"),
-"Can't find a coordinator");
-Map<TopicPartition, OffsetAndMetadata> resetOffsets = 
consumerGroupCommand.resetOffsets().get(group);
-assertTrue(resetOffsets.isEmpty());
-assertTrue(committedOffsets(TOPIC, group).isEmpty());
-}
-
-@Test
-public void testResetOffsetsExistingTopic() {
-String group = "new.group";
-String[] args = buildArgsForGroup(group, "--topic", TOPIC, 
"--to-offset", "50");
-produceMessages(TOPIC, 100);
-resetAndAssertOffsets(args, 50, true, 
Collections.singletonList(TOPIC));
-resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, 
Collections.singletonList(TOPIC));
-resetAndAssertOffsets(addTo(args, "--execute"), 50, false, 
Collections.singletonList(TOPIC));
+String[] args = buildArgsForGroup(cluster, group, "--all-topics", 
"--to-current", "--execute");
+
+try (ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(args)) {
+// Make sure we got a coordinator
+TestUtils.waitForCondition(
+() -> 
"localhost".equals(service.collectGroupState(group).coordinator.host()),
+"Can't find a coordinator");
+Map<TopicPartition, OffsetAndMetadata> resetOffsets = 
service.resetOffsets().get(group);
+assertTrue(resetOffsets.isEmpty());
+assertTrue(committedOffsets(cluster, topic, group).isEmpty());
+}
 }
 
-@Test
-public void testResetOffsetsExistingTopicSelectedGroups() throws Exception 
{
-produceMessages(TOPIC, 100);
-List<String> groups = IntStream.rangeClosed(1, 3).mapToObj(id -> GROUP 
+ id).collect(Collectors.toList());
-for (String group : groups) {
-ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, 
TOPIC, group, GroupProtocol.CLASSIC.name);
-awaitConsumerProgress(TOPIC, group, 

Re: [PR] KAFKA-16784: Migrate TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest to use ClusterTestExtensions [kafka]

2024-05-19 Thread via GitHub


chia7712 commented on code in PR #15992:
URL: https://github.com/apache/kafka/pull/15992#discussion_r1606051053


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java:
##
@@ -49,66 +51,34 @@
 import static org.mockito.Mockito.verify;
 
 @SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters

Review Comment:
   Do we need this annotation?



##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java:
##
@@ -49,66 +51,34 @@
 import static org.mockito.Mockito.verify;
 
 @SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters
+@Tag("integration")
+@ExtendWith(ClusterTestExtensions.class)
+@ClusterTestDefaults(brokers = 3)
 public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
+private final ClusterInstance clusterInstance;
 
 private static final int SEG_SIZE = 1024 * 1024;
 
 private final Time time = new MockTime(1);
-private final TopicBasedRemoteLogMetadataManagerHarness 
remoteLogMetadataManagerHarness = new 
TopicBasedRemoteLogMetadataManagerHarness();
 
-private TopicBasedRemoteLogMetadataManager rlmm() {
-return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
+
TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest(ClusterInstance 
clusterInstance) {
+this.clusterInstance = clusterInstance;
 }
 
-@BeforeEach
-public void setup() {
-// Start the cluster only.
-remoteLogMetadataManagerHarness.setUp(new EmptyTestInfo());
-}
-
-@AfterEach
-public void teardown() throws IOException {
-remoteLogMetadataManagerHarness.close();
-}
-
-@Test
+@ClusterTest
 public void testMultiplePartitionSubscriptions() throws Exception {
 // Create topics.
 String leaderTopic = "leader";
-HashMap<Object, Seq<Object>> assignedLeaderTopicReplicas = new 
HashMap<>();
-List<Object> leaderTopicReplicas = new ArrayList<>();
 // Set broker id 0 as the first entry which is taken as the leader.
-leaderTopicReplicas.add(0);
-leaderTopicReplicas.add(1);
-leaderTopicReplicas.add(2);
-assignedLeaderTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(leaderTopicReplicas));
-remoteLogMetadataManagerHarness.createTopicWithAssignment(leaderTopic,
-JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas),
-remoteLogMetadataManagerHarness.listenerName());
+createTopic(leaderTopic, Collections.singletonMap(0, Arrays.asList(0, 
1, 2)));
 
 String followerTopic = "follower";
-HashMap<Object, Seq<Object>> assignedFollowerTopicReplicas = new 
HashMap<>();
-List<Object> followerTopicReplicas = new ArrayList<>();
 // Set broker id 1 as the first entry which is taken as the leader.
-followerTopicReplicas.add(1);
-followerTopicReplicas.add(2);
-followerTopicReplicas.add(0);
-assignedFollowerTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(followerTopicReplicas));
-remoteLogMetadataManagerHarness.createTopicWithAssignment(
-followerTopic, 
JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas),
-remoteLogMetadataManagerHarness.listenerName());
+createTopic(followerTopic, Collections.singletonMap(0, 
Arrays.asList(1, 2, 0)));
 
 String topicWithNoMessages = "no-messages-topic";
-HashMap<Object, Seq<Object>> assignedTopicReplicas = new HashMap<>();
-List<Object> noMessagesTopicReplicas = new ArrayList<>();
 // Set broker id 1 as the first entry which is taken as the leader.
-noMessagesTopicReplicas.add(1);
-noMessagesTopicReplicas.add(2);
-noMessagesTopicReplicas.add(0);
-assignedTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(noMessagesTopicReplicas));
-remoteLogMetadataManagerHarness.createTopicWithAssignment(
-topicWithNoMessages, 
JavaConverters.mapAsScalaMap(assignedTopicReplicas),
-remoteLogMetadataManagerHarness.listenerName());
+createTopic(topicWithNoMessages, Collections.singletonMap(0, 
Arrays.asList(1, 2, 0)));

Review Comment:
   Could you please call `ClusterInstance#waitForTopic` to make sure all topics 
get created?
   
   
https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/test/ClusterInstance.java#L185
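
   For example (a sketch, assuming the `waitForTopic(String topic, int partitions)` helper from the linked file):

```java
createTopic(leaderTopic, Collections.singletonMap(0, Arrays.asList(0, 1, 2)));
clusterInstance.waitForTopic(leaderTopic, 1); // block until the topic's single partition is visible
```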






Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]

2024-05-19 Thread via GitHub


chia7712 commented on code in PR #15863:
URL: https://github.com/apache/kafka/pull/15863#discussion_r1606049924


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -126,29 +126,34 @@ class LogCleaner(initialConfig: CleanerConfig,
   private def maxOverCleanerThreads(f: CleanerThread => Double): Int =
 cleaners.foldLeft(0.0d)((max: Double, thread: CleanerThread) => 
math.max(max, f(thread))).toInt
 
-  /* a metric to track the maximum utilization of any thread's buffer in the 
last cleaning */
-  metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
-() => maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100)
-
-  /* a metric to track the recopy rate of each thread's last cleaning */
-  metricsGroup.newGauge(CleanerRecopyPercentMetricName, () => {
-val stats = cleaners.map(_.lastStats)
-val recopyRate = stats.iterator.map(_.bytesWritten).sum.toDouble / 
math.max(stats.iterator.map(_.bytesRead).sum, 1)
-(100 * recopyRate).toInt
-  })
-
-  /* a metric to track the maximum cleaning time for the last cleaning from 
each thread */
-  metricsGroup.newGauge(MaxCleanTimeMetricName, () => 
maxOverCleanerThreads(_.lastStats.elapsedSecs))
-
-  // a metric to track delay between the time when a log is required to be 
compacted
-  // as determined by max compaction lag and the time of last cleaner run.
-  metricsGroup.newGauge(MaxCompactionDelayMetricsName,
-() => 
maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000)
-
-  metricsGroup.newGauge(DeadThreadCountMetricName, () => deadThreadCount)
-
   private[log] def deadThreadCount: Int = cleaners.count(_.isThreadFailed)
 
+  /**
+   * Activate metrics
+   */
+  private def activateMetrics(): Unit = {

Review Comment:
   We don't need this function now, right?






Re: [PR] KAFKA-16654:Refactor kafka.test.annotation.Type and ClusterTestExtensions [kafka]

2024-05-19 Thread via GitHub


chia7712 commented on PR #15916:
URL: https://github.com/apache/kafka/pull/15916#issuecomment-2119259591

   @TaiJuWu Please update all tests using `ClusterTemplate`. For example: 
zkClustersForAllMigrationVersions





Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]

2024-05-19 Thread via GitHub


chia7712 commented on PR #15946:
URL: https://github.com/apache/kafka/pull/15946#issuecomment-2119258395

   ```
   [2024-05-17T02:48:07.827Z] [ant:checkstyle] [ERROR] 
/home/jenkins/workspace/Kafka_kafka-pr_PR-15946/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:238:45:
 ',' is not followed by whitespace. [WhitespaceAfter]
   
   ```
   
   Could you please fix the build error?





[jira] [Comment Edited] (KAFKA-14579) Move DumpLogSegments to tools

2024-05-19 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847685#comment-17847685
 ] 

Chia-Ping Tsai edited comment on KAFKA-14579 at 5/19/24 2:32 PM:
-

I created https://issues.apache.org/jira/browse/KAFKA-16796 to address that. It 
would be great to complete it before 3.8


was (Author: chia7712):
I created https://issues.apache.org/jira/browse/KAFKA-14579 to address that. It 
would be great to complete it before 3.8

> Move DumpLogSegments to tools
> -
>
> Key: KAFKA-14579
> URL: https://issues.apache.org/jira/browse/KAFKA-14579
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Alexandre Dupriez
>Priority: Major
>






[jira] [Commented] (KAFKA-14579) Move DumpLogSegments to tools

2024-05-19 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847685#comment-17847685
 ] 

Chia-Ping Tsai commented on KAFKA-14579:


I created https://issues.apache.org/jira/browse/KAFKA-14579 to address that. It 
would be great to complete it before 3.8

> Move DumpLogSegments to tools
> -
>
> Key: KAFKA-14579
> URL: https://issues.apache.org/jira/browse/KAFKA-14579
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Alexandre Dupriez
>Priority: Major
>






[jira] [Updated] (KAFKA-16796) Introduce new org.apache.kafka.tools.api.MessageParser to replace kafka.tools.DumpLogSegments.MessageParser

2024-05-19 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16796:
---
Labels: need-kip  (was: )

> Introduce new org.apache.kafka.tools.api.MessageParser to replace 
> kafka.tools.DumpLogSegments.MessageParser
> ---
>
> Key: KAFKA-16796
> URL: https://issues.apache.org/jira/browse/KAFKA-16796
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>  Labels: need-kip
>
> We need a replacement in order to complete 
> https://issues.apache.org/jira/browse/KAFKA-14579 in Kafka 4.0





[jira] [Created] (KAFKA-16796) Introduce new org.apache.kafka.tools.api.MessageParser to replace kafka.tools.DumpLogSegments.MessageParser

2024-05-19 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16796:
--

 Summary: Introduce new org.apache.kafka.tools.api.MessageParser to 
replace kafka.tools.DumpLogSegments.MessageParser
 Key: KAFKA-16796
 URL: https://issues.apache.org/jira/browse/KAFKA-16796
 Project: Kafka
  Issue Type: Sub-task
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


We need a replacement in order to complete 
https://issues.apache.org/jira/browse/KAFKA-14579 in Kafka 4.0





[jira] [Commented] (KAFKA-16541) Potential leader epoch checkpoint file corruption on OS crash

2024-05-19 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847684#comment-17847684
 ] 

Haruki Okada commented on KAFKA-16541:
--

[~junrao] Hi, I've just submitted a patch. PTAL

> Potential leader epoch checkpoint file corruption on OS crash
> -
>
> Key: KAFKA-16541
> URL: https://issues.apache.org/jira/browse/KAFKA-16541
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Minor
>
> Pointed out by [~junrao] on 
> [GitHub|https://github.com/apache/kafka/pull/14242#discussion_r1556161125]
> [A patch for KAFKA-15046|https://github.com/apache/kafka/pull/14242] got rid 
> of fsync of the leader-epoch checkpoint file in some paths for performance reasons.
> However, since the checkpoint file is now flushed to the device asynchronously by 
> the OS, its content could become corrupted if the OS suddenly crashes (e.g. by 
> power failure or kernel panic) in the middle of a flush.
> A corrupted checkpoint file could prevent the Kafka broker from starting up.





Re: [PR] KAFKA-16541 Fix potential leader-epoch checkpoint file corruption [kafka]

2024-05-19 Thread via GitHub


ocadaruma commented on PR #15993:
URL: https://github.com/apache/kafka/pull/15993#issuecomment-2119256710

   @junrao Could you take a look?





Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]

2024-05-19 Thread via GitHub


chia7712 commented on PR #15830:
URL: https://github.com/apache/kafka/pull/15830#issuecomment-2119256124

   @pasharik Could you please check  the failed test?





[PR] KAFKA-16541 Fix potential leader-epoch checkpoint file corruption [kafka]

2024-05-19 Thread via GitHub


ocadaruma opened a new pull request, #15993:
URL: https://github.com/apache/kafka/pull/15993

   - [A patch for KAFKA-15046](https://github.com/apache/kafka/pull/14242) got 
rid of `fsync` on `LeaderEpochFileCache#truncateFromStart/End` for performance 
reasons, but it turned out this could leave a corrupted leader-epoch checkpoint 
file after an ungraceful OS shutdown
 * i.e. the OS shuts down while the kernel is still writing dirty pages back 
to the device
   - To address this problem, this PR makes the changes below:
 * Revert `LeaderEpochCheckpoint#write` to always fsync
 * `truncateFromStart/End` now call `LeaderEpochCheckpoint#write` 
asynchronously on the scheduler thread (see the sketch after this list)
 - Why do we still need to call `LeaderEpochCheckpoint#write`? => To 
prevent the checkpoint file from growing indefinitely as the leader epoch 
increments
 - Why async? => To avoid reintroducing the performance issue which #14242 
addressed
 * `UnifiedLog#maybeCreateLeaderEpochCache` now loads epoch entries from the 
checkpoint file only when the current cache is absent
 - This prevents a potentially stale checkpoint file (a consequence of the 
async checkpointing mentioned above) from being read and making the epoch 
entries incorrect
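
   A condensed sketch of the flush strategy (illustrative only; the types and names here are assumptions, not the PR's exact code):

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

class LeaderEpochCacheSketch {
    record EpochEntry(int epoch, long startOffset) { }

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private volatile List<EpochEntry> entries = List.of();

    // Truncation mutates the in-memory state synchronously...
    void truncateFromStart(long startOffset) {
        entries = entries.stream().filter(e -> e.startOffset() >= startOffset).toList();
        List<EpochEntry> snapshot = entries;
        // ...while the fsync-ing checkpoint write happens off the hot path.
        scheduler.execute(() -> writeCheckpointWithFsync(snapshot));
    }

    private void writeCheckpointWithFsync(List<EpochEntry> snapshot) {
        // write the snapshot to the checkpoint file and fsync (omitted)
    }
}
```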
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-8206) A consumer can't discover new group coordinator when the cluster was partly restarted

2024-05-19 Thread Ivan Yurchenko (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847681#comment-17847681
 ] 

Ivan Yurchenko commented on KAFKA-8206:
---

The KIP was accepted and now I'm working on the patch.

> A consumer can't discover new group coordinator when the cluster was partly 
> restarted
> -
>
> Key: KAFKA-8206
> URL: https://issues.apache.org/jira/browse/KAFKA-8206
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.1.0, 3.2.0, 3.1.1, 3.3.0, 3.1.2, 3.2.1, 3.2.2, 3.2.3, 
> 3.3.1
>Reporter: alex gabriel
>Assignee: Ivan Yurchenko
>Priority: Critical
>  Labels: needs-kip
>
> *A consumer can't discover new group coordinator when the cluster was partly 
> restarted*
> Preconditions:
> I use the Kafka server and Java kafka-client lib, version 2.2
> I have 2 Kafka nodes running locally (localhost:9092, localhost:9093) and 1 
> ZK (localhost:2181)
> I have replication factor 2 for all my topics and 
> '_unclean.leader.election.enable=true_' on both Kafka nodes.
> Steps to reproduce:
> 1) Start 2nodes (localhost:9092/localhost:9093)
> 2) Start consumer with 'bootstrap.servers=localhost:9092,localhost:9093'
> {noformat}
> // discovered group coordinator (0-node)
> 2019-04-09 16:23:18,963 INFO 
> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
> Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)>
> ...metadatacache is updated (2 nodes in the cluster list)
> 2019-04-09 16:23:18,928 DEBUG 
> [org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate] - 
> [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Sending 
> metadata request (type=MetadataRequest, topics=) to node localhost:9092 
> (id: -1 rack: null)>
> 2019-04-09 16:23:18,940 DEBUG [org.apache.kafka.clients.Metadata.update] - 
> Updated cluster metadata version 2 to MetadataCache{cluster=Cluster(id = 
> P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null), 
> localhost:9093 (id: 1 rack: null)], partitions = [], controller = 
> localhost:9092 (id: 0 rack: null))}>
> {noformat}
> 3) Shutdown 1-node (localhost:9093)
> {noformat}
> // metadata was updated to the 4 version (but for some reasons it still had 2 
> alive nodes inside cluster)
> 2019-04-09 16:23:46,981 DEBUG [org.apache.kafka.clients.Metadata.update] - 
> Updated cluster metadata version 4 to MetadataCache{cluster=Cluster(id = 
> P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9093 (id: 1 rack: null), 
> localhost:9092 (id: 0 rack: null)], partitions = [Partition(topic = 
> events-sorted, partition = 1, leader = 0, replicas = [0,1], isr = [0,1], 
> offlineReplicas = []), Partition(topic = events-sorted, partition = 0, leader 
> = 0, replicas = [0,1], isr = [0,1], offlineReplicas = [])], controller = 
> localhost:9092 (id: 0 rack: null))}>
> //consumers thinks that node-1 is still alive and try to send coordinator 
> lookup to it but failed
> 2019-04-09 16:23:46,981 INFO 
> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
> Discovered group coordinator localhost:9093 (id: 2147483646 rack: null)>
> 2019-04-09 16:23:46,981 INFO 
> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Group 
> coordinator localhost:9093 (id: 2147483646 rack: null) is unavailable or 
> invalid, will attempt rediscovery>
> 2019-04-09 16:24:01,117 DEBUG 
> [org.apache.kafka.clients.NetworkClient.handleDisconnections] - [Consumer 
> clientId=events-consumer0, groupId=events-group-gabriel] Node 1 disconnected.>
> 2019-04-09 16:24:01,117 WARN 
> [org.apache.kafka.clients.NetworkClient.processDisconnection] - [Consumer 
> clientId=events-consumer0, groupId=events-group-gabriel] Connection to node 1 
> (localhost:9093) could not be established. Broker may not be available.>
> // refreshing metadata again
> 2019-04-09 16:24:01,117 DEBUG 
> [org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
> Cancelled request with header RequestHeader(apiKey=FIND_COORDINATOR, 
> apiVersion=2, clientId=events-consumer0, correlationId=112) due to node 1 
> being disconnected>
> 2019-04-09 16:24:01,117 DEBUG 
> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
> 

Re: [PR] KAFKA-16684: Remove cache in responseData [kafka]

2024-05-19 Thread via GitHub


chia7712 commented on code in PR #15966:
URL: https://github.com/apache/kafka/pull/15966#discussion_r1605998588


##
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java:
##
@@ -99,28 +99,24 @@ public Errors error() {
 }
 
public LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> 
responseData(Map<Uuid, String> topicNames, short version) {
-if (responseData == null) {
-synchronized (this) {
-if (responseData == null) {
-// Assigning the lazy-initialized `responseData` in the 
last step
-// to avoid other threads accessing a half-initialized 
object.
-final LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseDataTmp =
-new LinkedHashMap<>();
-data.responses().forEach(topicResponse -> {
-String name;
-if (version < 13) {
-name = topicResponse.topic();
-} else {
-name = topicNames.get(topicResponse.topicId());
-}
-if (name != null) {
-topicResponse.partitions().forEach(partition ->
-responseDataTmp.put(new TopicPartition(name, 
partition.partitionIndex()), partition));
-}
-});
-responseData = responseDataTmp;
+synchronized (this) {
+// Assigning the lazy-initialized `responseData` in the last step
+// to avoid other threads accessing a half-initialized object.
+final LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseDataTmp =
+new LinkedHashMap<>();
+data.responses().forEach(topicResponse -> {
+String name;
+if (version < 13) {
+name = topicResponse.topic();
+} else {
+name = topicNames.get(topicResponse.topicId());
 }
-}
+if (name != null) {
+topicResponse.partitions().forEach(partition ->
+responseDataTmp.put(new TopicPartition(name, 
partition.partitionIndex()), partition));
+}
+});
+responseData = responseDataTmp;

Review Comment:
   This is what concerns me! The `responseData` could be different if the 
`topicNames` input changes between calls.
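
   For context, a minimal sketch of the behavioral difference being debated 
(class, field, and method names here are illustrative stand-ins, not the 
actual FetchResponse code): with the removed null check, the first computed 
map was cached and returned forever; without it, each call rebuilds the map 
from whatever `topicNames` it is handed.

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative stand-in for the caching pattern, not the real FetchResponse.
class CacheSketch {
    private volatile Map<String, String> responseData;

    // Old behavior: double-checked locking caches the first result, so later
    // calls return the same map even if a different topicNames is passed in.
    Map<String, String> cached(Map<String, String> topicNames) {
        if (responseData == null) {
            synchronized (this) {
                if (responseData == null) {
                    responseData = new LinkedHashMap<>(topicNames);
                }
            }
        }
        return responseData;
    }

    // New behavior: every call rebuilds the map from the current input, so
    // two calls with different topicNames can observe different contents.
    Map<String, String> uncached(Map<String, String> topicNames) {
        synchronized (this) {
            responseData = new LinkedHashMap<>(topicNames);
            return responseData;
        }
    }
}
{code}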



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16223: Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest [kafka]

2024-05-19 Thread via GitHub


chia7712 commented on PR #15989:
URL: https://github.com/apache/kafka/pull/15989#issuecomment-2119182780

   @chiacyu Could you please fix the conflicts?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16789: Fix thread leak detection for event handler threads [kafka]

2024-05-19 Thread via GitHub


chia7712 merged PR #15984:
URL: https://github.com/apache/kafka/pull/15984


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16795) Fix broken compatibility in kafka.tools.NoOpMessageFormatter, kafka.tools.DefaultMessageFormatter, and kafka.tools.LoggingMessageFormatter

2024-05-19 Thread Kuan Po Tseng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kuan Po Tseng updated KAFKA-16795:
--
Parent: KAFKA-14525
Issue Type: Sub-task  (was: Bug)

> Fix broken compatibility in kafka.tools.NoOpMessageFormatter, 
> kafka.tools.DefaultMessageFormatter, and kafka.tools.LoggingMessageFormatter
> --
>
> Key: KAFKA-16795
> URL: https://issues.apache.org/jira/browse/KAFKA-16795
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Major
> Fix For: 3.8.0
>
>
> [{{0bf830f}}|https://github.com/apache/kafka/commit/0bf830fc9c3915bc99b6e487e6083dabd593c5d3]
>  moved NoOpMessageFormatter, DefaultMessageFormatter, and 
> LoggingMessageFormatter from the {{kafka.tools}} package to 
> {{org.apache.kafka.tools.consumer}}.
> These classes can be invoked via the kafka-console-consumer.sh command, so 
> we should restore compatibility before 3.8.0 comes out.
>  
> {code:java}
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
> --topic streams-wordcount-output \
> --from-beginning \
> --formatter kafka.tools.DefaultMessageFormatter \
> --property print.key=true \
> --property print.value=true \
> --property 
> key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
> --property 
> value.deserializer=org.apache.kafka.common.serialization.LongDeserializer{code}
> The goal in this Jira is to allow users to keep using 
> {{kafka.tools.NoOpMessageFormatter}}, 
> {{kafka.tools.DefaultMessageFormatter}}, and 
> {{kafka.tools.LoggingMessageFormatter}}, while displaying warning messages 
> that those class names will be removed in 4.0.
>  
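
A minimal sketch, assuming a hypothetical resolve() helper consulted by the 
console consumer when parsing the --formatter argument, of how the deprecated 
names could be remapped with a warning (illustrative only, not the actual 
patch):

{code:java}
import java.util.Map;

public class FormatterShim {
    // Deprecated kafka.tools.* aliases mapped to their relocated classes.
    private static final Map<String, String> DEPRECATED_ALIASES = Map.of(
        "kafka.tools.NoOpMessageFormatter",
        "org.apache.kafka.tools.consumer.NoOpMessageFormatter",
        "kafka.tools.DefaultMessageFormatter",
        "org.apache.kafka.tools.consumer.DefaultMessageFormatter",
        "kafka.tools.LoggingMessageFormatter",
        "org.apache.kafka.tools.consumer.LoggingMessageFormatter");

    // Returns the relocated class name for a deprecated alias, warning the
    // user; unknown names pass through unchanged.
    public static String resolve(String className) {
        String replacement = DEPRECATED_ALIASES.get(className);
        if (replacement == null) {
            return className;
        }
        System.err.printf("%s is deprecated and will be removed in 4.0; use %s instead.%n",
            className, replacement);
        return replacement;
    }
}
{code}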



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: fix incorrect formatter package in streams quickstart [kafka]

2024-05-19 Thread via GitHub


brandboat commented on code in PR #15991:
URL: https://github.com/apache/kafka/pull/15991#discussion_r1605980737


##
docs/streams/quickstart.html:
##
@@ -200,7 +200,7 @@ Step 4: St
  
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
 --topic streams-wordcount-output \
 --from-beginning \
---formatter kafka.tools.DefaultMessageFormatter \
+--formatter org.apache.kafka.tools.consumer.DefaultMessageFormatter \

Review Comment:
   Filed JIRA https://issues.apache.org/jira/browse/KAFKA-16795. Will file 
another PR soon.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16795) Fix broken compatibility in kafka.tools.NoOpMessageFormatter, kafka.tools.DefaultMessageFormatter, and kafka.tools.LoggingMessageFormatter

2024-05-19 Thread Kuan Po Tseng (Jira)
Kuan Po Tseng created KAFKA-16795:
-

 Summary: Fix broken compatibility in 
kafka.tools.NoOpMessageFormatter, kafka.tools.DefaultMessageFormatter, and 
kafka.tools.LoggingMessageFormatter
 Key: KAFKA-16795
 URL: https://issues.apache.org/jira/browse/KAFKA-16795
 Project: Kafka
  Issue Type: Bug
Reporter: Kuan Po Tseng
Assignee: Kuan Po Tseng
 Fix For: 3.8.0


[{{0bf830f}}|https://github.com/apache/kafka/commit/0bf830fc9c3915bc99b6e487e6083dabd593c5d3]
 moved NoOpMessageFormatter, DefaultMessageFormatter, and 
LoggingMessageFormatter from the {{kafka.tools}} package to 
{{org.apache.kafka.tools.consumer}}.

These classes can be invoked via the kafka-console-consumer.sh command, so we 
should restore compatibility before 3.8.0 comes out.

 
{code:java}
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property 
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property 
value.deserializer=org.apache.kafka.common.serialization.LongDeserializer{code}
The goal in this Jira is to allow users to keep using 
{{kafka.tools.NoOpMessageFormatter}}, 
{{kafka.tools.DefaultMessageFormatter}}, and 
{{kafka.tools.LoggingMessageFormatter}}, while displaying warning messages 
that those class names will be removed in 4.0.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16794) Can't open videos in streams documentation

2024-05-19 Thread Kuan Po Tseng (Jira)
Kuan Po Tseng created KAFKA-16794:
-

 Summary: Can't open videos in streams documentation
 Key: KAFKA-16794
 URL: https://issues.apache.org/jira/browse/KAFKA-16794
 Project: Kafka
  Issue Type: Bug
  Components: docs
Reporter: Kuan Po Tseng
 Attachments: IMG_4445.png, image.png

Can't open videos on the page [https://kafka.apache.org/documentation/streams/]

Opening the console in the Chrome browser shows this error message:

{{Refused to frame 'https://www.youtube.com/' because it violates the following 
Content Security Policy directive: "frame-src 'self'".}}
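
A plausible website-side fix (an assumption about where the policy is set, 
not a confirmed diagnosis) would be to extend the directive to allow the 
YouTube origin, e.g. {{frame-src 'self' https://www.youtube.com}}.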



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: fix incorrect formatter package in streams quickstart [kafka]

2024-05-19 Thread via GitHub


chia7712 commented on code in PR #15991:
URL: https://github.com/apache/kafka/pull/15991#discussion_r1605966784


##
docs/streams/quickstart.html:
##
@@ -200,7 +200,7 @@ Step 4: St
  
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
 --topic streams-wordcount-output \
 --from-beginning \
---formatter kafka.tools.DefaultMessageFormatter \
+--formatter org.apache.kafka.tools.consumer.DefaultMessageFormatter \

Review Comment:
   @brandboat Could you please file another PR to fix the broken 
compatibility? Let this PR focus on docs updates :smile:



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes

2024-05-19 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847628#comment-17847628
 ] 

Luke Chen commented on KAFKA-16414:
---

So, I think we have consensus that if we can include the active segment for 
retention.bytes while also keeping the tiered storage integration tests 
non-flaky, then it is good to make this change. But in [~ckamal]'s opinion, 
that's not easy to achieve. Maybe we can give it a quick try and see whether 
the time investment is worth it. That said, the current behavior has been 
there for a long time, and users seem to accept it even if we don't change 
it. So if making the tiered storage integration tests reliable takes too much 
time, it might not be worth doing. WDYT [~brandboat]? Thanks.
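
For anyone who wants to observe the difference directly, a minimal sketch 
(the broker address and the topic name "probe" are assumptions) that sets 
both retention configs on one topic via the AdminClient:

{code:java}
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RetentionProbe {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "probe");
            // retention.ms: per this thread, the active segment can expire
            // once its max timestamp is older than this value.
            AlterConfigOp setMs = new AlterConfigOp(
                new ConfigEntry("retention.ms", "60000"), AlterConfigOp.OpType.SET);
            // retention.bytes: per this thread, the active segment is only
            // expired when this is 0, which is the inconsistency at issue.
            AlterConfigOp setBytes = new AlterConfigOp(
                new ConfigEntry("retention.bytes", "1048576"), AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> ops =
                Map.of(topic, List.of(setMs, setBytes));
            admin.incrementalAlterConfigs(ops).all().get();
        }
    }
}
{code}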

> Inconsistent active segment expiration behavior between retention.ms and 
> retention.bytes
> 
>
> Key: KAFKA-16414
> URL: https://issues.apache.org/jira/browse/KAFKA-16414
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.6.1
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Major
>
> This is a follow up issue on KAFKA-16385.
> Currently, there's a difference between how retention.ms and retention.bytes 
> handle active segment expiration:
> - retention.ms always expire active segment when max segment timestamp 
> matches the condition.
> - retention.bytes only expire active segment when retention.bytes is 
> configured to zero.
> The behavior should be either rotate active segments for both retention 
> configurations or none at all.
> For more details, see
> https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682



--
This message was sent by Atlassian Jira
(v8.20.10#820010)