Re: [PR] KAFKA-15739: KRaft support in ResetConsumerGroupOffsetTest [kafka]
linzihao1999 closed pull request #14686: KAFKA-15739: KRaft support in ResetConsumerGroupOffsetTest URL: https://github.com/apache/kafka/pull/14686 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16708) Allow dynamic port for advertised listeners in Docker image
[ https://issues.apache.org/jira/browse/KAFKA-16708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847767#comment-17847767 ] Chris Bono commented on KAFKA-16708: Hi [~vedarth] {quote}Thanks a lot for raising the ticket and sharing WIP commit as well. I am really happy to see your interest in improving the apache kafka docker image. {quote} You are more than welcome. {quote}I am interested in understanding the benefit of having dynamic port. One clear benefit is that static mapping isn't needed. But are there other benefits? {quote} We have a large set of smoke tests that each use docker-compose and when using static port mappings we get port collisions from time to time. Using dynamic port mappings alleviates this issue. {quote} Given that docker in docker has security concerns, I think it's important to make this exclusive for testing purposes. Since apache/kafka docker image is meant for production usage, I think it might be better to consider adding this to just KIP-974 apache/kafka-native image (to be released in 3.8.0) as it's meant for testing purposes and local usage. This will also remove the risk of users enabling this by accident in production. {quote} Nice! I was unaware of KIP-974; this "kafka-native" (non-production use) image does sound like an excellent delivery vehicle w/o the risks of running this in PROD. > Allow dynamic port for advertised listeners in Docker image > --- > > Key: KAFKA-16708 > URL: https://issues.apache.org/jira/browse/KAFKA-16708 > Project: Kafka > Issue Type: Improvement >Reporter: Chris Bono >Priority: Major > > First of all, thank you all for adding the official Kafka Docker image (I > know it is a big responsibility and adds to the team workload). > I am migrating from {{wurstmeister/kafka}} to the official {{apache/kafka}} > image. > My advertised port is not static and was relying on [the PORT_COMMAND > feature|https://github.com/wurstmeister/kafka-docker/commit/c66375fc3b94e98dbecd603c5d2b44c06e927e88] > in the {{wurstmeister/kafka}} image to determine the port programatically. > This would let me define a docker-compose as follows: > {code:java} > services: > kafka: > image: apache/kafka:latest > hostname: kafka > ports: > - "9092" > volumes: > - '/var/run/docker.sock.raw:/var/run/docker.sock' > environment: > KAFKA_NODE_ID: 1 > KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: > 'CONTROLLER:PLAINTEXT,PLAINTEXT_DOCKER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' > KAFKA_LISTENERS: > 'CONTROLLER://kafka:29093,PLAINTEXT_DOCKER://kafka:29092,PLAINTEXT_HOST://0.0.0.0:9092' > KAFKA_ADVERTISED_LISTENERS: > 'PLAINTEXT_DOCKER://kafka:29092,PLAINTEXT_HOST://localhost:_{PORT_COMMAND}' > KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 > KAFKA_PROCESS_ROLES: 'broker,controller' > KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093' > KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT_DOCKER' > KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' > PORT_COMMAND: "docker ps | egrep 'kafka' | cut -d: -f 3 | cut -d- -f > 1"{code} > Notice how the "ports" are dynamically mapped (i.e. not *"port:port"* syntax) > - the actual port will *not* be "9092". > Do you have a suggestion for an alternative approach on how to obtain a > non-static port for advertised listeners? If not, would adding conditional > support for this feature be welcomed? > I am aware of the complication/concern of this request as it is Docker in > Docker for non-root users (described > [here|https://jonfriesen.ca/articles/docker-in-docker-non-root-user/]) and as > such we could make it inactive by default and users would have to opt-in > explicitly. > I have created a [rough > WIP|https://github.com/onobc/kafka/commit/6556c4adbf08155b89c9804c2c5d1a988f8371f2] > that illustrates the concept (there is no conditionality in it currently). > Note that the container is not run as {*}root{*}, but rather the *appuser* is > added to whatever group that own the docker.sock (which on my machine is > root). > > P.S. > * This is my first time filing an issue w/ Kafka so if I missed anything > please let me know and I am glad to add whatever other info, etc.. > * I am not sure what "Component" this should be under (the other Kafka > Docker related issues had differing values here) > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]
gongxuanzhang commented on PR #15946: URL: https://github.com/apache/kafka/pull/15946#issuecomment-2119568301 > Could you please fix the build error? fixed it, i'm so sorry. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]
gongxuanzhang commented on PR #15946: URL: https://github.com/apache/kafka/pull/15946#issuecomment-2119506863 > Could you please fix the build error? I can,I will submit later -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16796) Introduce new org.apache.kafka.tools.api.MessageParser to replace kafka.tools.DumpLogSegments.MessageParser
[ https://issues.apache.org/jira/browse/KAFKA-16796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847744#comment-17847744 ] PoAn Yang commented on KAFKA-16796: --- Hi [~chia7712], I'm interested in this. May I take it? Thank you. > Introduce new org.apache.kafka.tools.api.MessageParser to replace > kafka.tools.DumpLogSegments.MessageParser > --- > > Key: KAFKA-16796 > URL: https://issues.apache.org/jira/browse/KAFKA-16796 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > Labels: need-kip > > We need a replacement in order to complete > https://issues.apache.org/jira/browse/KAFKA-14579 in kafak 4.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16797: A bit cleanup of FeatureControlManager [kafka]
m1a2st opened a new pull request, #15997: URL: https://github.com/apache/kafka/pull/15997 Clean up `brokerSupported` and `controllerSupported` method by using `Collections.emptyIterator()` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: ensure KafkaServerTestHarness::tearDown is always invoked [kafka]
gaurav-narula opened a new pull request, #15996: URL: https://github.com/apache/kafka/pull/15996 An exception thrown while closing the client instances in `IntegrationTestHarness::tearDown` may result in `KafkaServerTestHarness::tearDown` not being invoked. This would result in thread leaks of the broker and controller threads spawned in the failing test. An example of this is the [CI run](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15994/1/tests) for #15994 where `Build / JDK 8 and Scala 2.12 / testCoordinatorFailover(String, String).quorum=kraft+kip848.groupProtocol=consumer – kafka.api.PlaintextConsumerTest` failing results in `consumers.foreach(_.close(Duration.ZERO))` in `IntegrationTestHarness::tearDown` throwing an exception. A side effect of this is it poisons Gradle test runner JVM and prevents tests in other unrelated classes from executing as `@BeforeAll` check in QuorumTestHarness would cause them to fail immediately. This PR encloses the client closure in try-finally to ensure `KafkaServerTestHarness::tearDown` is always invoked. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16469: Metadata schema checker [kafka]
cmccabe opened a new pull request, #15995: URL: https://github.com/apache/kafka/pull/15995 Create a schema checker that can validate that later versions of a KRPC schema are compatible with earlier ones. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16469) Metadata Schema Checker
[ https://issues.apache.org/jira/browse/KAFKA-16469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe reassigned KAFKA-16469: Assignee: Colin McCabe > Metadata Schema Checker > --- > > Key: KAFKA-16469 > URL: https://issues.apache.org/jira/browse/KAFKA-16469 > Project: Kafka > Issue Type: New Feature >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16529) Response handling and request sending for voters RPCs
[ https://issues.apache.org/jira/browse/KAFKA-16529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-16529: --- Description: Implement response handling and request sending for the following RPCs: # Vote # BeginQuorumEpoch # Fetch When loading the leader from the quorum-state file don't assume that the leader is in the voter set. was: Implement response handling and request sending for the following RPCs: # Vote # BeginQuorumEpoch # Fetch > Response handling and request sending for voters RPCs > - > > Key: KAFKA-16529 > URL: https://issues.apache.org/jira/browse/KAFKA-16529 > Project: Kafka > Issue Type: Sub-task > Components: kraft >Reporter: José Armando García Sancio >Assignee: José Armando García Sancio >Priority: Major > Fix For: 3.8.0 > > > Implement response handling and request sending for the following RPCs: > # Vote > # BeginQuorumEpoch > # Fetch > When loading the leader from the quorum-state file don't assume that the > leader is in the voter set. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16797) A bit cleanup of FeatureControlManager
Chia-Ping Tsai created KAFKA-16797: -- Summary: A bit cleanup of FeatureControlManager Key: KAFKA-16797 URL: https://issues.apache.org/jira/browse/KAFKA-16797 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java#L62] that can be replaced by `Collections.emptyIterator()` -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16653) Remove delayed initialization because of static voter set
[ https://issues.apache.org/jira/browse/KAFKA-16653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-16653: --- Description: Once KRaft supports the AddVoter RPC, the QuorumTestHarness and KRaftClusterTestKit can be reimplemented to use dynamic voters instead of the static voter set. This should allow us to remove KRaft's support for delay static voter set initialization. This should also include updates to KRaftClusterTestKit to take advantage to the controller bootstrap servers. was: Once KRaft supports the AddVoter RPC, the QuorumTestHarness and KRaftClusterTestKit can be reimplemented to use dynamic voters instead of the static voter set. This should allow us to remove KRaft's support for delay static voter set initialization. > Remove delayed initialization because of static voter set > - > > Key: KAFKA-16653 > URL: https://issues.apache.org/jira/browse/KAFKA-16653 > Project: Kafka > Issue Type: Sub-task > Components: kraft >Reporter: José Armando García Sancio >Priority: Major > > Once KRaft supports the AddVoter RPC, the QuorumTestHarness and > KRaftClusterTestKit can be reimplemented to use dynamic voters instead of the > static voter set. > This should allow us to remove KRaft's support for delay static voter set > initialization. > This should also include updates to KRaftClusterTestKit to take advantage to > the controller bootstrap servers. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16629: Add broker-related tests to ConfigCommandIntegrationTest [kafka]
chia7712 commented on code in PR #15840: URL: https://github.com/apache/kafka/pull/15840#discussion_r1606088555 ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## @@ -65,219 +74,479 @@ @ExtendWith(value = ClusterTestExtensions.class) @Tag("integration") public class ConfigCommandIntegrationTest { -AdminZkClient adminZkClient; -List alterOpts; +private List alterOpts; +private final String defaultBrokerId = "0"; private final ClusterInstance cluster; +private static Runnable run(Stream command) { +return () -> { +try { +ConfigCommand.main(command.toArray(String[]::new)); +} catch (RuntimeException e) { +// do nothing. +} finally { +Exit.resetExitProcedure(); +} +}; +} + public ConfigCommandIntegrationTest(ClusterInstance cluster) { this.cluster = cluster; } -@ClusterTest(types = {Type.ZK, Type.KRAFT}) +@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}) public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() { assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of( "--entity-name", cluster.isKRaftTest() ? "0" : "1", "--entity-type", "brokers", "--alter", "--add-config", "security.inter.broker.protocol=PLAINTEXT")), -errOut -> -assertTrue(errOut.contains("Cannot update these configs dynamically: Set(security.inter.broker.protocol)"), errOut)); +errOut -> assertTrue(errOut.contains("Cannot update these configs dynamically: Set(security.inter.broker.protocol)"), errOut)); } - @ClusterTest(types = {Type.ZK}) public void testExitWithNonZeroStatusOnZkCommandAlterUserQuota() { assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of( "--entity-type", "users", "--entity-name", "admin", "--alter", "--add-config", "consumer_byte_rate=2")), -errOut -> -assertTrue(errOut.contains("User configuration updates using ZooKeeper are only supported for SCRAM credential updates."), errOut)); -} - -public static void assertNonZeroStatusExit(Stream args, Consumer checkErrOut) { -AtomicReference exitStatus = new AtomicReference<>(); -Exit.setExitProcedure((status, __) -> { -exitStatus.set(status); -throw new RuntimeException(); -}); - -String errOut = captureStandardErr(() -> { -try { -ConfigCommand.main(args.toArray(String[]::new)); -} catch (RuntimeException e) { -// do nothing. -} finally { -Exit.resetExitProcedure(); -} -}); - -checkErrOut.accept(errOut); -assertNotNull(exitStatus.get()); -assertEquals(1, exitStatus.get()); -} - -private Stream quorumArgs() { -return cluster.isKRaftTest() -? Stream.of("--bootstrap-server", cluster.bootstrapServers()) -: Stream.of("--zookeeper", ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect()); -} - -public List entityOp(Optional brokerId) { -return brokerId.map(id -> Arrays.asList("--entity-name", id)).orElse(Collections.singletonList("--entity-default")); -} - -public void alterConfigWithZk(KafkaZkClient zkClient, Map configs, Optional brokerId) throws Exception { -alterConfigWithZk(zkClient, configs, brokerId, Collections.emptyMap()); +errOut -> assertTrue(errOut.contains("User configuration updates using ZooKeeper are only supported for SCRAM credential updates."), errOut)); } -public void alterConfigWithZk(KafkaZkClient zkClient, Map configs, Optional brokerId, Map encoderConfigs) { -String configStr = Stream.of(configs.entrySet(), encoderConfigs.entrySet()) -.flatMap(Set::stream) -.map(e -> e.getKey() + "=" + e.getValue()) -.collect(Collectors.joining(",")); -ConfigCommand.ConfigCommandOptions addOpts = new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), Arrays.asList("--add-config", configStr))); -ConfigCommand.alterConfigWithZk(zkClient, addOpts, adminZkClient); -} - -void verifyConfig(KafkaZkClient zkClient, Map configs, Optional brokerId) { -Properties entityConfigs = zkClient.getEntityConfigs("brokers", brokerId.orElse(ZooKeeperInternals.DEFAULT_STRING)); -assertEquals(configs, entityConfigs); -} - -void alterAndVerifyConfig(KafkaZkClient zkClient, Map configs, Optional brokerId) throws Exception { -alterConfigWithZk(zkClient, configs, brokerId); -verifyConfig(zkClient, configs, brokerId); -} +@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}) +public void
Re: [PR] KAFKA-16629: Add broker-related tests to ConfigCommandIntegrationTest [kafka]
chia7712 commented on code in PR #15840: URL: https://github.com/apache/kafka/pull/15840#discussion_r1606088402 ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## @@ -65,219 +74,479 @@ @ExtendWith(value = ClusterTestExtensions.class) @Tag("integration") public class ConfigCommandIntegrationTest { -AdminZkClient adminZkClient; -List alterOpts; +private List alterOpts; +private final String defaultBrokerId = "0"; private final ClusterInstance cluster; +private static Runnable run(Stream command) { +return () -> { +try { +ConfigCommand.main(command.toArray(String[]::new)); +} catch (RuntimeException e) { +// do nothing. +} finally { +Exit.resetExitProcedure(); +} +}; +} + public ConfigCommandIntegrationTest(ClusterInstance cluster) { this.cluster = cluster; } -@ClusterTest(types = {Type.ZK, Type.KRAFT}) +@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}) public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() { assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of( "--entity-name", cluster.isKRaftTest() ? "0" : "1", "--entity-type", "brokers", "--alter", "--add-config", "security.inter.broker.protocol=PLAINTEXT")), -errOut -> -assertTrue(errOut.contains("Cannot update these configs dynamically: Set(security.inter.broker.protocol)"), errOut)); +errOut -> assertTrue(errOut.contains("Cannot update these configs dynamically: Set(security.inter.broker.protocol)"), errOut)); } - @ClusterTest(types = {Type.ZK}) public void testExitWithNonZeroStatusOnZkCommandAlterUserQuota() { assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of( "--entity-type", "users", "--entity-name", "admin", "--alter", "--add-config", "consumer_byte_rate=2")), -errOut -> -assertTrue(errOut.contains("User configuration updates using ZooKeeper are only supported for SCRAM credential updates."), errOut)); -} - -public static void assertNonZeroStatusExit(Stream args, Consumer checkErrOut) { -AtomicReference exitStatus = new AtomicReference<>(); -Exit.setExitProcedure((status, __) -> { -exitStatus.set(status); -throw new RuntimeException(); -}); - -String errOut = captureStandardErr(() -> { -try { -ConfigCommand.main(args.toArray(String[]::new)); -} catch (RuntimeException e) { -// do nothing. -} finally { -Exit.resetExitProcedure(); -} -}); - -checkErrOut.accept(errOut); -assertNotNull(exitStatus.get()); -assertEquals(1, exitStatus.get()); -} - -private Stream quorumArgs() { -return cluster.isKRaftTest() -? Stream.of("--bootstrap-server", cluster.bootstrapServers()) -: Stream.of("--zookeeper", ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect()); -} - -public List entityOp(Optional brokerId) { -return brokerId.map(id -> Arrays.asList("--entity-name", id)).orElse(Collections.singletonList("--entity-default")); -} - -public void alterConfigWithZk(KafkaZkClient zkClient, Map configs, Optional brokerId) throws Exception { -alterConfigWithZk(zkClient, configs, brokerId, Collections.emptyMap()); +errOut -> assertTrue(errOut.contains("User configuration updates using ZooKeeper are only supported for SCRAM credential updates."), errOut)); } -public void alterConfigWithZk(KafkaZkClient zkClient, Map configs, Optional brokerId, Map encoderConfigs) { -String configStr = Stream.of(configs.entrySet(), encoderConfigs.entrySet()) -.flatMap(Set::stream) -.map(e -> e.getKey() + "=" + e.getValue()) -.collect(Collectors.joining(",")); -ConfigCommand.ConfigCommandOptions addOpts = new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), Arrays.asList("--add-config", configStr))); -ConfigCommand.alterConfigWithZk(zkClient, addOpts, adminZkClient); -} - -void verifyConfig(KafkaZkClient zkClient, Map configs, Optional brokerId) { -Properties entityConfigs = zkClient.getEntityConfigs("brokers", brokerId.orElse(ZooKeeperInternals.DEFAULT_STRING)); -assertEquals(configs, entityConfigs); -} - -void alterAndVerifyConfig(KafkaZkClient zkClient, Map configs, Optional brokerId) throws Exception { -alterConfigWithZk(zkClient, configs, brokerId); -verifyConfig(zkClient, configs, brokerId); -} +@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}) +public void
Re: [PR] KAFKA-16629: Add broker-related tests to ConfigCommandIntegrationTest [kafka]
chia7712 commented on code in PR #15840: URL: https://github.com/apache/kafka/pull/15840#discussion_r1606087930 ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## @@ -65,219 +74,479 @@ @ExtendWith(value = ClusterTestExtensions.class) @Tag("integration") public class ConfigCommandIntegrationTest { -AdminZkClient adminZkClient; -List alterOpts; +private List alterOpts; +private final String defaultBrokerId = "0"; private final ClusterInstance cluster; +private static Runnable run(Stream command) { +return () -> { +try { +ConfigCommand.main(command.toArray(String[]::new)); +} catch (RuntimeException e) { +// do nothing. +} finally { +Exit.resetExitProcedure(); +} +}; +} + public ConfigCommandIntegrationTest(ClusterInstance cluster) { this.cluster = cluster; } -@ClusterTest(types = {Type.ZK, Type.KRAFT}) +@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}) public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() { assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of( "--entity-name", cluster.isKRaftTest() ? "0" : "1", "--entity-type", "brokers", "--alter", "--add-config", "security.inter.broker.protocol=PLAINTEXT")), -errOut -> -assertTrue(errOut.contains("Cannot update these configs dynamically: Set(security.inter.broker.protocol)"), errOut)); +errOut -> assertTrue(errOut.contains("Cannot update these configs dynamically: Set(security.inter.broker.protocol)"), errOut)); } - @ClusterTest(types = {Type.ZK}) public void testExitWithNonZeroStatusOnZkCommandAlterUserQuota() { assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of( "--entity-type", "users", "--entity-name", "admin", "--alter", "--add-config", "consumer_byte_rate=2")), -errOut -> -assertTrue(errOut.contains("User configuration updates using ZooKeeper are only supported for SCRAM credential updates."), errOut)); -} - -public static void assertNonZeroStatusExit(Stream args, Consumer checkErrOut) { -AtomicReference exitStatus = new AtomicReference<>(); -Exit.setExitProcedure((status, __) -> { -exitStatus.set(status); -throw new RuntimeException(); -}); - -String errOut = captureStandardErr(() -> { -try { -ConfigCommand.main(args.toArray(String[]::new)); -} catch (RuntimeException e) { -// do nothing. -} finally { -Exit.resetExitProcedure(); -} -}); - -checkErrOut.accept(errOut); -assertNotNull(exitStatus.get()); -assertEquals(1, exitStatus.get()); -} - -private Stream quorumArgs() { -return cluster.isKRaftTest() -? Stream.of("--bootstrap-server", cluster.bootstrapServers()) -: Stream.of("--zookeeper", ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect()); -} - -public List entityOp(Optional brokerId) { -return brokerId.map(id -> Arrays.asList("--entity-name", id)).orElse(Collections.singletonList("--entity-default")); -} - -public void alterConfigWithZk(KafkaZkClient zkClient, Map configs, Optional brokerId) throws Exception { -alterConfigWithZk(zkClient, configs, brokerId, Collections.emptyMap()); +errOut -> assertTrue(errOut.contains("User configuration updates using ZooKeeper are only supported for SCRAM credential updates."), errOut)); } -public void alterConfigWithZk(KafkaZkClient zkClient, Map configs, Optional brokerId, Map encoderConfigs) { -String configStr = Stream.of(configs.entrySet(), encoderConfigs.entrySet()) -.flatMap(Set::stream) -.map(e -> e.getKey() + "=" + e.getValue()) -.collect(Collectors.joining(",")); -ConfigCommand.ConfigCommandOptions addOpts = new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), Arrays.asList("--add-config", configStr))); -ConfigCommand.alterConfigWithZk(zkClient, addOpts, adminZkClient); -} - -void verifyConfig(KafkaZkClient zkClient, Map configs, Optional brokerId) { -Properties entityConfigs = zkClient.getEntityConfigs("brokers", brokerId.orElse(ZooKeeperInternals.DEFAULT_STRING)); -assertEquals(configs, entityConfigs); -} - -void alterAndVerifyConfig(KafkaZkClient zkClient, Map configs, Optional brokerId) throws Exception { -alterConfigWithZk(zkClient, configs, brokerId); -verifyConfig(zkClient, configs, brokerId); -} +@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}) +public void
Re: [PR] Kafka-15444: Native docker image for Apache Kafka(KIP-974) [kafka]
kagarwal06 commented on PR #15927: URL: https://github.com/apache/kafka/pull/15927#issuecomment-2119331957 > Also will the current Github workflows work with current PR changes ? The github workflows uses the build, test and release scripts. I have integrated the native docker image with these scripts and tested them locally. They are working fine. I will raise a separate PR to integrate this image in the github action workflows. >Can we add README to `docker/native` and add few details about the image and native-image-configs Sure. I will add this Readme in the documentation PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Add a constructor to accept cause in ConfigException [kafka]
gaurav-narula opened a new pull request, #15994: URL: https://github.com/apache/kafka/pull/15994 ConfigException's constructor doesn't follow the usual convention of accepting a `Throwable` as a cause. Instead, the constructor accepts an Object type for the config's value and crafts a message using it. This makes it difficult to debug cases where the cause would prove useful. As an example, consider the flakey test in `DynamicBrokerReconfigurationTest::testTrustStoreAlter` [0] which masks the cause of the certificate validation failure. Since the existing constructor is public, this change introduces another constructor which accepts a `Throwable` as the second argument. JLS §15.12.2.5 [1] ensures the more specific constructor is chosen. Caution must be taken when passing `null` as the value of the second argument as the constructor with `Throwable` as the second argument would be invoked since it's more specific [2]. [[0]]: https://ge.apache.org/s/bhdpknkevz37g/tests/task/:core:test/details/kafka.server.DynamicBrokerReconfigurationTest/testTrustStoreAlter(String)%5B2%5D?expanded-stacktrace=WyIwIl0=1 [[1]]: https://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.12.2.5 [[2]]: https://stackoverflow.com/a/1572499 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
chia7712 commented on code in PR #15779: URL: https://github.com/apache/kafka/pull/15779#discussion_r1606053454 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java: ## @@ -62,506 +86,764 @@ * - scope=topics+partitions, scenario=to-earliest * - export/import */ -public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest { -private String[] basicArgs() { +@ExtendWith(value = ClusterTestExtensions.class) +public class ResetConsumerGroupOffsetTest { + +private static final String TOPIC_PREFIX = "foo-"; +private static final String GROUP_PREFIX = "test.group-"; + +private static void generator(ClusterGenerator clusterGenerator) { +ConsumerGroupCommandTestUtils.generator(clusterGenerator); +} + +private String[] basicArgs(ClusterInstance cluster) { return new String[]{"--reset-offsets", -"--bootstrap-server", bootstrapServers(listenerName()), +"--bootstrap-server", cluster.bootstrapServers(), "--timeout", Long.toString(DEFAULT_MAX_WAIT_MS)}; } -private String[] buildArgsForGroups(List groups, String...args) { -List res = new ArrayList<>(Arrays.asList(basicArgs())); +private String[] buildArgsForGroups(ClusterInstance cluster, List groups, String... args) { +List res = new ArrayList<>(asList(basicArgs(cluster))); for (String group : groups) { res.add("--group"); res.add(group); } -res.addAll(Arrays.asList(args)); +res.addAll(asList(args)); return res.toArray(new String[0]); } -private String[] buildArgsForGroup(String group, String...args) { -return buildArgsForGroups(Collections.singletonList(group), args); +private String[] buildArgsForGroup(ClusterInstance cluster, String group, String... args) { +return buildArgsForGroups(cluster, singletonList(group), args); } -private String[] buildArgsForAllGroups(String...args) { -List res = new ArrayList<>(Arrays.asList(basicArgs())); +private String[] buildArgsForAllGroups(ClusterInstance cluster, String... args) { +List res = new ArrayList<>(asList(basicArgs(cluster))); res.add("--all-groups"); -res.addAll(Arrays.asList(args)); +res.addAll(asList(args)); return res.toArray(new String[0]); } -@Test -public void testResetOffsetsNotExistingGroup() throws Exception { +@ClusterTemplate("generator") +public void testResetOffsetsNotExistingGroup(ClusterInstance cluster) throws Exception { +String topic = generateRandomTopic(); String group = "missing.group"; -String[] args = buildArgsForGroup(group, "--all-topics", "--to-current", "--execute"); -ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = getConsumerGroupService(args); -// Make sure we got a coordinator -TestUtils.waitForCondition( -() -> Objects.equals(consumerGroupCommand.collectGroupState(group).coordinator.host(), "localhost"), -"Can't find a coordinator"); -Map resetOffsets = consumerGroupCommand.resetOffsets().get(group); -assertTrue(resetOffsets.isEmpty()); -assertTrue(committedOffsets(TOPIC, group).isEmpty()); -} - -@Test -public void testResetOffsetsExistingTopic() { -String group = "new.group"; -String[] args = buildArgsForGroup(group, "--topic", TOPIC, "--to-offset", "50"); -produceMessages(TOPIC, 100); -resetAndAssertOffsets(args, 50, true, Collections.singletonList(TOPIC)); -resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, Collections.singletonList(TOPIC)); -resetAndAssertOffsets(addTo(args, "--execute"), 50, false, Collections.singletonList(TOPIC)); +String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--to-current", "--execute"); + +try (ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) { +// Make sure we got a coordinator +TestUtils.waitForCondition( +() -> "localhost".equals(service.collectGroupState(group).coordinator.host()), +"Can't find a coordinator"); +Map resetOffsets = service.resetOffsets().get(group); +assertTrue(resetOffsets.isEmpty()); +assertTrue(committedOffsets(cluster, topic, group).isEmpty()); +} } -@Test -public void testResetOffsetsExistingTopicSelectedGroups() throws Exception { -produceMessages(TOPIC, 100); -List groups = IntStream.rangeClosed(1, 3).mapToObj(id -> GROUP + id).collect(Collectors.toList()); -for (String group : groups) { -ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, TOPIC, group, GroupProtocol.CLASSIC.name); -awaitConsumerProgress(TOPIC, group,
Re: [PR] KAFKA-16784: Migrate TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15992: URL: https://github.com/apache/kafka/pull/15992#discussion_r1606051053 ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java: ## @@ -49,66 +51,34 @@ import static org.mockito.Mockito.verify; @SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters Review Comment: Do we need this annotation? ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java: ## @@ -49,66 +51,34 @@ import static org.mockito.Mockito.verify; @SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters +@Tag("integration") +@ExtendWith(ClusterTestExtensions.class) +@ClusterTestDefaults(brokers = 3) public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest { +private final ClusterInstance clusterInstance; private static final int SEG_SIZE = 1024 * 1024; private final Time time = new MockTime(1); -private final TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness(); -private TopicBasedRemoteLogMetadataManager rlmm() { -return remoteLogMetadataManagerHarness.remoteLogMetadataManager(); + TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest(ClusterInstance clusterInstance) { +this.clusterInstance = clusterInstance; } -@BeforeEach -public void setup() { -// Start the cluster only. -remoteLogMetadataManagerHarness.setUp(new EmptyTestInfo()); -} - -@AfterEach -public void teardown() throws IOException { -remoteLogMetadataManagerHarness.close(); -} - -@Test +@ClusterTest public void testMultiplePartitionSubscriptions() throws Exception { // Create topics. String leaderTopic = "leader"; -HashMap> assignedLeaderTopicReplicas = new HashMap<>(); -List leaderTopicReplicas = new ArrayList<>(); // Set broker id 0 as the first entry which is taken as the leader. -leaderTopicReplicas.add(0); -leaderTopicReplicas.add(1); -leaderTopicReplicas.add(2); -assignedLeaderTopicReplicas.put(0, JavaConverters.asScalaBuffer(leaderTopicReplicas)); -remoteLogMetadataManagerHarness.createTopicWithAssignment(leaderTopic, -JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas), -remoteLogMetadataManagerHarness.listenerName()); +createTopic(leaderTopic, Collections.singletonMap(0, Arrays.asList(0, 1, 2))); String followerTopic = "follower"; -HashMap> assignedFollowerTopicReplicas = new HashMap<>(); -List followerTopicReplicas = new ArrayList<>(); // Set broker id 1 as the first entry which is taken as the leader. -followerTopicReplicas.add(1); -followerTopicReplicas.add(2); -followerTopicReplicas.add(0); -assignedFollowerTopicReplicas.put(0, JavaConverters.asScalaBuffer(followerTopicReplicas)); -remoteLogMetadataManagerHarness.createTopicWithAssignment( -followerTopic, JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas), -remoteLogMetadataManagerHarness.listenerName()); +createTopic(followerTopic, Collections.singletonMap(0, Arrays.asList(1, 2, 0))); String topicWithNoMessages = "no-messages-topic"; -HashMap> assignedTopicReplicas = new HashMap<>(); -List noMessagesTopicReplicas = new ArrayList<>(); // Set broker id 1 as the first entry which is taken as the leader. -noMessagesTopicReplicas.add(1); -noMessagesTopicReplicas.add(2); -noMessagesTopicReplicas.add(0); -assignedTopicReplicas.put(0, JavaConverters.asScalaBuffer(noMessagesTopicReplicas)); -remoteLogMetadataManagerHarness.createTopicWithAssignment( -topicWithNoMessages, JavaConverters.mapAsScalaMap(assignedTopicReplicas), -remoteLogMetadataManagerHarness.listenerName()); +createTopic(topicWithNoMessages, Collections.singletonMap(0, Arrays.asList(1, 2, 0))); Review Comment: Could you please call `ClusterInstance#waitForTopic` to make sure all topics get created? https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/test/ClusterInstance.java#L185 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]
chia7712 commented on code in PR #15863: URL: https://github.com/apache/kafka/pull/15863#discussion_r1606049924 ## core/src/main/scala/kafka/log/LogCleaner.scala: ## @@ -126,29 +126,34 @@ class LogCleaner(initialConfig: CleanerConfig, private def maxOverCleanerThreads(f: CleanerThread => Double): Int = cleaners.foldLeft(0.0d)((max: Double, thread: CleanerThread) => math.max(max, f(thread))).toInt - /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */ - metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName, -() => maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100) - - /* a metric to track the recopy rate of each thread's last cleaning */ - metricsGroup.newGauge(CleanerRecopyPercentMetricName, () => { -val stats = cleaners.map(_.lastStats) -val recopyRate = stats.iterator.map(_.bytesWritten).sum.toDouble / math.max(stats.iterator.map(_.bytesRead).sum, 1) -(100 * recopyRate).toInt - }) - - /* a metric to track the maximum cleaning time for the last cleaning from each thread */ - metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs)) - - // a metric to track delay between the time when a log is required to be compacted - // as determined by max compaction lag and the time of last cleaner run. - metricsGroup.newGauge(MaxCompactionDelayMetricsName, -() => maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000) - - metricsGroup.newGauge(DeadThreadCountMetricName, () => deadThreadCount) - private[log] def deadThreadCount: Int = cleaners.count(_.isThreadFailed) + /** + * Activate metrics + */ + private def activateMetrics(): Unit = { Review Comment: We don't need this function now, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16654:Refactor kafka.test.annotation.Type and ClusterTestExtensions [kafka]
chia7712 commented on PR #15916: URL: https://github.com/apache/kafka/pull/15916#issuecomment-2119259591 @TaiJuWu Please update all tests using `ClusterTemplate`. For example: zkClustersForAllMigrationVersions -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started [kafka]
chia7712 commented on PR #15946: URL: https://github.com/apache/kafka/pull/15946#issuecomment-2119258395 ``` [2024-05-17T02:48:07.827Z] [ant:checkstyle] [ERROR] /home/jenkins/workspace/Kafka_kafka-pr_PR-15946/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:238:45: ',' is not followed by whitespace. [WhitespaceAfter] ``` Could you please fix the build error? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-14579) Move DumpLogSegments to tools
[ https://issues.apache.org/jira/browse/KAFKA-14579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847685#comment-17847685 ] Chia-Ping Tsai edited comment on KAFKA-14579 at 5/19/24 2:32 PM: - I create https://issues.apache.org/jira/browse/KAFKA-16796 to address that. It would be great to complete it before 3.8 was (Author: chia7712): I create https://issues.apache.org/jira/browse/KAFKA-14579 to address that. It would be great to complete it before 3.8 > Move DumpLogSegments to tools > - > > Key: KAFKA-14579 > URL: https://issues.apache.org/jira/browse/KAFKA-14579 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Alexandre Dupriez >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14579) Move DumpLogSegments to tools
[ https://issues.apache.org/jira/browse/KAFKA-14579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847685#comment-17847685 ] Chia-Ping Tsai commented on KAFKA-14579: I create https://issues.apache.org/jira/browse/KAFKA-14579 to address that. It would be great to complete it before 3.8 > Move DumpLogSegments to tools > - > > Key: KAFKA-14579 > URL: https://issues.apache.org/jira/browse/KAFKA-14579 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Alexandre Dupriez >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16796) Introduce new org.apache.kafka.tools.api.MessageParser to replace kafka.tools.DumpLogSegments.MessageParser
[ https://issues.apache.org/jira/browse/KAFKA-16796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-16796: --- Labels: need-kip (was: ) > Introduce new org.apache.kafka.tools.api.MessageParser to replace > kafka.tools.DumpLogSegments.MessageParser > --- > > Key: KAFKA-16796 > URL: https://issues.apache.org/jira/browse/KAFKA-16796 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > Labels: need-kip > > We need a replacement in order to complete > https://issues.apache.org/jira/browse/KAFKA-14579 in kafak 4.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16796) Introduce new org.apache.kafka.tools.api.MessageParser to replace kafka.tools.DumpLogSegments.MessageParser
Chia-Ping Tsai created KAFKA-16796: -- Summary: Introduce new org.apache.kafka.tools.api.MessageParser to replace kafka.tools.DumpLogSegments.MessageParser Key: KAFKA-16796 URL: https://issues.apache.org/jira/browse/KAFKA-16796 Project: Kafka Issue Type: Sub-task Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai We need a replacement in order to complete https://issues.apache.org/jira/browse/KAFKA-14579 in kafak 4.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16541) Potential leader epoch checkpoint file corruption on OS crash
[ https://issues.apache.org/jira/browse/KAFKA-16541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847684#comment-17847684 ] Haruki Okada commented on KAFKA-16541: -- [~junrao] Hi, i've just submitted a patch. PTAL > Potential leader epoch checkpoint file corruption on OS crash > - > > Key: KAFKA-16541 > URL: https://issues.apache.org/jira/browse/KAFKA-16541 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.7.0 >Reporter: Haruki Okada >Assignee: Haruki Okada >Priority: Minor > > Pointed out by [~junrao] on > [GitHub|https://github.com/apache/kafka/pull/14242#discussion_r1556161125] > [A patch for KAFKA-15046|https://github.com/apache/kafka/pull/14242] got rid > of fsync of leader-epoch ckeckpoint file in some path for performance reason. > However, since now checkpoint file is flushed to the device asynchronously by > OS, content would corrupt if OS suddenly crashes (e.g. by power failure, > kernel panic) in the middle of flush. > Corrupted checkpoint file could prevent Kafka broker to start-up -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16541 Fix potential leader-epoch checkpoint file corruption [kafka]
ocadaruma commented on PR #15993: URL: https://github.com/apache/kafka/pull/15993#issuecomment-2119256710 @junrao Could you take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]
chia7712 commented on PR #15830: URL: https://github.com/apache/kafka/pull/15830#issuecomment-2119256124 @pasharik Could you please check the failed test? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16541 Fix potential leader-epoch checkpoint file corruption [kafka]
ocadaruma opened a new pull request, #15993: URL: https://github.com/apache/kafka/pull/15993 - [A patch for KAFKA-15046](https://github.com/apache/kafka/pull/14242) got rid of `fsync` on `LeaderEpochFileCache#truncateFromStart/End` for performance reason, but it turned out this could cause corrupted leader-epoch checkpoint file on ungraceful OS shutdown * i.e. OS shuts down in the middle when kernel is writing dirty pages back to the device - To address this problem, this PR makes below changes: * Revert `LeaderEpochCheckpoint#write` to always fsync * `truncateFromStart/End` now call `LeaderEpochCheckpoint#write` asynchronously on scheduler thread - Why we still need to call `LeaderEpochCheckpoint#write`? => To prevent checkpoint file grows indefinitely as leader epoch increments - Why async? => To avoid causing performance issue which #14242 addressed * `UnifiedLog#maybeCreateLeaderEpochCache` now loads epoch entries from checkpoint file only when current cache is absent - This is to prevent potentially-stale (because of async checkpointing mentioned above) checkpoint file is read and causes epoch entries to become incorrect ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-8206) A consumer can't discover new group coordinator when the cluster was partly restarted
[ https://issues.apache.org/jira/browse/KAFKA-8206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847681#comment-17847681 ] Ivan Yurchenko commented on KAFKA-8206: --- The KIP was accepted and now I'm working on the patch. > A consumer can't discover new group coordinator when the cluster was partly > restarted > - > > Key: KAFKA-8206 > URL: https://issues.apache.org/jira/browse/KAFKA-8206 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.1.0, 3.2.0, 3.1.1, 3.3.0, 3.1.2, 3.2.1, 3.2.2, 3.2.3, > 3.3.1 >Reporter: alex gabriel >Assignee: Ivan Yurchenko >Priority: Critical > Labels: needs-kip > > *A consumer can't discover new group coordinator when the cluster was partly > restarted* > Preconditions: > I use Kafka server and Java kafka-client lib 2.2 version > I have 2 Kafka nodes running localy (localhost:9092, localhost:9093) and 1 > ZK(localhost:2181) > I have replication factor 2 for the all my topics and > '_unclean.leader.election.enable=true_' on both Kafka nodes. > Steps to reproduce: > 1) Start 2nodes (localhost:9092/localhost:9093) > 2) Start consumer with 'bootstrap.servers=localhost:9092,localhost:9093' > {noformat} > // discovered group coordinator (0-node) > 2019-04-09 16:23:18,963 INFO > [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess] > - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] > Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)> > ...metadatacache is updated (2 nodes in the cluster list) > 2019-04-09 16:23:18,928 DEBUG > [org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate] - > [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Sending > metadata request (type=MetadataRequest, topics=) to node localhost:9092 > (id: -1 rack: null)> > 2019-04-09 16:23:18,940 DEBUG [org.apache.kafka.clients.Metadata.update] - > Updated cluster metadata version 2 to MetadataCache{cluster=Cluster(id = > P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null), > localhost:9093 (id: 1 rack: null)], partitions = [], controller = > localhost:9092 (id: 0 rack: null))}> > {noformat} > 3) Shutdown 1-node (localhost:9093) > {noformat} > // metadata was updated to the 4 version (but for some reasons it still had 2 > alive nodes inside cluster) > 2019-04-09 16:23:46,981 DEBUG [org.apache.kafka.clients.Metadata.update] - > Updated cluster metadata version 4 to MetadataCache{cluster=Cluster(id = > P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9093 (id: 1 rack: null), > localhost:9092 (id: 0 rack: null)], partitions = [Partition(topic = > events-sorted, partition = 1, leader = 0, replicas = [0,1], isr = [0,1], > offlineReplicas = []), Partition(topic = events-sorted, partition = 0, leader > = 0, replicas = [0,1], isr = [0,1], offlineReplicas = [])], controller = > localhost:9092 (id: 0 rack: null))}> > //consumers thinks that node-1 is still alive and try to send coordinator > lookup to it but failed > 2019-04-09 16:23:46,981 INFO > [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess] > - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] > Discovered group coordinator localhost:9093 (id: 2147483646 rack: null)> > 2019-04-09 16:23:46,981 INFO > [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown] > - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Group > coordinator localhost:9093 (id: 2147483646 rack: null) is unavailable or > invalid, will attempt rediscovery> > 2019-04-09 16:24:01,117 DEBUG > [org.apache.kafka.clients.NetworkClient.handleDisconnections] - [Consumer > clientId=events-consumer0, groupId=events-group-gabriel] Node 1 disconnected.> > 2019-04-09 16:24:01,117 WARN > [org.apache.kafka.clients.NetworkClient.processDisconnection] - [Consumer > clientId=events-consumer0, groupId=events-group-gabriel] Connection to node 1 > (localhost:9093) could not be established. Broker may not be available.> > // refreshing metadata again > 2019-04-09 16:24:01,117 DEBUG > [org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion] > - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] > Cancelled request with header RequestHeader(apiKey=FIND_COORDINATOR, > apiVersion=2, clientId=events-consumer0, correlationId=112) due to node 1 > being disconnected> > 2019-04-09 16:24:01,117 DEBUG > [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady] > - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] >
Re: [PR] KAFKA-16684: Remove cache in responseData [kafka]
chia7712 commented on code in PR #15966: URL: https://github.com/apache/kafka/pull/15966#discussion_r1605998588 ## clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java: ## @@ -99,28 +99,24 @@ public Errors error() { } public LinkedHashMap responseData(Map topicNames, short version) { -if (responseData == null) { -synchronized (this) { -if (responseData == null) { -// Assigning the lazy-initialized `responseData` in the last step -// to avoid other threads accessing a half-initialized object. -final LinkedHashMap responseDataTmp = -new LinkedHashMap<>(); -data.responses().forEach(topicResponse -> { -String name; -if (version < 13) { -name = topicResponse.topic(); -} else { -name = topicNames.get(topicResponse.topicId()); -} -if (name != null) { -topicResponse.partitions().forEach(partition -> -responseDataTmp.put(new TopicPartition(name, partition.partitionIndex()), partition)); -} -}); -responseData = responseDataTmp; +synchronized (this) { +// Assigning the lazy-initialized `responseData` in the last step +// to avoid other threads accessing a half-initialized object. +final LinkedHashMap responseDataTmp = +new LinkedHashMap<>(); +data.responses().forEach(topicResponse -> { +String name; +if (version < 13) { +name = topicResponse.topic(); +} else { +name = topicNames.get(topicResponse.topicId()); } -} +if (name != null) { +topicResponse.partitions().forEach(partition -> +responseDataTmp.put(new TopicPartition(name, partition.partitionIndex()), partition)); +} +}); +responseData = responseDataTmp; Review Comment: This is what I do concern! The `responseData` could be different if the input gets changed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16223: Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest [kafka]
chia7712 commented on PR #15989: URL: https://github.com/apache/kafka/pull/15989#issuecomment-2119182780 @chiacyu Could you please fix the conflicts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16789: Fix thread leak detection for event handler threads [kafka]
chia7712 merged PR #15984: URL: https://github.com/apache/kafka/pull/15984 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16795) Fix broken compatibility in kafka.tools.NoOpMessageFormatter, kafka.tools.DefaultMessageFormatter, and kafka.tools.LoggingMessageFormatter
[ https://issues.apache.org/jira/browse/KAFKA-16795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kuan Po Tseng updated KAFKA-16795: -- Parent: KAFKA-14525 Issue Type: Sub-task (was: Bug) > Fix broken compatibility in kafka.tools.NoOpMessageFormatter, > kafka.tools.DefaultMessageFormatter, and kafka.tools.LoggingMessageFormatter > -- > > Key: KAFKA-16795 > URL: https://issues.apache.org/jira/browse/KAFKA-16795 > Project: Kafka > Issue Type: Sub-task >Reporter: Kuan Po Tseng >Assignee: Kuan Po Tseng >Priority: Major > Fix For: 3.8.0 > > > [{{0bf830f}}|https://github.com/apache/kafka/commit/0bf830fc9c3915bc99b6e487e6083dabd593c5d3] > moved NoOpMessageFormatter, DefaultMessageFormatter and > LoggingMessageFormatter package from {{kafka.tools}} to > {{{}org.apache.kafka.tools.consumer{}}}{{{}{}}} > These classes could be used via cmd kafka-console-consumer.sh. We should have > a dependency cycle before 3.8.0 comes out. > > {code:java} > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ > --topic streams-wordcount-output \ > --from-beginning \ > --formatter kafka.tools.DefaultMessageFormatter \ > --property print.key=true \ > --property print.value=true \ > --property > key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ > --property > value.deserializer=org.apache.kafka.common.serialization.LongDeserializer{code} > The goal in this Jira is to allow user to keep using > {{{}kafka.tools.NoOpMessageFormatter{}}}, > {{{}kafka.tools.DefaultMessageFormatter{}}}, and > {{{}kafka.tools.LoggingMessageFormatter{}}}, but we also display warning > messages to say those "strings" will be removed in 4.0. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: fix incorrect formatter package in streams quickstart [kafka]
brandboat commented on code in PR #15991: URL: https://github.com/apache/kafka/pull/15991#discussion_r1605980737 ## docs/streams/quickstart.html: ## @@ -200,7 +200,7 @@ Step 4: St bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ --topic streams-wordcount-output \ --from-beginning \ ---formatter kafka.tools.DefaultMessageFormatter \ +--formatter org.apache.kafka.tools.consumer.DefaultMessageFormatter \ Review Comment: Filed JIRA https://issues.apache.org/jira/browse/KAFKA-16795. Will file another PR soon. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16795) Fix broken compatibility in kafka.tools.NoOpMessageFormatter, kafka.tools.DefaultMessageFormatter, and kafka.tools.LoggingMessageFormatter
Kuan Po Tseng created KAFKA-16795: - Summary: Fix broken compatibility in kafka.tools.NoOpMessageFormatter, kafka.tools.DefaultMessageFormatter, and kafka.tools.LoggingMessageFormatter Key: KAFKA-16795 URL: https://issues.apache.org/jira/browse/KAFKA-16795 Project: Kafka Issue Type: Bug Reporter: Kuan Po Tseng Assignee: Kuan Po Tseng Fix For: 3.8.0 [{{0bf830f}}|https://github.com/apache/kafka/commit/0bf830fc9c3915bc99b6e487e6083dabd593c5d3] moved NoOpMessageFormatter, DefaultMessageFormatter and LoggingMessageFormatter package from {{kafka.tools}} to {{{}org.apache.kafka.tools.consumer{}}}{{{}{}}} These classes could be used via cmd kafka-console-consumer.sh. We should have a dependency cycle before 3.8.0 comes out. {code:java} bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ --topic streams-wordcount-output \ --from-beginning \ --formatter kafka.tools.DefaultMessageFormatter \ --property print.key=true \ --property print.value=true \ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer{code} The goal in this Jira is to allow user to keep using {{{}kafka.tools.NoOpMessageFormatter{}}}, {{{}kafka.tools.DefaultMessageFormatter{}}}, and {{{}kafka.tools.LoggingMessageFormatter{}}}, but we also display warning messages to say those "strings" will be removed in 4.0. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16794) Can't open videos in streams documentation
Kuan Po Tseng created KAFKA-16794: - Summary: Can't open videos in streams documentation Key: KAFKA-16794 URL: https://issues.apache.org/jira/browse/KAFKA-16794 Project: Kafka Issue Type: Bug Components: docs Reporter: Kuan Po Tseng Attachments: IMG_4445.png, image.png Can't open videos in page [https://kafka.apache.org/documentation/streams/] Open console in chrome browser and it shows error message: {{Refused to frame 'https://www.youtube.com/' because it violates the following Content Security Policy directive: "frame-src 'self'".}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: fix incorrect formatter package in streams quickstart [kafka]
chia7712 commented on code in PR #15991: URL: https://github.com/apache/kafka/pull/15991#discussion_r1605966784 ## docs/streams/quickstart.html: ## @@ -200,7 +200,7 @@ Step 4: St bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ --topic streams-wordcount-output \ --from-beginning \ ---formatter kafka.tools.DefaultMessageFormatter \ +--formatter org.apache.kafka.tools.consumer.DefaultMessageFormatter \ Review Comment: @brandboat Could you please file another PR to fix broken compatibility? Let this PR focus on docs updates :smile -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes
[ https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847628#comment-17847628 ] Luke Chen commented on KAFKA-16414: --- So, I think we have the consensus that if we can include active segment for retention.bytes as well as making tiered storage integration tests non-flaky, then it is good to make this change. But from [~ckamal] 's opinion, it's not easy to achieve that. Maybe we can give it a quick try and see if the time investment is worth or not. That is, the current behavior has been there for a long time, I think even if we don't change it, users seem to accept it. So if you need much time to make the tiered storage integration test reliable, it might not worth doing it. WDYT [~brandboat] ? Tha > Inconsistent active segment expiration behavior between retention.ms and > retention.bytes > > > Key: KAFKA-16414 > URL: https://issues.apache.org/jira/browse/KAFKA-16414 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.6.1 >Reporter: Kuan Po Tseng >Assignee: Kuan Po Tseng >Priority: Major > > This is a follow up issue on KAFKA-16385. > Currently, there's a difference between how retention.ms and retention.bytes > handle active segment expiration: > - retention.ms always expire active segment when max segment timestamp > matches the condition. > - retention.bytes only expire active segment when retention.bytes is > configured to zero. > The behavior should be either rotate active segments for both retention > configurations or none at all. > For more details, see > https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682 -- This message was sent by Atlassian Jira (v8.20.10#820010)