Re: [PR] KAFKA-16629: Add broker-related tests to ConfigCommandIntegrationTest [kafka]
chia7712 commented on PR #15840:
URL: https://github.com/apache/kafka/pull/15840#issuecomment-2097507811

@m1a2st please fix the build error

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16666) Migrate `TransactionLogMessageFormatter`, `GroupMetadataMessageFormatter` and `OffsetsMessageFormatter` to tools module
[ https://issues.apache.org/jira/browse/KAFKA-16666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai updated KAFKA-16666:
-----------------------------------
Description:
`GroupMetadataMessageFormatter`[0], `OffsetsMessageFormatter`[1], and `TransactionLogMessageFormatter`[2] are used by ConsoleConsumer to parse data of internal topics. Following the migration plan, we should move them to the tools-api module. Also, we need to keep the compatibility of the command line. That is to say, `ConsoleConsumer` can accept the previous package name and then use the (Java) implementation to parse and produce the same output.

[0] https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1269
[1] https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1248
[2] https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala#L145

was:
`GroupMetadataMessageFormatter`[0], `OffsetsMessageFormatter`[1], and `TransactionLogMessageFormatter`[2] are used by ConsoleConsumer to parse data of internal topics. Following the migration plan, we should move them to the tools module. Also, we need to keep the compatibility of the command line. That is to say, `ConsoleConsumer` can accept the previous package name and then use the (Java) implementation to parse and produce the same output.
[0] https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1269
[1] https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1248
[2] https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala#L145

> Migrate `TransactionLogMessageFormatter`, `GroupMetadataMessageFormatter` and `OffsetsMessageFormatter` to tools module
> -----------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-16666
> URL: https://issues.apache.org/jira/browse/KAFKA-16666
> Project: Kafka
> Issue Type: Improvement
> Reporter: Chia-Ping Tsai
> Assignee: Chia-Ping Tsai
> Priority: Minor
> Labels: need-kip
>
> `GroupMetadataMessageFormatter`[0], `OffsetsMessageFormatter`[1], and `TransactionLogMessageFormatter`[2] are used by ConsoleConsumer to parse data of internal topics. Following the migration plan, we should move them to the tools-api module. Also, we need to keep the compatibility of the command line. That is to say, `ConsoleConsumer` can accept the previous package name and then use the (Java) implementation to parse and produce the same output.
>
> [0] https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1269
> [1] https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1248
> [2] https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala#L145

-- This message was sent by Atlassian Jira (v8.20.10#820010)
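The command-line compatibility requirement above could be handled with a simple alias table that maps the old scala class names onto the migrated Java implementations. The sketch below is illustrative only: the old names are taken from the linked source files, while the new tools-module class names and the `FormatterAlias` helper are assumptions, not the actual migration code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a --formatter compatibility shim: resolve the old
// scala package names to the (assumed) migrated classes in the tools module.
public class FormatterAlias {
    static final Map<String, String> ALIASES = new HashMap<>();
    static {
        ALIASES.put("kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter",
                    "org.apache.kafka.tools.consumer.OffsetsMessageFormatter");
        ALIASES.put("kafka.coordinator.group.GroupMetadataManager$GroupMetadataMessageFormatter",
                    "org.apache.kafka.tools.consumer.GroupMetadataMessageFormatter");
        ALIASES.put("kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter",
                    "org.apache.kafka.tools.consumer.TransactionLogMessageFormatter");
    }

    // Resolve a --formatter argument; unknown names pass through unchanged,
    // so existing third-party formatters keep working.
    static String resolve(String formatterName) {
        return ALIASES.getOrDefault(formatterName, formatterName);
    }
}
```

With such a table, ConsoleConsumer could keep accepting the previous package names on the command line while instantiating the Java implementations.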
[jira] [Created] (KAFKA-16677) Replace ClusterType#ALL and ClusterType#DEFAULT by Array
Chia-Ping Tsai created KAFKA-16677:
--------------------------------------
Summary: Replace ClusterType#ALL and ClusterType#DEFAULT by Array
Key: KAFKA-16677
URL: https://issues.apache.org/jira/browse/KAFKA-16677
Project: Kafka
Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai

Both ClusterType#ALL and ClusterType#DEFAULT are a kind of "tag" instead of a true "type". It seems to me they can be removed by using an Array. For example:

ClusterType#ALL -> {Type.ZK, Type.KRAFT, Type.CO_KRAFT}
ClusterType#DEFAULT -> {}

There are two benefits:
1. It is more readable for the "ALL" case.
2. We don't throw the awkward "exception" when seeing "DEFAULT".
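The proposed replacement can be sketched as a minimal standalone example. The `Type` names mirror those in the issue; the class name and exact shape are assumptions, not the actual patch.

```java
// Minimal sketch of the proposal: ALL and DEFAULT become plain arrays of the
// concrete cluster types rather than extra enum constants ("tags").
public class ClusterTypesSketch {
    enum Type { ZK, KRAFT, CO_KRAFT }

    // ClusterType#ALL -> an explicit, readable list of every concrete type
    static final Type[] ALL = {Type.ZK, Type.KRAFT, Type.CO_KRAFT};
    // ClusterType#DEFAULT -> an empty array, so no code path ever needs to
    // throw an exception when it encounters "DEFAULT"
    static final Type[] DEFAULT = {};

    public static void main(String[] args) {
        System.out.println(ALL.length);      // 3
        System.out.println(DEFAULT.length);  // 0
    }
}
```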
Re: [PR] Allowing WriteTxnMarkers API to run with AlterCluster permissions [kafka]
showuon commented on code in PR #15837:
URL: https://github.com/apache/kafka/pull/15837#discussion_r1591749069

## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
```diff
@@ -2822,6 +2822,31 @@ class KafkaApisTest extends Logging {
         () => kafkaApis.handleWriteTxnMarkersRequest(null, RequestLocal.withThreadConfinedCaching))
   }

+  @Test
+  def requiredAclsNotPresentWriteTxnMarkersThrowsAuthorizationException(): Unit = {
```

Review Comment:
This test can pass without this change. Maybe we need a test verifying that when alter cluster is allowed and cluster action is denied, no exception is thrown. WDYT?
Re: [PR] KAFKA-14885: fix kafka client connect to the broker that offline from… [kafka]
Stephan14 commented on PR #13531:
URL: https://github.com/apache/kafka/pull/13531#issuecomment-2097337433

hi @divijvaidya, how can I find out which tests failed?
Re: [PR] KAFKA-16629: Add broker-related tests to ConfigCommandIntegrationTest [kafka]
chia7712 commented on code in PR #15840:
URL: https://github.com/apache/kafka/pull/15840#discussion_r1591747966

## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
```diff
@@ -172,72 +138,210 @@ public void testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception {
     String brokerId = "1";
     adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
-    alterOpts = Arrays.asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter");
+    alterOpts = asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter");

     // Add config
-    alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "11"), Optional.of(brokerId));
-    alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "12"), Optional.empty());
+    alterAndVerifyConfig(zkClient, Optional.of(brokerId), singletonMap("message.max.size", "11"));
+    alterAndVerifyConfig(zkClient, Optional.empty(), singletonMap("message.max.size", "12"));

     // Change config
-    alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "13"), Optional.of(brokerId));
-    alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "14"), Optional.empty());
+    alterAndVerifyConfig(zkClient, Optional.of(brokerId), singletonMap("message.max.size", "13"));
+    alterAndVerifyConfig(zkClient, Optional.empty(), singletonMap("message.max.size", "14"));

     // Delete config
-    deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.of(brokerId));
-    deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.empty());
+    deleteAndVerifyConfig(zkClient, Optional.of(brokerId), singleton("message.max.size"));
+    deleteAndVerifyConfig(zkClient, Optional.empty(), singleton("message.max.size"));

     // Listener configs: should work only with listener name
-    alterAndVerifyConfig(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId));
+    alterAndVerifyConfig(zkClient, Optional.of(brokerId), singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"));
     assertThrows(ConfigException.class,
-        () -> alterConfigWithZk(zkClient, Collections.singletonMap("ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId)));
+        () -> alterConfigWithZk(zkClient, Optional.of(brokerId), singletonMap("ssl.keystore.location", "/tmp/test.jks")));

     // Per-broker config configured at default cluster-level should fail
     assertThrows(ConfigException.class,
-        () -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.empty()));
-    deleteAndVerifyConfig(zkClient, Collections.singleton("listener.name.external.ssl.keystore.location"), Optional.of(brokerId));
+        () -> alterConfigWithZk(zkClient, Optional.empty(), singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks")));
+    deleteAndVerifyConfig(zkClient, Optional.of(brokerId), singleton("listener.name.external.ssl.keystore.location"));

     // Password config update without encoder secret should fail
     assertThrows(IllegalArgumentException.class,
-        () -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.password", "secret"), Optional.of(brokerId)));
+        () -> alterConfigWithZk(zkClient, Optional.of(brokerId), singletonMap("listener.name.external.ssl.keystore.password", "secret")));

     // Password config update with encoder secret should succeed and encoded password must be stored in ZK
     Map configs = new HashMap<>();
     configs.put("listener.name.external.ssl.keystore.password", "secret");
     configs.put("log.cleaner.threads", "2");
-    Map encoderConfigs = Collections.singletonMap(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
-    alterConfigWithZk(zkClient, configs, Optional.of(brokerId), encoderConfigs);
+    Map encoderConfigs = singletonMap(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
+    alterConfigWithZk(zkClient, Optional.of(brokerId), configs, encoderConfigs);
     Properties brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId);
-    assertFalse(brokerConfigs.contains(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper");
+    assertFalse(brokerConfigs.contains(PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper");
     assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")); // not encoded
     String encodedPassword =
```
Re: [PR] KAFKA-14588 [2/N] ConfigCommandTest rewritten in java [kafka]
chia7712 commented on code in PR #15873:
URL: https://github.com/apache/kafka/pull/15873#discussion_r1591722419

## core/src/test/java/kafka/admin/ConfigCommandUnitTest.java:
```diff
@@ -410,6 +448,430 @@ public void testOptionEntityTypeNames() {
         doTestOptionEntityTypeNames(false);
     }

+    @Test
+    public void shouldFailIfUnrecognisedEntityTypeUsingZookeeper() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+            "--entity-name", "client", "--entity-type", "not-recognised", "--alter", "--add-config", "a=b,c=d"});
+        assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
+    }
+
+    @Test
+    public void shouldFailIfUnrecognisedEntityType() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092",
+            "--entity-name", "client", "--entity-type", "not-recognised", "--alter", "--add-config", "a=b,c=d"});
+        assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), createOpts));
+    }
+
+    @Test
+    public void shouldFailIfBrokerEntityTypeIsNotAnIntegerUsingZookeeper() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+            "--entity-name", "A", "--entity-type", "brokers", "--alter", "--add-config", "a=b,c=d"});
+        assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
+    }
+
+    @Test
+    public void shouldFailIfBrokerEntityTypeIsNotAnInteger() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092",
+            "--entity-name", "A", "--entity-type", "brokers", "--alter", "--add-config", "a=b,c=d"});
+        assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), createOpts));
+    }
+
+    @Test
+    public void shouldFailIfShortBrokerEntityTypeIsNotAnIntegerUsingZookeeper() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+            "--broker", "A", "--alter", "--add-config", "a=b,c=d"});
+        assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
+    }
+
+    @Test
+    public void shouldFailIfShortBrokerEntityTypeIsNotAnInteger() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092",
+            "--broker", "A", "--alter", "--add-config", "a=b,c=d"});
+        assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), createOpts));
+    }
+
+    @Test
+    public void shouldFailIfMixedEntityTypeFlagsUsingZookeeper() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+            "--entity-name", "A", "--entity-type", "users", "--client", "B", "--describe"});
+        assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+    }
+
+    @Test
+    public void shouldFailIfMixedEntityTypeFlags() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092",
+            "--entity-name", "A", "--entity-type", "users", "--client", "B", "--describe"});
+        assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+    }
+
+    @Test
+    public void shouldFailIfInvalidHost() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092",
+            "--entity-name", "A,B", "--entity-type", "ips", "--describe"});
+        assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+    }
+
+    @Test
+    public void shouldFailIfInvalidHostUsingZookeeper() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+            "--entity-name", "A,B", "--entity-type", "ips", "--describe"});
+        assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+    }
+
+    @Test
+    public void shouldFailIfUnresolvableHost() {
+        ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092",
+            "--entity-name", "RFC2606.invalid", "--entity-type", "ips",
```
Re: [PR] MINOR: Various cleanups in clients tests [kafka]
chia7712 commented on code in PR #15877:
URL: https://github.com/apache/kafka/pull/15877#discussion_r1591714910

## clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
```diff
@@ -1323,7 +1323,7 @@ public void testConcurrentUpdateAndFetchForSnapshotAndCluster() throws Interrupt
             } else {
                 // Thread to read metadata snapshot, once its updated
                 try {
                     if (!atleastMetadataUpdatedOnceLatch.await(5, TimeUnit.MINUTES)) {
```

Review Comment:
How about using `assertDoesNotThrow`? For example:
```java
assertTrue(assertDoesNotThrow(() -> atleastMetadataUpdatedOnceLatch.await(5, TimeUnit.MINUTES)),
    "Test had to wait more than 5 minutes, something went wrong.");
```

## clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
```diff
@@ -1335,7 +1335,7 @@ public void testConcurrentUpdateAndFetchForSnapshotAndCluster() throws Interrupt
                 });
             }
             if (!allThreadsDoneLatch.await(5, TimeUnit.MINUTES)) {
```

Review Comment:
How about using `assertTrue`?
```java
assertTrue(allThreadsDoneLatch.await(5, TimeUnit.MINUTES),
    "Test had to wait more than 5 minutes, something went wrong.");
```

## clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java:
```diff
@@ -220,9 +220,7 @@ public void shouldThrowNpeWhenAddingCollectionWithNullHeader() {
     private int getCount(Headers headers) {
         int count = 0;
-        Iterator headerIterator = headers.iterator();
-        while (headerIterator.hasNext()) {
-            headerIterator.next();
+        for (Header ignore : headers) {
```

Review Comment:
How about using `toArray`? For example: `headers.toArray().length`

## clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java:
```diff
@@ -78,34 +78,30 @@ public static void main(String[] args) throws Exception {
         final Time time = Time.SYSTEM;
         final AtomicBoolean done = new AtomicBoolean(false);
         final Object lock = new Object();
```

Review Comment:
Maybe we should just delete this old class ...

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
```diff
@@ -106,10 +103,9 @@ public void setup() {
         commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
         offsetsRequestManager = testBuilder.offsetsRequestManager;
         coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-        heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
-        memberhipsManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
+        HeartbeatRequestManager heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
```

Review Comment:
The variables `heartbeatRequestManager` and `membershipManager` are unused. Are they used to test the existence of `heartbeatRequestManager` and `membershipManager`? If so, could we rewrite them using `assertTrue`? For example: `assertTrue(testBuilder.heartbeatRequestManager.isPresent());`
Re: [PR] MINOR: Remove dev_version parameter from streams tests [kafka]
mjsax commented on PR #15874:
URL: https://github.com/apache/kafka/pull/15874#issuecomment-2097231440

Thanks for the PR. Makes sense. What I don't fully understand is what you mean by `This simplifies testing downstream, since the test parameters do not change with every version.`

> In particular, some tests downstream are blacklisted because they do not work with ARM. These lists need to be updated every time DEV_VERSION is bumped.

Seems this does not change after this PR?
Re: [PR] KAFKA-16484: Support to define per broker/controller property by ClusterConfigProperty [kafka]
chia7712 commented on code in PR #15715:
URL: https://github.com/apache/kafka/pull/15715#discussion_r1591692467

## core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
```diff
@@ -158,12 +162,24 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu
             .setName(annot.name().trim().isEmpty() ? null : annot.name())
             .setListenerName(annot.listener().trim().isEmpty() ? null : annot.listener())
             .setServerProperties(serverProperties)
+            .setPerServerProperties(perServerProperties)
             .setSecurityProtocol(annot.securityProtocol())
             .setMetadataVersion(annot.metadataVersion())
             .build();
         type.invocationContexts(context.getRequiredTestMethod().getName(), config, testInvocations);
     }

+    private void processClusterConfigProperty(ClusterConfigProperty property,
```

Review Comment:
Please remove this unused function.
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on PR #15766: URL: https://github.com/apache/kafka/pull/15766#issuecomment-2097161445 @frankvicky Could you please rebase code to trigger QA again?
[jira] [Resolved] (KAFKA-16470) kafka-dump-log --offsets-decoder should support new records
[ https://issues.apache.org/jira/browse/KAFKA-16470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai resolved KAFKA-16470.
------------------------------------
Fix Version/s: 3.8.0
Resolution: Fixed

> kafka-dump-log --offsets-decoder should support new records
> -----------------------------------------------------------
>
> Key: KAFKA-16470
> URL: https://issues.apache.org/jira/browse/KAFKA-16470
> Project: Kafka
> Issue Type: Sub-task
> Reporter: David Jacot
> Assignee: David Jacot
> Priority: Major
> Fix For: 3.8.0
Re: [PR] KAFKA-16470; kafka-dump-log --offsets-decoder should support new records [kafka]
chia7712 merged PR #15652: URL: https://github.com/apache/kafka/pull/15652
[jira] [Updated] (KAFKA-16666) Migrate `TransactionLogMessageFormatter`, `GroupMetadataMessageFormatter` and `OffsetsMessageFormatter` to tools module
[ https://issues.apache.org/jira/browse/KAFKA-16666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai updated KAFKA-16666:
-----------------------------------
Labels: need-kip (was: )

> Migrate `TransactionLogMessageFormatter`, `GroupMetadataMessageFormatter` and `OffsetsMessageFormatter` to tools module
> -----------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-16666
> URL: https://issues.apache.org/jira/browse/KAFKA-16666
> Project: Kafka
> Issue Type: Improvement
> Reporter: Chia-Ping Tsai
> Assignee: Chia-Ping Tsai
> Priority: Minor
> Labels: need-kip
>
> `GroupMetadataMessageFormatter`[0], `OffsetsMessageFormatter`[1], and `TransactionLogMessageFormatter`[2] are used by ConsoleConsumer to parse data of internal topics. Following the migration plan, we should move them to the tools module. Also, we need to keep the compatibility of the command line. That is to say, `ConsoleConsumer` can accept the previous package name and then use the (Java) implementation to parse and produce the same output.
>
> [0] https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1269
> [1] https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1248
> [2] https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala#L145
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
chia7712 commented on PR #15836: URL: https://github.com/apache/kafka/pull/15836#issuecomment-2097148532 @gaurav-narula Could you please rebase code to trigger QA again? It seems we have thread leaks in some tests :(
[jira] [Commented] (KAFKA-16539) Can't update specific broker configs in pre-migration mode
[ https://issues.apache.org/jira/browse/KAFKA-16539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844071#comment-17844071 ]

Chia-Ping Tsai commented on KAFKA-16539:
----------------------------------------

[~davidarthur] I have pushed it to trunk. Cherry-pick causes some conflicts on branch 3.7, so maybe we need to file a PR for backporting. Also, I'm not sure whether we need to backport to branch 3.6, because it seems we don't normally release x.x.3.

> Can't update specific broker configs in pre-migration mode
> ----------------------------------------------------------
>
> Key: KAFKA-16539
> URL: https://issues.apache.org/jira/browse/KAFKA-16539
> Project: Kafka
> Issue Type: Bug
> Components: config, kraft
> Affects Versions: 3.6.0, 3.7.0, 3.6.1, 3.6.2
> Reporter: David Arthur
> Assignee: David Arthur
> Priority: Major
> Fix For: 3.8.0, 3.7.1, 3.6.3
>
> In migration mode, ZK brokers will have a forwarding manager configured. This is used to forward requests to the KRaft controller once we get to that part of the migration. However, prior to KRaft taking over as the controller (known as pre-migration mode), the ZK brokers are still attempting to forward IncrementalAlterConfigs to the controller.
>
> This works fine for cluster level configs (e.g., "--entity-type broker --entity-default"), but this fails for specific broker configs (e.g., "--entity-type broker --entity-id 1").
>
> This affects BROKER and BROKER_LOGGER config types.
>
> To workaround this bug, you can either disable migrations on the brokers (assuming no migration has taken place), or proceed with the migration and get to the point where KRaft is the controller.
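The two cases the issue contrasts map to the following `kafka-configs.sh` invocations (a sketch; the bootstrap address and config key are placeholders — the bug only affects the second, per-broker form in pre-migration mode):

```shell
# Cluster-wide dynamic broker config: forwarded correctly even pre-migration.
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type brokers --entity-default \
  --alter --add-config log.cleaner.threads=2

# Per-broker dynamic config for broker id 1: the case that failed pre-migration.
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type brokers --entity-name 1 \
  --alter --add-config log.cleaner.threads=2
```

Both commands require a running cluster; they are shown here only to illustrate the entity-default vs. entity-name distinction.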
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
chia7712 merged PR #15744: URL: https://github.com/apache/kafka/pull/15744
Re: [PR] KAFKA-16629: Add broker-related tests to ConfigCommandIntegrationTest [kafka]
m1a2st commented on PR #15840: URL: https://github.com/apache/kafka/pull/15840#issuecomment-2097144066 Hello @chia7712, please review. Thank you!
Re: [PR] KAFKA-16629: Add broker-related tests to ConfigCommandIntegrationTest [kafka]
m1a2st commented on PR #15840:
URL: https://github.com/apache/kafka/pull/15840#issuecomment-2097142502

Because the old test class only had ZooKeeper tests, I changed the old tests to cover both ZooKeeper and KRaft and added new tests for KRaft. I mainly test the config update modes.
[jira] [Resolved] (KAFKA-16608) AsyncKafkaConsumer doesn't honor interrupted thread status on KafkaConsumer.poll(Duration)
[ https://issues.apache.org/jira/browse/KAFKA-16608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai resolved KAFKA-16608.
------------------------------------
Fix Version/s: 3.8.0
Resolution: Fixed

> AsyncKafkaConsumer doesn't honor interrupted thread status on KafkaConsumer.poll(Duration)
> ------------------------------------------------------------------------------------------
>
> Key: KAFKA-16608
> URL: https://issues.apache.org/jira/browse/KAFKA-16608
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.8.0
> Reporter: Andrew Schofield
> Assignee: Andrew Schofield
> Priority: Minor
> Fix For: 3.8.0
>
> The behaviour for KafkaConsumer.poll(Duration) when the calling thread is in interrupted state is to throw InterruptException. The AsyncKafkaConsumer doesn't do this. It only throws that exception if the interruption occurs while it is waiting.
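The contract the issue describes can be illustrated with a minimal standalone sketch (this is not the actual AsyncKafkaConsumer code; the class and method names are illustrative): poll should check the calling thread's interrupt status on entry and surface it as InterruptException, rather than only noticing an interrupt while it happens to be blocked.

```java
// Minimal sketch of the expected poll(Duration) contract for interrupted callers.
public class InterruptCheckSketch {
    static class InterruptException extends RuntimeException { }

    static void pollEntry() {
        // Thread.interrupted() clears the flag; note the real consumer's
        // InterruptException restores the interrupt status before propagating.
        if (Thread.interrupted()) {
            throw new InterruptException();
        }
        // ... normal poll path would follow here ...
    }

    static boolean throwsWhenInterrupted() {
        Thread.currentThread().interrupt();   // caller arrives already interrupted
        try {
            pollEntry();
            return false;
        } catch (InterruptException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(throwsWhenInterrupted()); // true
    }
}
```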
Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]
chia7712 merged PR #15803: URL: https://github.com/apache/kafka/pull/15803
Re: [PR] MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions [kafka]
gharris1727 commented on code in PR #15469:
URL: https://github.com/apache/kafka/pull/15469#discussion_r1591671051

## connect/api/src/main/java/org/apache/kafka/connect/data/Values.java:
```diff
@@ -766,135 +852,23 @@ protected static boolean canParseSingleTokenLiteral(Parser parser, boolean embed
     protected static SchemaAndValue parse(Parser parser, boolean embedded) throws NoSuchElementException {
```

Review Comment:
My core difficulty is that the parsing logic and the conversion logic mutually depend on one another:

1. The convertTo methods check if the input is a String, and then run it through the Parser.
2. After parsing a map or array, the Parser calls convertTo on the elements to "cast" them to a common schema.

I'm pretty sure convertTo -> parser -> convertTo is a reasonable cycle, and should happen all the time via convertToList, convertToMap. I don't think that parser -> convertTo -> parser is a useful cycle for multiple reasons, but proving that is a little bit slippery. With some time I think I can break this part of the cycle so that this doesn't end up as one big ball of code again.
[jira] [Resolved] (KAFKA-16356) Remove class-name dispatch in RemoteLogMetadataSerde
[ https://issues.apache.org/jira/browse/KAFKA-16356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Greg Harris resolved KAFKA-16356.
---------------------------------
Fix Version/s: 3.8.0
Resolution: Fixed

> Remove class-name dispatch in RemoteLogMetadataSerde
> ----------------------------------------------------
>
> Key: KAFKA-16356
> URL: https://issues.apache.org/jira/browse/KAFKA-16356
> Project: Kafka
> Issue Type: Task
> Components: Tiered-Storage
> Affects Versions: 3.7.0
> Reporter: Greg Harris
> Assignee: Linu Shibu
> Priority: Trivial
> Labels: newbie
> Fix For: 3.8.0
>
> The RemoteLogMetadataSerde#serialize receives a RemoteLogMetadata object, and has to dispatch to one of four serializers depending on its type. This is done by taking the class name of the RemoteLogMetadata and looking it up in maps to find the corresponding serializer for that class.
>
> This later requires an unchecked cast, because the RemoteLogMetadataTransform is generic. This is all type-unsafe, and can be replaced with type-safe if-elseif-else statements that may also be faster than the double-indirect map lookups.
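The shape of the change can be sketched as follows. This is a hypothetical, simplified illustration (the type names and string results are stand-ins, not the real serde): replace a Map keyed by `metadata.getClass().getName()` — which ends in an unchecked cast — with type-safe instanceof dispatch.

```java
// Hypothetical sketch: type-safe if-elseif-else dispatch instead of a
// class-name-keyed map lookup followed by an unchecked cast.
public class DispatchSketch {
    interface RemoteLogMetadata { }
    static class SegmentMetadata implements RemoteLogMetadata { }
    static class PartitionDeleteMetadata implements RemoteLogMetadata { }

    static String serialize(RemoteLogMetadata m) {
        if (m instanceof SegmentMetadata) {
            return "segment";            // stands in for the segment serializer
        } else if (m instanceof PartitionDeleteMetadata) {
            return "partition-delete";   // stands in for the delete serializer
        } else {
            throw new IllegalArgumentException("Unknown metadata type: " + m.getClass());
        }
    }

    public static void main(String[] args) {
        System.out.println(serialize(new SegmentMetadata()));          // segment
        System.out.println(serialize(new PartitionDeleteMetadata()));  // partition-delete
    }
}
```

The compiler checks each branch's type, so no cast is needed, and the unknown-type case fails loudly instead of returning a null serializer.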
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
gharris1727 commented on PR #15620: URL: https://github.com/apache/kafka/pull/15620#issuecomment-2097110125 Thanks @linu-shibu for your patience, I was waiting for something else.
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
gharris1727 merged PR #15620: URL: https://github.com/apache/kafka/pull/15620
[jira] [Commented] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.
[ https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844063#comment-17844063 ] sanghyeok An commented on KAFKA-16670: -- Hi, [~lianetm]! Thanks for your comments. Your description is clear, and I've understood it!
> So based on those expectations and back to your example, we don't need to wait before calling subscribe (that's handled internally by the HeartbeatRequestManager as described above). I wonder if it's the fact that in the failed case you're polling 10 times only (vs. 100 times in the successful case)?? In order to receive records, we do need to make sure that we are calling poll after the assignment has been received (so the consumer issues a fetch request for the partitions assigned). Note that even when you poll for 1s in your test, a poll that happens before the assignment has been received will block for 1s but is doomed to return empty, because it is not waiting for records from the topics you're interested in (no partitions assigned yet). Could you make sure that the test is calling poll after the assignment has been received? (I would suggest just polling while true for a certain amount of time, no sleeping after the subscribe needed).
Sorry, I made things confusing. I intended to try it 100 times for both the failure and success cases, but the code was set to attempt only 10 times in the failure case. Anyway, as you suggested, I proceeded by logging the {{poll()}} attempts. !image-2024-05-07-08-34-06-855.png|width=721,height=269!
# The consumer calls {{poll()}} up to 1000 times.
# The consumer logs ("i : " + i) on each attempt.
# If the consumer successfully polls a non-empty record, it counts down a CountDownLatch; we can then check whether the latch has reached 0.
!image-2024-05-07-08-36-40-656.png|width=848,height=315! I waited until {{poll()}} had been called 430 times, which means the consumer waited about 430 seconds for an assignment. However, the consumer still could not get its assignment.
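The latch-based check described in the numbered steps can be sketched like this. It is a sketch of the shape of the test, not the actual MyTest code: fakePoll is a hypothetical stand-in for consumer.poll(...) so the loop stays self-contained here.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Supplier;

// Hedged sketch of the described harness: poll up to maxAttempts times, log each
// attempt, and count the latch down on the first non-empty batch of records.
class LatchCheck {
    static CountDownLatch pollUntilRecords(Supplier<List<String>> fakePoll, int maxAttempts) {
        CountDownLatch latch = new CountDownLatch(1);
        for (int i = 0; i < maxAttempts && latch.getCount() > 0; i++) {
            System.out.println("i : " + i);   // per-attempt log, as in the screenshot
            if (!fakePoll.get().isEmpty()) {
                latch.countDown();            // records received: latch reaches 0
            }
        }
        return latch;
    }
}
```

After the loop, a latch count of 0 means at least one poll returned records; a count of 1 after all attempts is the stuck case reported above.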
!image-2024-05-07-08-38-27-753.png|width=1654,height=289! However, after receiving the initial FindCoordinator request, the broker does not perform any further action. Please see the log above: the broker has no log entries after 2024-05-06 23:29:27, but by the time of the 430th attempt it was already 2024-05-06 23:39:00. Anyway, it seems that at least one of the consumer or the broker has a potential issue. What do you think?
> KIP-848 : Consumer will not receive assignment forever because of concurrent issue.
> Key: KAFKA-16670
> URL: https://issues.apache.org/jira/browse/KAFKA-16670
> Project: Kafka
> Issue Type: Bug
> Reporter: sanghyeok An
> Priority: Major
> Attachments: image-2024-05-07-08-34-06-855.png, image-2024-05-07-08-36-22-983.png, image-2024-05-07-08-36-40-656.png, image-2024-05-07-08-38-27-753.png
>
> *Related Code*
> * Consumer gets its assignment successfully: [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L35-L57]
> * Consumer gets stuck forever because of a concurrency issue: [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L61-L79]
>
> *Unexpected behaviour*
> * The broker is sufficiently slow.
> * A KafkaConsumer is created and immediately subscribes to a topic.
> If both conditions are met, the {{Consumer}} can potentially never receive {{TopicPartition}} assignments and may become indefinitely stuck. In the case of a new {{broker}} and new {{consumer}}, when the consumer is created, the {{consumer background thread}} starts to send a request to the broker. (I believe this is a {{GroupCoordinator Heartbeat request}}.) During this time, if the {{broker}} has not yet loaded metadata from {{__consumer_offsets}}, it will begin to schedule metadata loading. Once the broker has completely loaded the metadata, the {{consumer background thread}} recognizes this broker as a valid group coordinator. However, there is a possibility that the {{consumer}} can send a {{subscribe request}} to the {{broker}} before the {{broker}} has replied to the {{GroupCoordinator Heartbeat Request}}. In such a scenario, the {{consumer}} appears to be stuck.
[jira] [Updated] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.
[ https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sanghyeok An updated KAFKA-16670: - Attachment: image-2024-05-07-08-38-27-753.png
> KIP-848 : Consumer will not receive assignment forever because of concurrent issue.
> Key: KAFKA-16670
> URL: https://issues.apache.org/jira/browse/KAFKA-16670
> Project: Kafka
> Issue Type: Bug
> Reporter: sanghyeok An
> Priority: Major
> Attachments: image-2024-05-07-08-34-06-855.png, image-2024-05-07-08-36-22-983.png, image-2024-05-07-08-36-40-656.png, image-2024-05-07-08-38-27-753.png
>
> *Related Code*
> * Consumer gets its assignment successfully: [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L35-L57]
> * Consumer gets stuck forever because of a concurrency issue: [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L61-L79]
>
> *Unexpected behaviour*
> * The broker is sufficiently slow.
> * A KafkaConsumer is created and immediately subscribes to a topic.
> If both conditions are met, the {{Consumer}} can potentially never receive {{TopicPartition}} assignments and may become indefinitely stuck. In the case of a new {{broker}} and new {{consumer}}, when the consumer is created, the {{consumer background thread}} starts to send a request to the broker. (I believe this is a {{GroupCoordinator Heartbeat request}}.) During this time, if the {{broker}} has not yet loaded metadata from {{__consumer_offsets}}, it will begin to schedule metadata loading. Once the broker has completely loaded the metadata, the {{consumer background thread}} recognizes this broker as a valid group coordinator. However, there is a possibility that the {{consumer}} can send a {{subscribe request}} to the {{broker}} before the {{broker}} has replied to the {{GroupCoordinator Heartbeat Request}}. In such a scenario, the {{consumer}} appears to be stuck.
>
> You can check this scenario in {{src/test/java/com/example/MyTest#should_fail_because_consumer_try_to_poll_before_background_thread_get_valid_coordinator}}. If there is no sleep time to wait for the {{GroupCoordinator Heartbeat Request}}, the {{consumer}} will always be stuck. If there is a little sleep time, the {{consumer}} will always receive its assignment.
>
> README: [https://github.com/chickenchickenlove/new-consumer-error/blob/main/README.md]
>
> In my case, the consumer gets its assignment with `docker-compose`: the environment is not slow enough. However, the consumer cannot get an assignment in `testcontainers` without a little waiting time: that environment is slow enough to trigger the concurrency issue. `testcontainers` is docker in docker, so `testcontainers` will be slower than `docker-compose`.
[jira] [Updated] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.
[ https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sanghyeok An updated KAFKA-16670: - Attachment: image-2024-05-07-08-36-22-983.png
[jira] [Updated] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.
[ https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sanghyeok An updated KAFKA-16670: - Attachment: image-2024-05-07-08-36-40-656.png
[jira] [Updated] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.
[ https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sanghyeok An updated KAFKA-16670: - Attachment: image-2024-05-07-08-34-06-855.png
Re: [PR] MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions [kafka]
gharris1727 commented on code in PR #15469: URL: https://github.com/apache/kafka/pull/15469#discussion_r1591636980
## connect/api/src/main/java/org/apache/kafka/connect/data/Values.java:
## @@ -766,135 +852,23 @@ protected static boolean canParseSingleTokenLiteral(Parser parser, boolean embed protected static SchemaAndValue parse(Parser parser, boolean embedded) throws NoSuchElementException {
Review Comment: Parser was protected, so I think it's still safe to refactor. The class doesn't show up here: https://javadoc.io/doc/org.apache.kafka/connect-api/latest/index.html
I moved the existing Parser to Tokenizer, as it had a good interface already, and adding methods would just be clutter. The methods which took a Parser argument have now been moved to a single toplevel class named Parser. Both of these classes are package-local, so they shouldn't appear in the API docs. I left almost all of the private/protected static methods in Values, just bringing a few over that were only ever called by the Parser.
I tried moving things from Values to Parser to break the circular dependency, but this required moving nearly everything to Parser. The two classes are really intertwined, and I'm not really satisfied with this refactor now.
Re: [PR] KAFKA-16652: add unit test for ClusterTemplate offering zero ClusterConfig [kafka]
TaiJuWu commented on code in PR #15862: URL: https://github.com/apache/kafka/pull/15862#discussion_r1591630803 ## core/src/test/java/kafka/test/junit/ClusterTestExtensionsUnitTest.java: ## @@ -33,16 +31,22 @@ public class ClusterTestExtensionsUnitTest { void testProcessClusterTemplate() { ClusterTestExtensions ext = new ClusterTestExtensions(); ExtensionContext context = mock(ExtensionContext.class); -Consumer testInvocations = mock(Consumer.class); ClusterTemplate annot = mock(ClusterTemplate.class); -when(annot.value()).thenReturn("").thenReturn(" "); +when(annot.value()).thenReturn("").thenReturn(" ").thenReturn("test_empty_config"); Review Comment: @soarez Thanks for your comments. All is very excellent!!! Thanks!
Re: [PR] MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions [kafka]
gharris1727 commented on code in PR #15469: URL: https://github.com/apache/kafka/pull/15469#discussion_r1591616004 ## connect/api/src/main/java/org/apache/kafka/connect/data/Values.java: ## @@ -177,7 +213,12 @@ public static Long convertToLong(Schema schema, Object value) throws DataExcepti * @throws DataException if the value could not be converted to a float */ public static Float convertToFloat(Schema schema, Object value) throws DataException { Review Comment: I added tests to get the methods themselves up to 100% coverage, but the overall class still is missing some coverage. Thanks for pointing this out, as there were certainly some pretty obvious cases that weren't tested.
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
junrao commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1589798416 ## raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java: ## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.util.Optional; +import java.util.OptionalLong; +import org.apache.kafka.common.message.KRaftVersionRecord; +import org.apache.kafka.common.message.VotersRecord; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.ControlRecord; +import org.apache.kafka.raft.Isolation; +import org.apache.kafka.raft.LogFetchInfo; +import org.apache.kafka.raft.ReplicatedLog; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.snapshot.RawSnapshotReader; +import org.apache.kafka.snapshot.RecordsSnapshotReader; +import org.apache.kafka.snapshot.SnapshotReader; +import org.slf4j.Logger; + +/** + * The KRaft state machine for tracking control records in the topic partition. 
+ * + * This type keeps track of changes to the finalized kraft.version and the sets of voters between + * the latest snasphot and the log end offset. + * + * The are two actors/threads for this type. One is the KRaft driver which indirectly call a lot of Review Comment: The are => There are ## clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java: ## @@ -728,83 +730,114 @@ public static MemoryRecords withLeaderChangeMessage( ByteBuffer buffer, LeaderChangeMessage leaderChangeMessage ) { -writeLeaderChangeMessage(buffer, initialOffset, timestamp, leaderEpoch, leaderChangeMessage); -buffer.flip(); -return MemoryRecords.readableRecords(buffer); +try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder( +initialOffset, +timestamp, +leaderEpoch, +buffer +) +) { +builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage); +return builder.build(); +} } -private static void writeLeaderChangeMessage( -ByteBuffer buffer, +public static MemoryRecords withSnapshotHeaderRecord( long initialOffset, long timestamp, int leaderEpoch, -LeaderChangeMessage leaderChangeMessage +ByteBuffer buffer, +SnapshotHeaderRecord snapshotHeaderRecord ) { -try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder( -buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, -TimestampType.CREATE_TIME, initialOffset, timestamp, -RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, -false, true, leaderEpoch, buffer.capacity()) +try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder( +initialOffset, +timestamp, +leaderEpoch, +buffer +) ) { -builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage); +builder.appendSnapshotHeaderMessage(timestamp, snapshotHeaderRecord); +return builder.build(); } } -public static MemoryRecords withSnapshotHeaderRecord( +public static MemoryRecords withSnapshotFooterRecord( long initialOffset, long timestamp, int leaderEpoch, ByteBuffer buffer, -SnapshotHeaderRecord 
snapshotHeaderRecord +SnapshotFooterRecord snapshotFooterRecord ) { -writeSnapshotHeaderRecord(buffer, initialOffset, timestamp, leaderEpoch, snapshotHeaderRecord); -buffer.flip(); -return MemoryRecords.readableRecords(buffer); +try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder( +initialOffset, +timestamp, +leaderEpoch, +buffer +) +) { +
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
linu-shibu commented on PR #15620: URL: https://github.com/apache/kafka/pull/15620#issuecomment-2096976398 @gharris1727 any update on this?
[jira] [Comment Edited] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.
[ https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843887#comment-17843887 ] Lianet Magrans edited comment on KAFKA-16670 at 5/6/24 9:01 PM: Hey [~chickenchickenlove], thanks for trying out this! Some clarification in case it helps. In the flow you described, the new consumer will send a request to find the group coordinator (FindCoordinator) when it gets created, but even if there's a call to consumer.subscribe right after, it won't send a request to subscribe (HeartbeatRequest) until it gets a response to the initial FindCoordinator request (HeartbeatManager skips sending requests if it does not know the coordinator [here|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L189]). Once the consumer gets a response for the FindCoordinator, a HeartbeatRequest request will be sent containing the new subscription. The consumer will then eventually receive the assignment, but we don't know exactly when from the consumer point of view. The rebalance callbacks are what signal to the consumer that the call to subscribe completed with an assignment received. So it's only after the consumer gets the assignment that a call to poll can return the records that are available. So based on those expectations and back to your example, we don't need to wait before calling subscribe (that's handled internally by the HeartbeatRequestManager as described above). I wonder if it's the fact that in the failed case you're polling 10 times only (vs. 100 times in the successful case)?? In order to receive records, we do need to make sure that we are calling poll after the assignment has been received (so the consumer issues a fetch request for the partitions assigned). 
Note that even when you poll for 1s in your test, a poll that happens before the assignment has been received will block for 1s but is doomed to return empty, because it is not waiting for records from the topics you're interested in (no partitions assigned yet). Could you make sure that the test is calling poll after the assignment has been received? (I would suggest just polling while true for a certain amount of time, no sleeping after the subscribe needed). This integration test for the consumer [testGroupConsumption|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L153] has very similar logic to the one you're trying to achieve (create consumer, subscribe right away and consume), and since a new broker and consumer are set up for each test, the test will go down the same path of having to find a coordinator before sending the HeartbeatRequest with a subscription. The main difference from looking at both seems to be the limited number of polls in your failed test scenario, so let's try to rule that out to better isolate the situation. Hope it helps! Let me know
[jira] [Commented] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.
[ https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843887#comment-17843887 ] Lianet Magrans commented on KAFKA-16670: Hey [~chickenchickenlove], thanks for trying out this! Some clarification in case it helps. In the flow you described, the new consumer will send a request to find the group coordinator (FindCoordinator) when it gets created, but even if there's a call to consumer.subscribe right after, it won't send a request to subscribe until it gets a response to the initial FindCoordinator request (HeartbeatManager skips sending requests if it does not know the coordinator [here|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L189]). Once the consumer gets a response for the FindCoordinator, a HeartbeatRequest request will be sent containing the new subscription. The consumer will then eventually receive the assignment, but we don't know exactly when from the consumer point of view. The rebalance callbacks are what signal to the consumer that the call to subscribe completed with an assignment received. So it's only after the consumer gets the assignment that a call to poll can return the records that are available. So based on those expectations and back to your example, we don't need to wait before calling subscribe (that's handled internally by the HeartbeatRequestManager as described above). I wonder if it's the fact that in the failed case you're polling 10 times only (vs. 100 times in the successful case)?? In order to receive records, we do need to make sure that we are calling poll after the assignment has been received (so the consumer issues a fetch request for the partitions assigned). 
Note that even when you poll for 1s in your test, a poll that happens before the assignment has been received, will block for 1s but it's doomed to return empty, because it is not waiting for records from the topics you're interested in (no partitions assigned yet). Could you make sure that the test is calling poll after the assignment has been received? (I would suggest just polling while true for a certain amount of time, no sleeping after the subscribe needed). This integration test for the consumer [testGroupConsumption|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L153] has a very similar logic to the one you're trying to achieve (create consumer, subscribe right away and consume), and since a new broker and consumer are setup for each test, the test will go down the same path of having to find a coordinator before sending the HeartbeatRequest with a subscription. The main difference from looking at both seems to be the limited number of polls in your failed test scenario, so let's try to rule that out to better isolate the situation. Hope it helps! Let me know > KIP-848 : Consumer will not receive assignment forever because of concurrent > issue. > --- > > Key: KAFKA-16670 > URL: https://issues.apache.org/jira/browse/KAFKA-16670 > Project: Kafka > Issue Type: Bug >Reporter: sanghyeok An >Priority: Major > > *Related Code* > * Consumer get assignment Successfully : > ** > [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L35-L57] > * Consumer get be stuck Forever because of concurrent issue: > ** > [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L61-L79] > > *Unexpected behaviour* > * > Broker is sufficiently slow. 
> * When a KafkaConsumer is created and immediately subscribes to a topic > If both conditions are met, the {{Consumer}} can potentially never receive > {{TopicPartition}} assignments and becomes stuck indefinitely. > In the case of a new broker and a new consumer, when the consumer is created, the consumer > background thread sends a request to the broker (presumably the groupCoordinator > Heartbeat request). At that point, if the broker has not yet loaded metadata from > {{{}__consumer_offset{}}}, it schedules the metadata load. Once the > broker has loaded the metadata completely, the consumer background thread considers this > broker a valid group coordinator. > However, the consumer can send the {{subscribe}} request to the broker before the {{broker}} > replies to the {{{}groupCoordinator HeartBeat Request{}}}. In that case, the > consumer appears to be stuck.
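The advice in the comment above (no sleep after subscribe, just keep polling until the assignment arrives and records come back) boils down to a deadline-driven poll loop. Below is a minimal illustrative sketch, not code from the Kafka test suite: the `Supplier` stands in for `consumer.poll(Duration.ofSeconds(1))` so the loop structure is visible without a live broker, and all names are invented.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Supplier;

public class PollLoop {
    // Keep polling until a non-empty batch arrives or the deadline passes.
    static List<String> pollUntilRecords(Supplier<List<String>> poll, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() < deadline) {
            List<String> records = poll.get(); // real code: consumer.poll(Duration.ofSeconds(1))
            if (!records.isEmpty()) {
                return records;
            }
            // no Thread.sleep needed: a real poll() already blocks while nothing is available
        }
        return List.of();
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The first two "polls" return nothing, mimicking polls that happen before
        // the assignment has been received; the third returns records.
        List<String> records = pollUntilRecords(
            () -> ++calls[0] < 3 ? List.<String>of() : List.of("record-1"),
            Duration.ofSeconds(5));
        System.out.println(records); // prints [record-1]
    }
}
```

The point of the loop shape is that early empty polls are expected and harmless; the test only fails if the deadline elapses with no records at all.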
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
mumrah commented on PR #15744: URL: https://github.com/apache/kafka/pull/15744#issuecomment-2096872249 [Latest test run](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15744/13/tests) looks pretty reasonable. The failures all look unrelated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14237) Kafka TLS Doesn't Present Intermediary Certificates when using PEM
[ https://issues.apache.org/jira/browse/KAFKA-14237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843869#comment-17843869 ] Gaurav Narula commented on KAFKA-14237: --- Hi [~soxcks] I'm unable to reproduce this issue on 3.7. Here's the self-signed cert chain I used for checking locally: {code:java} -BEGIN CERTIFICATE- MIID2DCCAsCgAwIBAgICEAAwDQYJKoZIhvcNAQELBQAwZTELMAkGA1UEBhMCVVMx DjAMBgNVBAgMBVN0YXRlMREwDwYDVQQHDAhMb2NhdGlvbjEPMA0GA1UECgwGQXBh Y2hlMQ4wDAYDVQQLDAVLYWZrYTESMBAGA1UEAwwJVGVzdCBSb290MB4XDTI0MDUw NjIwMjEyM1oXDTM0MDUwNDIwMjEyM1owVjEWMBQGA1UEAwwNVGVzdCBCcm9rZXIg MTEOMAwGA1UECAwFU3RhdGUxCzAJBgNVBAYTAlVTMQ8wDQYDVQQKDAZBcGFjaGUx DjAMBgNVBAsMBUthZmthMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA 2IfbD26ChT2+/hhPijvk1OIzglDgj+YKZ7Uj01cR0nOg7STIcmIL1D5BuUljtrEq xPqgXDOZIXn1526OfRK+u+1+Adw9mtclM1pnZkgAH2EkgAND/L4NLq07NfC3jOBe vF6UiP1Yg6KoLJAit96y5HOrlXm0hLd6MRDEgHnDtnzDPhMtV03a+JXFAbhfRENq nu/a6hkbodHMh697eSHqifahCpPqq6WraLk43u5P8jzdq7sm8GIjAKaGlkdbCib5 gW6W8ChHQ8fNchKKH1WuAazQeO6X2CGvt0B0JhUX5UsP83Tfqojfgi3MSggOAIDQ Ll7C2eK0XMG1e+qagI65SwIDAQABo4GgMIGdMAkGA1UdEwQCMAAwCwYDVR0PBAQD AgH2MCcGA1UdJQQgMB4GCCsGAQUFBwMBBggrBgEFBQcDAgYIKwYBBQUHAwgwHQYD VR0OBBYEFPAi9B9Dj5WmLBfnu8yEbH7KLW01MBoGA1UdEQQTMBGHBH8AAAGCCWxv Y2FsaG9zdDAfBgNVHSMEGDAWgBTrXaBrjpP8LHcbfhOcrWvo4+otxjANBgkqhkiG 9w0BAQsFAAOCAQEAlPyEgC4egJIajAD2PIeeyXS/eyv43kkNTqsJL9NBepG2njM+ yK1GV1jWwFfoe7IYMtcOme+1tuoNwXUl7gM2l8KRVSue4QLkDo604JHPmvnfqDoh MxOhC2dw96Kh69NgnlR3Ajp/Dg/kPDG2FOL3lowISVhNzTQDr773f7n80CxbgRmq IrlQ/S/j7tF5K1BB8WOinZZUhiUO/TuCmUROK8NKCAIAI5F7c6AbrlbASGFHsRLU Bgrdq9x4X0LVt8GzgwOPk2lpYvC3oggUgig0DqDraJRsFJ0sbxGEZhoftgNzb1Bg jDLHahebah4Cd1csKeI/v8a/r5d4LJ/JrYTv5g== -END CERTIFICATE- -BEGIN CERTIFICATE- MIIDvTCCAqWgAwIBAgIUAzMUkUfTFIln8o4Qi1wHNcixmEcwDQYJKoZIhvcNAQEL BQAwZTELMAkGA1UEBhMCVVMxDjAMBgNVBAgMBVN0YXRlMREwDwYDVQQHDAhMb2Nh dGlvbjEPMA0GA1UECgwGQXBhY2hlMQ4wDAYDVQQLDAVLYWZrYTESMBAGA1UEAwwJ VGVzdCBSb290MB4XDTI0MDUwNjIwMjEyM1oXDTM0MDUwNDIwMjEyM1owZTELMAkG 
A1UEBhMCVVMxDjAMBgNVBAgMBVN0YXRlMREwDwYDVQQHDAhMb2NhdGlvbjEPMA0G A1UECgwGQXBhY2hlMQ4wDAYDVQQLDAVLYWZrYTESMBAGA1UEAwwJVGVzdCBSb290 MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzuz4dYs7/CqkhHVXO8Zz +1/avBmgXVHgrVMV0njqqtGXH5fHaKQgrmO+dKHjy/brcQbZ+vbctq3F37OdpYwk pIepKYqEdT9yaR8Eb1m3iWnk3cwwz+QvVwYMHMncOfoMooDvb5jwb1bpsMovsdwe NvvY3LtoUF4POIGCH79KmwOSJDDpnixVeZHIYHAyhxAE0LM4xEAinmuvp6t1pgtZ 1urid/uZvk/JbWnp+WB1dr7jVGih6dqbjDBfyoI+3APgxiVMySZYTL3kPbE8aJYD tOXDOO8+0+g+7sGSOPrTF5LsGyE/CDd4lbx4T5mQpavm2iRmuGckXLtBRGJ3xODN fwIDAQABo2UwYzAMBgNVHRMEBTADAQH/MAsGA1UdDwQEAwIB9jAnBgNVHSUEIDAe BggrBgEFBQcDAQYIKwYBBQUHAwIGCCsGAQUFBwMIMB0GA1UdDgQWBBTrXaBrjpP8 LHcbfhOcrWvo4+otxjANBgkqhkiG9w0BAQsFAAOCAQEATDJ9+6qQat6V3Bbm0kWk L+xy2ETefq9ctT4smXLatkUmtiMs/+ZM762iT3QRGC2kKgK2GITucwiemsUR3NkY V+Y9iqFIkZkdhCfBQB6SAcXhYV5ucBTga0jGE0awEedLEQ6ow/9iUKCfXvH82dWK t38GFHjrqv6gXrGoJKNYUDYVukZnyLWkwd2LD92AXNJPadaWswVJhve/aWkPVSXo f2E/wMG/euP3ulDyzLBE4jrx01rn+nxVVN2zm61mcrTovSu+mft2EB9E9Qs4BVDk vas7tbpsS1mijEoCaArtI8M/IPHRLPE2puM89/fn/jnopUNMZyB6MnaeXsTR/vlm 6w== -END CERTIFICATE- {code} The private key: {code:java} -BEGIN PRIVATE KEY- MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDYh9sPboKFPb7+ GE+KO+TU4jOCUOCP5gpntSPTVxHSc6DtJMhyYgvUPkG5SWO2sSrE+qBcM5khefXn bo59Er677X4B3D2a1yUzWmdmSAAfYSSAA0P8vg0urTs18LeM4F68XpSI/ViDoqgs kCK33rLkc6uVebSEt3oxEMSAecO2fMM+Ey1XTdr4lcUBuF9EQ2qe79rqGRuh0cyH r3t5IeqJ9qEKk+qrpatouTje7k/yPN2ruybwYiMApoaWR1sKJvmBbpbwKEdDx81y EoofVa4BrNB47pfYIa+3QHQmFRflSw/zdN+qiN+CLcxKCA4AgNAuXsLZ4rRcwbV7 6pqAjrlLAgMBAAECggEAFRowZDGd8MRSv8q4vb0WkRS2dmXRbNS7gN3rbuZWa08v iM0D5/ncM0QZ/afEWwKrK2VNiY7RxqxvJa3lnxTbl88Ob7n6GwQKsRWHbDVqJaS+ /ObUkmYnPLxPP/OEv+sB8JO7IBqorLOGdklZyNegUZlgSIIC8Mg81VlP/UFgrIEQ VPaVZ/55XCg4jgs6C0uf6NqWrw+BrWziECpV596j+qRHlJErk1DOxuBDSzoZ1Iek P/5ap3ZhMe7cjrz31HrthdtTxA1rcaW9B4w9V6CZuosgDRcc/b4aqtf6E6PmBtAD VFGEF94bxwHNZ83b11/kCqreGfeu5Jj99enZ+Ou5CQKBgQDumpEpUWiKk3joUL2l 76Fg3ZZtFdHBqRZpHpnHybt0Q9bRVN/hT+MrDnOuw5rXR9QNbaFCf9KArz9Q/oB/ 
GZKlSX7GzeeCgHvVpssnUd0imhMckRuvGUekIHnnHEbusmbxyNmdoomOF92+mqus cpQXavvPGVV8p3WTnJDt0kvcnwKBgQDoUU3lIx6ySd/KH34beBQFfMpG2BQtkGrs 40brRRUaUNsQFjBxjPbiUwPrXV1J/FEiWAy4PLwElcegJTP6YjhNSwWDH0eQA1bs Ub3AMpk8mU6lgfhsDtm1S7ek0mnuQCgM1oWY89QLWSy5pmmAfOac2By9ILcwPeZD WB+FXrY31QKBgQDbjT6lVlNrr+dBXYokdit4hm00Uy9/k6cbcxztyaLDiOjSFdcr 6+aMZ+/qj/KaxW1KLeaE2jlIT/li/cwfJ9jYXphZNn4ghzlrjt7Af4OLo1qSnrNq m0hgrcF993cNjPtM4BPeCQGpziwshwYQ2B2MrtSl7BnNagm2mgqBy1Ai4QKBgElM ze0MRbUvReL6SMnV+0s38oKjzsoJlRMlKs00wNHKzTOoLKTHO2Zxlvz+Ol8Ls3XI nkrLLu+rao8G7f2EXAtXLmgOyH+R7i0mJV6tGFhcbsod1goSLXLcbxccJLw9leVn EkQOOstR2aDB9uvJfOHj9j1eQy5/eVWqSlfEaG35AoGAZtbhiDkVAHk1YBPnvkHd rX+eQw5rNhqFiWrRJu+9RYjHGLieRL4bIZNpw8S+2cYv3MsPmQ7tWozkxJSR1x84 0OIMpGfClo9v4499TwrNtqrhSBXYdY6EsUBrbYeu54USUEQmclfQuuWIRfdbVfyn 6n6txW70n2EQZC3/I0ECWTc= -END PRIVATE KEY- {code} The Root CA: {code:java} -BEGIN CERTIFICATE- MIIDvTCCAqWgAwIBAgIUAzMUkUfTFIln8o4Qi1wHNcixmEcwDQYJKoZIhvcNAQEL
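For anyone reproducing this kind of setup, PEM material is supplied to a broker or client via the PEM store type from KIP-651. The sketch below is illustrative only (paths are made up), under the assumption that the keystore PEM file holds the private key followed by the certificate chain with the leaf first, then any intermediates, while the root CA lives in the truststore:

```properties
# PEM stores (KIP-651); all paths here are illustrative
ssl.keystore.type=PEM
# single PEM file: private key, then broker (leaf) certificate, then intermediate(s)
ssl.keystore.location=/path/to/broker-key-and-chain.pem
ssl.truststore.type=PEM
ssl.truststore.location=/path/to/root-ca.pem
# alternatively, PEM text can be given inline via ssl.keystore.key and
# ssl.keystore.certificate.chain instead of ssl.keystore.location
```

If intermediates are omitted from the chain file, clients that only trust the root CA will fail the handshake, which is the symptom this issue describes.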
[jira] [Updated] (KAFKA-16531) Fix check-quorum calculation to not assume that the leader is in the voter set
[ https://issues.apache.org/jira/browse/KAFKA-16531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-16531: --- Summary: Fix check-quorum calculation to not assume that the leader is in the voter set (was: Fix check-quorum calculation to no assume that the leader is in the voter set) > Fix check-quorum calculation to not assume that the leader is in the voter set > -- > > Key: KAFKA-16531 > URL: https://issues.apache.org/jira/browse/KAFKA-16531 > Project: Kafka > Issue Type: Sub-task > Components: kraft >Reporter: José Armando García Sancio >Assignee: José Armando García Sancio >Priority: Major > Fix For: 3.8.0 > > > In the check-quorum calculation, the leader should not assume that it is part > of the voter set. This may happen when the leader is removing itself from the > voter set. -- This message was sent by Atlassian Jira (v8.20.10#820010)
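The summary change is small, but the fix it describes is easy to state: check-quorum should be satisfied by a majority of the current voter set, with the leader counting itself only when it is actually in that set. A hypothetical sketch of the counting rule (not the KafkaRaftClient code; all names invented):

```java
import java.util.Set;

public class CheckQuorum {
    // A leader keeps its claim only if a majority of the CURRENT voter set has
    // made contact within the check-quorum window. The leader counts itself
    // only when it appears in the voter set, which is exactly what changes
    // when the leader is removing itself from the set.
    static boolean hasMajorityContact(Set<Integer> voters, int leaderId, Set<Integer> fetchedRecently) {
        int contacted = 0;
        for (int voter : voters) {
            if (voter == leaderId || fetchedRecently.contains(voter)) {
                contacted++;
            }
        }
        return contacted >= voters.size() / 2 + 1;
    }

    public static void main(String[] args) {
        // Leader 9 has been removed from the voter set {1, 2, 3}: it now needs
        // contact from a full majority of the remaining voters, not majority - 1.
        System.out.println(hasMajorityContact(Set.of(1, 2, 3), 9, Set.of(1)));    // prints false
        System.out.println(hasMajorityContact(Set.of(1, 2, 3), 9, Set.of(1, 2))); // prints true
    }
}
```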
[jira] [Updated] (KAFKA-16530) Fix high-watermark calculation to not assume the leader is in the voter set
[ https://issues.apache.org/jira/browse/KAFKA-16530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-16530: --- Summary: Fix high-watermark calculation to not assume the leader is in the voter set (was: Fix high-watermark calculation to no assume the leader is in the voter set) > Fix high-watermark calculation to not assume the leader is in the voter set > --- > > Key: KAFKA-16530 > URL: https://issues.apache.org/jira/browse/KAFKA-16530 > Project: Kafka > Issue Type: Sub-task > Components: kraft >Reporter: José Armando García Sancio >Assignee: José Armando García Sancio >Priority: Major > Fix For: 3.8.0 > > > When the leader is being removed from the voter set, the leader may not be in > the voter set. This means that kraft should not assume that the leader is > part of the high-watermark calculation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
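The high-watermark side of this fix can be stated the same way: the high-watermark is the largest offset replicated to a majority of the current voter set, and a leader that has been removed from the set simply contributes no offset. A hypothetical sketch of that calculation (not KRaft's LeaderState code; all names invented):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class MajorityOffset {
    // Largest offset known to be replicated to a majority of the voter set.
    // voterEndOffsets is keyed by voter id; a leader removed from the voter
    // set has no entry, so its log end offset no longer counts.
    static long highWatermark(Map<Integer, Long> voterEndOffsets) {
        List<Long> offsets = new ArrayList<>(voterEndOffsets.values());
        offsets.sort(Collections.reverseOrder());
        int majority = voterEndOffsets.size() / 2 + 1;
        return offsets.get(majority - 1); // the majority-th largest offset
    }

    public static void main(String[] args) {
        // Voters 1, 2, 3 with their end offsets; the old leader is absent.
        Map<Integer, Long> offsets = Map.of(1, 100L, 2, 90L, 3, 80L);
        System.out.println(highWatermark(offsets)); // prints 90: replicated to voters 1 and 2
    }
}
```

Taking the majority-th largest offset (rather than including the leader's own offset unconditionally) is what keeps the calculation correct while the leader removes itself.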
[jira] [Created] (KAFKA-16676) Security docs missing RPCs from KIP-714 and KIP-1000
Andrew Schofield created KAFKA-16676: Summary: Security docs missing RPCs from KIP-714 and KIP-1000 Key: KAFKA-16676 URL: https://issues.apache.org/jira/browse/KAFKA-16676 Project: Kafka Issue Type: Improvement Components: docs Affects Versions: 3.7.0, 3.8.0 Reporter: Andrew Schofield Assignee: Andrew Schofield Fix For: 3.8.0 KIPs 714 and 1000 introduced 3 new RPCs to do with client metrics. None of them was added to the list of RPCs in the security documentation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16287: Implement example tests for common rebalance callback [kafka]
lianetm commented on code in PR #15408: URL: https://github.com/apache/kafka/pull/15408#discussion_r1591377324 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala: ## @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package integration.kafka.api + +import kafka.api.{AbstractConsumerTest, BaseConsumerTest} +import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.clients.consumer.{Consumer, ConsumerRebalanceListener} +import org.apache.kafka.common.TopicPartition +import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.{Arguments, MethodSource} + +import java.util +import java.util.Arrays.asList +import java.util.Collections +import java.util.concurrent.atomic.AtomicBoolean +import java.util.stream.Stream + +/** + * Integration tests for the consumer that cover interaction with the consumer from within callbacks + * and listeners. 
+ */ +class PlaintextConsumerCallbackTest extends AbstractConsumerTest { + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testConsumerRebalanceListenerAssignOnPartitionsAssigned(quorum: String, groupProtocol: String): Unit = { +val tp = new TopicPartition(topic, 0); +triggerOnPartitionsAssigned { (consumer, _) => + val e: Exception = assertThrows(classOf[IllegalStateException], () => consumer.assign(Collections.singletonList(tp))) + assertEquals(e.getMessage, "Subscription to topics, partitions and pattern are mutually exclusive") +} + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testConsumerRebalanceListenerAssignmentOnPartitionsAssigned(quorum: String, groupProtocol: String): Unit = { +val tp = new TopicPartition(topic, 0); +triggerOnPartitionsAssigned { (consumer, _) => + assertTrue(consumer.assignment().contains(tp)); +} + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned(quorum: String, groupProtocol: String): Unit = { +val tp = new TopicPartition(topic, 0); +triggerOnPartitionsAssigned { (consumer, _) => + val map = consumer.beginningOffsets(Collections.singletonList(tp)) + assertTrue(map.containsKey(tp)) + assertEquals(0, map.get(tp)) +} + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testConsumerRebalanceListenerAssignOnPartitionsRevoked(quorum: String, groupProtocol: String): Unit = { +val tp = new TopicPartition(topic, 0); +triggerOnPartitionsRevoked { (consumer, _) => + val e: Exception = 
assertThrows(classOf[IllegalStateException], () => consumer.assign(Collections.singletonList(tp))) + assertEquals(e.getMessage, "Subscription to topics, partitions and pattern are mutually exclusive") Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16287: Implement example tests for common rebalance callback [kafka]
lianetm commented on code in PR #15408: URL: https://github.com/apache/kafka/pull/15408#discussion_r1591377060 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala: ## @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package integration.kafka.api + +import kafka.api.{AbstractConsumerTest, BaseConsumerTest} +import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.clients.consumer.{Consumer, ConsumerRebalanceListener} +import org.apache.kafka.common.TopicPartition +import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.{Arguments, MethodSource} + +import java.util +import java.util.Arrays.asList +import java.util.Collections +import java.util.concurrent.atomic.AtomicBoolean +import java.util.stream.Stream + +/** + * Integration tests for the consumer that cover interaction with the consumer from within callbacks + * and listeners. 
+ */ +class PlaintextConsumerCallbackTest extends AbstractConsumerTest { + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testConsumerRebalanceListenerAssignOnPartitionsAssigned(quorum: String, groupProtocol: String): Unit = { +val tp = new TopicPartition(topic, 0); +triggerOnPartitionsAssigned { (consumer, _) => + val e: Exception = assertThrows(classOf[IllegalStateException], () => consumer.assign(Collections.singletonList(tp))) + assertEquals(e.getMessage, "Subscription to topics, partitions and pattern are mutually exclusive") Review Comment: nit: the message is totally accurate but this test has nothing to do with pattern subscription, so maybe clearer for whoever gets the failure to be specific about the problem on this test (subscribe & assign mutually exclusive). We'll probably end up adding another test where the failure would be calling subscribe(Pattern) on the callback, and that one would require the message for the 2 subscribe calls being mutually exclusive. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]
jsancio commented on code in PR #15859: URL: https://github.com/apache/kafka/pull/15859#discussion_r1591319255 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -1700,16 +1710,16 @@ private void handleResponse(RaftResponse.Inbound response, long currentTimeMs) { } /** - * Validate a request which is only valid between voters. If an error is - * present in the returned value, it should be returned in the response. + * Validate common state for requests to establish leadership. + * + * These include the Vote, BeginQuorumEpoch and EndQuorumEpoch RPCs. If an error is present in + * the returned value, it should be returned in the response. */ private Optional validateVoterOnlyRequest(int remoteNodeId, int requestEpoch) { if (requestEpoch < quorum.epoch()) { return Optional.of(Errors.FENCED_LEADER_EPOCH); } else if (remoteNodeId < 0) { return Optional.of(Errors.INVALID_REQUEST); -} else if (quorum.isObserver() || !quorum.isVoter(remoteNodeId)) { -return Optional.of(Errors.INCONSISTENT_VOTER_SET); Review Comment: In KIP-853, `INCONSISTENT_VOTER_SET` is deprecated and replicas will not return this error anymore. In this case, replicas that think they are observers need to be allowed to vote if the leader thinks they are voters. This can happen if a voter is added to the set of voters right before an election cycle and the VotersRecord has been replicated to the new voter. ## raft/src/main/java/org/apache/kafka/raft/QuorumState.java: ## @@ -112,45 +120,30 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateE // when we send Vote or BeginEpoch requests. ElectionState election; -try { -election = store.readElectionState(); -if (election == null) { -election = ElectionState.withUnknownLeader(0, voters); -} -} catch (final UncheckedIOException e) { -// For exceptions during state file loading (missing or not readable), -// we could assume the file is corrupted already and should be cleaned up. 
-log.warn("Clearing local quorum state store after error loading state {}", -store, e); -store.clear(); -election = ElectionState.withUnknownLeader(0, voters); -} +election = store +.readElectionState() +.orElseGet(() -> ElectionState.withUnknownLeader(0, latestVoterSet.get().voterIds())); final EpochState initialState; -if (!election.voters().isEmpty() && !voters.equals(election.voters())) { -throw new IllegalStateException("Configured voter set: " + voters -+ " is different from the voter set read from the state file: " + election.voters() -+ ". Check if the quorum configuration is up to date, " -+ "or wipe out the local state file if necessary"); -} else if (election.hasVoted() && !isVoter()) { -String localIdDescription = localId.isPresent() ? -localId.getAsInt() + " is not a voter" : -"is undefined"; -throw new IllegalStateException("Initialized quorum state " + election -+ " with a voted candidate, which indicates this node was previously " -+ " a voter, but the local id " + localIdDescription); Review Comment: In KIP-853, replicas that think they are observer need to be allowed to vote if the leader thinks they are voters. This can happen if a voter is added to the set of voters right before an election cycle and the VotersRecord has been replicated to the new voter. 
## raft/src/main/java/org/apache/kafka/raft/QuorumState.java: ## @@ -336,40 +346,54 @@ public void transitionToUnattached(int epoch) { */ public void transitionToVoted( int epoch, -int candidateId +ReplicaKey candidateKey ) { -if (localId.isPresent() && candidateId == localId.getAsInt()) { -throw new IllegalStateException("Cannot transition to Voted with votedId=" + candidateId + -" and epoch=" + epoch + " since it matches the local broker.id"); -} else if (isObserver()) { -throw new IllegalStateException("Cannot transition to Voted with votedId=" + candidateId + -" and epoch=" + epoch + " since the local broker.id=" + localId + " is not a voter"); -} else if (!isVoter(candidateId)) { -throw new IllegalStateException("Cannot transition to Voted with voterId=" + candidateId + -" and epoch=" + epoch + " since it is not one of the voters " + voters); -} Review Comment: In KIP-853, replicas that think they are observer need to be allowed to vote if the leader thinks they are
Re: [PR] KAFKA-16287: Implement example tests for common rebalance callback [kafka]
lianetm commented on code in PR #15408: URL: https://github.com/apache/kafka/pull/15408#discussion_r1591300734 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala: ## @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package integration.kafka.api + +import kafka.api.{AbstractConsumerTest, BaseConsumerTest} +import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.clients.consumer.{Consumer, ConsumerRebalanceListener} +import org.apache.kafka.common.TopicPartition +import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.{Arguments, MethodSource} + +import java.util +import java.util.Arrays.asList +import java.util.Collections +import java.util.concurrent.atomic.AtomicBoolean +import java.util.stream.Stream + +/** + * Integration tests for the consumer that cover interaction with the consumer from within callbacks + * and listeners. Review Comment: nit: "callbacks and listeners" in this context refer to the same right? maybe just "rebalance callbacks" -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]
jeqo commented on PR #15379: URL: https://github.com/apache/kafka/pull/15379#issuecomment-2096481453 Thanks @C0urante! I've applied most suggestions, PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]
jeqo commented on code in PR #15379: URL: https://github.com/apache/kafka/pull/15379#discussion_r1591285305

## connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/SingleFieldPathTest.java:
## @@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+class SingleFieldPathTest {
+
+    @Test void shouldFindField() {
+        SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", Schema.INT32_SCHEMA);
+        Schema schema = SchemaBuilder.struct().field("foo", barSchema).build();
+
+        assertEquals(barSchema.field("bar"), pathV2("foo.bar").fieldFrom(schema));
+        assertEquals(schema.field("foo"), pathV2("foo").fieldFrom(schema));
+    }
+
+    @Test void shouldReturnNullFieldWhenFieldNotFound() {
+        SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", Schema.INT32_SCHEMA);
+        Schema schema = SchemaBuilder.struct().field("foo", barSchema).build();
+
+        assertNull(pathV2("un.known").fieldFrom(schema));
+        assertNull(pathV2("foo.unknown").fieldFrom(schema));
+        assertNull(pathV2("unknown").fieldFrom(schema));
+        assertNull(pathV2("test").fieldFrom(null));
+    }
+
+    @Test void shouldFindValueInMap() {
+        Map<String, Object> foo = new HashMap<>();
+        foo.put("bar", 42);
+        foo.put("baz", null);
+        Map<String, Object> map = new HashMap<>();
+        map.put("foo", foo);
+
+        assertEquals(42, pathV2("foo.bar").valueFrom(map));
+        assertNull(pathV2("foo.baz").valueFrom(map));
+    }
+
+    @Test void shouldReturnNullValueWhenFieldNotFoundInMap() {
+        Map<String, Object> foo = new HashMap<>();
+        foo.put("bar", 42);
+        foo.put("baz", null);
+        Map<String, Object> map = new HashMap<>();
+        map.put("foo", foo);
+
+        assertNull(new SingleFieldPath("un.known", FieldSyntaxVersion.V2).valueFrom(map));
+        assertNull(new SingleFieldPath("foo.unknown", FieldSyntaxVersion.V2).valueFrom(map));
+        assertNull(new SingleFieldPath("unknown", FieldSyntaxVersion.V2).valueFrom(map));
+        assertNull(new SingleFieldPath("foo.baz", FieldSyntaxVersion.V2).valueFrom(map));
+        assertNull(new SingleFieldPath("foo.baz.inner", FieldSyntaxVersion.V2).valueFrom(map));
+    }
+
+    @Test void shouldFindValueInStruct() {
+        SchemaBuilder bazSchema = SchemaBuilder.struct()
+            .field("inner", Schema.STRING_SCHEMA);
+        SchemaBuilder barSchema = SchemaBuilder.struct()
+            .field("bar", Schema.INT32_SCHEMA)
+            .field("baz", bazSchema.optional());
+        Schema schema = SchemaBuilder.struct().field("foo", barSchema).build();
+        Struct foo = new Struct(barSchema)
+            .put("bar", 42)
+            .put("baz", null);
+        Struct struct = new Struct(schema).put("foo", foo);
+
+        assertEquals(42, pathV2("foo.bar").valueFrom(struct));
+        assertNull(pathV2("foo.baz").valueFrom(struct));
+    }
+
+    @Test void shouldReturnNullValueWhenFieldNotFoundInStruct() {
+        SchemaBuilder bazSchema = SchemaBuilder.struct()
+            .field("inner", Schema.STRING_SCHEMA);
+        SchemaBuilder barSchema = SchemaBuilder.struct()
+            .field("bar", Schema.INT32_SCHEMA)
+            .field("baz", bazSchema.optional());
+        Schema schema = SchemaBuilder.struct().field("foo", barSchema).build();
+        Struct foo = new Struct(barSchema)
+            .put("bar", 42)
+            .put("baz", null);
+        Struct struct = new Struct(schema).put("foo", foo);
+
+        assertNull(new SingleFieldPath("un.known", FieldSyntaxVersion.V2).valueFrom(struct));
+        assertNull(new SingleFieldPath("foo.unknown", FieldSyntaxVersion.V2).valueFrom(struct));
+        assertNull(new SingleFieldPath("unknown", FieldSyntaxVersion.V2).valueFrom(struct));
+        assertNull(new
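The map-based assertions above all reduce to walking a dotted path through nested maps and returning null on any miss. A minimal, dependency-free sketch of that traversal (the `DottedPathLookup` class and its `lookup` helper are hypothetical illustrations, not Connect's actual `SingleFieldPath` implementation):

```java
import java.util.HashMap;
import java.util.Map;

public class DottedPathLookup {

    // Walks a dotted path ("foo.bar") through nested maps; returns null if any
    // step is missing or a non-map value is reached before the last step.
    public static Object lookup(Map<?, ?> root, String path) {
        Object current = root;
        for (String step : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null; // dead end: missing key or scalar value mid-path
            }
            current = ((Map<?, ?>) current).get(step);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> foo = new HashMap<>();
        foo.put("bar", 42);
        foo.put("baz", null);
        Map<String, Object> map = new HashMap<>();
        map.put("foo", foo);

        System.out.println(lookup(map, "foo.bar"));        // 42
        System.out.println(lookup(map, "foo.baz"));        // null (present but null)
        System.out.println(lookup(map, "foo.baz.inner"));  // null (cannot descend further)
    }
}
```

This mirrors the null-on-miss semantics the tests assert for both maps and structs, including a null root returning null.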
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
gharris1727 commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1591278453

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java:
## @@ -267,6 +267,18 @@ public String memberId() {
         return JoinGroupRequest.UNKNOWN_MEMBER_ID;
     }

+    @Override
+    protected void handlePollTimeoutExpiry() {
+        log.warn("worker poll timeout has expired. This means the time between subsequent calls to poll() " +
+            "in DistributedHerder tick() method was longer than the configured rebalance.timeout.ms. " +
+            "If you see this happening consistently, then it can be addressed by either adding more workers " +
+            "to the connect cluster or by increasing the rebalance.timeout.ms configuration value. Please note that " +

Review Comment: I think this is decent advice when requests are small and can be distributed around the cluster, but as REST requests are rather infrequent, I think this is the minority of cases. I think most often this timeout is going to be triggered by an excessively slow connector start, stop, or validation. In those cases, adding more workers does nothing but move the error to a different worker. I think we can keep the "adding more workers" comment, if we include another piece of advice for debugging excessively blocking tasks. If we don't have that other piece of advice, then advising users to add workers is misleading.

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java:
## @@ -267,6 +267,18 @@ public String memberId() {
         return JoinGroupRequest.UNKNOWN_MEMBER_ID;
     }

+    @Override
+    protected void handlePollTimeoutExpiry() {

Review Comment: Since we (as maintainers) don't have good insight into what commonly causes the herder tick thread to block and the poll timeout to fire, we recently added https://issues.apache.org/jira/browse/KAFKA-15563 to help users debug it themselves.
It would be nice to integrate with this system to have the heartbeat thread report what the herder tick thread was blocked on at the time that the poll timeout happened, as this would report stalling that isn't caused by REST requests. The integration is tricky though, because the WorkerCoordinator is (and should be) unaware of the DistributedHerder. And currently I think the WorkerCoordinator hides these internal disconnects and reconnects inside of the poll method. Perhaps we can extend the WorkerRebalanceListener or have a new error listener to allow the herder to be informed about these errors. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
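For the integration sketched above, the JVM already allows one thread to take a best-effort snapshot of another thread's stack, which is the core capability a heartbeat thread would need to log what a stalled tick thread is doing. A rough sketch under that assumption (the `BlockedThreadReporter` class and thread names are hypothetical, not part of Connect):

```java
public class BlockedThreadReporter {

    // Formats another thread's current state and stack, e.g. so a heartbeat
    // thread can log what a stalled worker thread is doing when a poll
    // timeout fires. The snapshot is best-effort, not a consistent view.
    public static String describe(Thread target) {
        StackTraceElement[] stack = target.getStackTrace();
        StringBuilder sb = new StringBuilder(target.getName())
            .append(" (").append(target.getState()).append(")");
        for (StackTraceElement frame : stack) {
            sb.append("\n    at ").append(frame);
        }
        return sb.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();
        Thread blocked = new Thread(() -> {
            synchronized (lock) {
                try {
                    lock.wait(); // simulates a tick thread stuck on a slow operation
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "tick-thread");
        blocked.start();
        Thread.sleep(200); // give the thread time to park
        System.out.println(describe(blocked));
        blocked.interrupt();
        blocked.join();
    }
}
```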
Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]
cadonna commented on code in PR #15870: URL: https://github.com/apache/kafka/pull/15870#discussion_r1591277510

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
## @@ -602,6 +608,47 @@ private void removeUnusedTaskFromStateUpdater(final TaskId taskId) {
         tasks.addPendingTaskToCloseClean(taskId);
     }

+    private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+                                   final Set<Task> tasksToCloseCleanFromStateUpdater,
+                                   final Set<Task> tasksToCloseDirtyFromStateUpdater) {
+        iterateAndActOnFuture(futures, removedTaskResult -> {
+            final Task task = removedTaskResult.task();
+            final Optional<RuntimeException> exception = removedTaskResult.exception();
+            if (exception.isPresent()) {
+                tasksToCloseDirtyFromStateUpdater.add(task);
+            } else {
+                tasksToCloseCleanFromStateUpdater.add(task);
+            }
+        });
+    }
+
+    private void iterateAndActOnFuture(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+                                       final java.util.function.Consumer<StateUpdater.RemovedTaskResult> action) {
+        for (final Map.Entry<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> entry : futures.entrySet()) {
+            final TaskId taskId = entry.getKey();
+            final CompletableFuture<StateUpdater.RemovedTaskResult> future = entry.getValue();
+            try {
+                final StateUpdater.RemovedTaskResult removedTaskResult = waitForFuture(taskId, future);
+                action.accept(removedTaskResult);
+            } catch (final ExecutionException executionException) {
+                log.warn("An exception happened when removing task {} from the state updater. The exception will be handled later: ",
+                    taskId, executionException);
+            } catch (final InterruptedException ignored) { }

Review Comment: I am actually in favor of treating them as fatal and throwing an `IllegalStateException` to make it more explicit that interruption of a stream thread should not happen. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]
cadonna commented on code in PR #15870: URL: https://github.com/apache/kafka/pull/15870#discussion_r1591262531

## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
## @@ -1421,15 +1422,20 @@ public void shouldRemoveAllActiveTasksFromStateUpdaterOnPartitionLost() {
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setupForRevocationAndLost(mkSet(task1, task2, task3), tasks);
+        final CompletableFuture<StateUpdater.RemovedTaskResult> future1 = new CompletableFuture<>();

Review Comment: Let me write one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]
cadonna commented on code in PR #15870: URL: https://github.com/apache/kafka/pull/15870#discussion_r1591261680

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
## @@ -602,6 +608,47 @@ private void removeUnusedTaskFromStateUpdater(final TaskId taskId) {
         tasks.addPendingTaskToCloseClean(taskId);
     }

+    private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+                                   final Set<Task> tasksToCloseCleanFromStateUpdater,
+                                   final Set<Task> tasksToCloseDirtyFromStateUpdater) {
+        iterateAndActOnFuture(futures, removedTaskResult -> {
+            final Task task = removedTaskResult.task();
+            final Optional<RuntimeException> exception = removedTaskResult.exception();
+            if (exception.isPresent()) {
+                tasksToCloseDirtyFromStateUpdater.add(task);
+            } else {
+                tasksToCloseCleanFromStateUpdater.add(task);
+            }
+        });
+    }
+
+    private void iterateAndActOnFuture(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+                                       final java.util.function.Consumer<StateUpdater.RemovedTaskResult> action) {
+        for (final Map.Entry<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> entry : futures.entrySet()) {
+            final TaskId taskId = entry.getKey();
+            final CompletableFuture<StateUpdater.RemovedTaskResult> future = entry.getValue();
+            try {
+                final StateUpdater.RemovedTaskResult removedTaskResult = waitForFuture(taskId, future);
+                action.accept(removedTaskResult);
+            } catch (final ExecutionException executionException) {
+                log.warn("An exception happened when removing task {} from the state updater. The exception will be handled later: ",
+                    taskId, executionException);
+            } catch (final InterruptedException ignored) { }

Review Comment: We are quite inconsistent on how we treat `InterruptedException`. In some places we ignore them because they should not happen and in others we treat them as fatal and throw an `IllegalStateException` because they should not happen [1].

[1] https://github.com/apache/kafka/blob/b36cf4ef977fb14bc57683630a9f3f3680705550/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L597

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
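The two conventions discussed above can be sketched side by side. This is an illustrative comparison under stated assumptions, not the actual Streams code (`getUninterruptibly` and `getOrIllegalState` are hypothetical names):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class InterruptHandling {

    // Pattern 1: keep waiting through interrupts, then restore the interrupt
    // flag on exit so callers can still observe it (rather than dropping it
    // silently with an empty catch block).
    public static <T> T getUninterruptibly(CompletableFuture<T> future) {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    return future.get();
                } catch (InterruptedException e) {
                    interrupted = true; // remember, then retry the wait
                } catch (ExecutionException e) {
                    throw new RuntimeException("task failed", e.getCause());
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Pattern 2: treat interruption as fatal, making it explicit that this
    // thread is never expected to be interrupted at this point.
    public static <T> T getOrIllegalState(CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            throw new IllegalStateException("Thread was unexpectedly interrupted", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("task failed", e.getCause());
        }
    }
}
```

Pattern 2 surfaces bugs loudly, at the cost of crashing the thread if an unexpected interrupt ever does occur; pattern 1 is more forgiving but can hide the interruption.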
[jira] [Updated] (KAFKA-16619) Unnecessary controller warning : "Loaded ZK migration state of NONE"
[ https://issues.apache.org/jira/browse/KAFKA-16619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Arthur updated KAFKA-16619:
---------------------------------
    Labels: good-first-issue  (was: )

> Unnecessary controller warning : "Loaded ZK migration state of NONE"
>
>                 Key: KAFKA-16619
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16619
>             Project: Kafka
>          Issue Type: Improvement
>          Components: controller
>    Affects Versions: 3.6.2
>            Reporter: F Méthot
>            Priority: Trivial
>              Labels: good-first-issue
>
> When we launch a fresh cluster of Kafka and KRaft controllers, with no Zookeeper involved, we get this warning in the controller log:
> [2024-04-15 03:44:33,881] WARN [QuorumController id=3] Performing controller activation. Loaded ZK migration state of NONE. (org.apache.kafka.controller.QuorumController)
>
> Our project has no business with Zookeeper; seeing this message prompted us to investigate and spend time looking up this warning to find an explanation.
> We have the setting {_}zookeeper.metadata.migration.enable{_}=false and we still get that warning.
> In future versions, to avoid further confusion, this message should not be shown when Zookeeper is not involved at all.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
mumrah commented on PR #15744: URL: https://github.com/apache/kafka/pull/15744#issuecomment-2096415182 Thanks for continued reviews @chia7712 , I've addressed your latest feedback. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]
cadonna commented on code in PR #15870: URL: https://github.com/apache/kafka/pull/15870#discussion_r1591223152

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
## @@ -602,6 +608,47 @@ private void removeUnusedTaskFromStateUpdater(final TaskId taskId) {
         tasks.addPendingTaskToCloseClean(taskId);
     }

+    private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+                                   final Set<Task> tasksToCloseCleanFromStateUpdater,
+                                   final Set<Task> tasksToCloseDirtyFromStateUpdater) {
+        iterateAndActOnFuture(futures, removedTaskResult -> {
+            final Task task = removedTaskResult.task();
+            final Optional<RuntimeException> exception = removedTaskResult.exception();
+            if (exception.isPresent()) {
+                tasksToCloseDirtyFromStateUpdater.add(task);

Review Comment: Ah, OK, yes they are logged in the `DefaultStateUpdater` when they happen. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
gaurav-narula commented on PR #15836: URL: https://github.com/apache/kafka/pull/15836#issuecomment-2096392283 Resolved conflict with `trunk` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]
lucasbru commented on code in PR #15870: URL: https://github.com/apache/kafka/pull/15870#discussion_r1591216607

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
## @@ -602,6 +608,47 @@ private void removeUnusedTaskFromStateUpdater(final TaskId taskId) {
         tasks.addPendingTaskToCloseClean(taskId);
     }

+    private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+                                   final Set<Task> tasksToCloseCleanFromStateUpdater,
+                                   final Set<Task> tasksToCloseDirtyFromStateUpdater) {
+        iterateAndActOnFuture(futures, removedTaskResult -> {
+            final Task task = removedTaskResult.task();
+            final Optional<RuntimeException> exception = removedTaskResult.exception();
+            if (exception.isPresent()) {
+                tasksToCloseDirtyFromStateUpdater.add(task);

Review Comment: I said logging, not throwing. The error should be logged somewhere, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]
gharris1727 commented on code in PR #14309: URL: https://github.com/apache/kafka/pull/14309#discussion_r1591196797

## clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
## @@ -16,6 +16,15 @@
  */
 package org.apache.kafka.common.utils;

+import java.lang.reflect.Modifier;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteOrder;
+import java.nio.file.StandardOpenOption;
+import java.util.AbstractMap;
+import java.util.EnumSet;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;

Review Comment: I think this should be reverted, we don't touch anything else in this file.

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
## @@ -392,6 +399,146 @@ protected Map<String, ConfigValue> validateSourceConnectorConfig(SourceConnector
         return configDef.validateAll(config);
     }

+    /**
+     * General-purpose validation logic for converters that are configured directly
+     * in a connector config (as opposed to inherited from the worker config).
+     * @param connectorConfig the configuration for the connector; may not be null
+     * @param pluginConfigValue the {@link ConfigValue} for the converter property in the connector config;
+     *                          may be null, in which case no validation will be performed under the assumption that the
+     *                          connector will inherit the converter settings from the worker
+     * @param pluginInterface the interface for the plugin type
+     *                        (e.g., {@code org.apache.kafka.connect.storage.Converter.class});
+     *                        may not be null
+     * @param configDefAccessor an accessor that can be used to retrieve a {@link ConfigDef}
+     *                          from an instance of the plugin type (e.g., {@code Converter::config});
+     *                          may not be null
+     * @param pluginName a lowercase, human-readable name for the type of plugin (e.g., {@code "key converter"});
+     *                   may not be null
+     * @param pluginProperty the property used to define a custom class for the plugin type
+     *                       in a connector config (e.g., {@link ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
+     *                       may not be null
+     * @param defaultProperties any default properties to include in the configuration that will be used for
+     *                          the plugin; may be null
+     *
+     * @return a {@link ConfigInfos} object containing validation results for the plugin in the connector config,
+     * or null if no custom validation was performed (possibly because no custom plugin was defined in the connector
+     * config)
+     *
+     * @param <T> the plugin class to perform validation for
+     */
+    private <T> ConfigInfos validateConverterConfig(
+            Map<String, String> connectorConfig,
+            ConfigValue pluginConfigValue,
+            Class<T> pluginInterface,
+            Function<T, ConfigDef> configDefAccessor,
+            String pluginName,
+            String pluginProperty,
+            Map<String, String> defaultProperties
+    ) {
+        Objects.requireNonNull(connectorConfig);
+        Objects.requireNonNull(pluginInterface);
+        Objects.requireNonNull(configDefAccessor);
+        Objects.requireNonNull(pluginName);
+        Objects.requireNonNull(pluginProperty);
+
+        String pluginClass = connectorConfig.get(pluginProperty);
+
+        if (pluginClass == null
+                || pluginConfigValue == null
+                || !pluginConfigValue.errorMessages().isEmpty()
+        ) {
+            // Either no custom converter was specified, or one was specified but there's a problem with it.
+            // No need to proceed any further.
+            return null;
+        }
+
+        T pluginInstance;
+        try {
+            pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
+        } catch (ClassNotFoundException | RuntimeException e) {
+            log.error("Failed to instantiate {} class {}; this should have been caught by prior validation logic", pluginName, pluginClass, e);
+            pluginConfigValue.addErrorMessage("Failed to load class " + pluginClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+            return null;
+        }
+
+        try {
+            ConfigDef configDef;
+            try {
+                configDef = configDefAccessor.apply(pluginInstance);
+            } catch (RuntimeException e) {
+                log.error("Failed to load ConfigDef from {} of type {}", pluginName, pluginClass, e);
+                pluginConfigValue.addErrorMessage("Failed to load ConfigDef from " + pluginName + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+                return null;
+            }
+            if (configDef == null) {
+                log.warn("{}.config() has returned a null ConfigDef; no further preflight config
Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]
cadonna commented on code in PR #15870: URL: https://github.com/apache/kafka/pull/15870#discussion_r1591204696

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
## @@ -602,6 +608,47 @@ private void removeUnusedTaskFromStateUpdater(final TaskId taskId) {
         tasks.addPendingTaskToCloseClean(taskId);
     }

+    private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+                                   final Set<Task> tasksToCloseCleanFromStateUpdater,
+                                   final Set<Task> tasksToCloseDirtyFromStateUpdater) {
+        iterateAndActOnFuture(futures, removedTaskResult -> {
+            final Task task = removedTaskResult.task();
+            final Optional<RuntimeException> exception = removedTaskResult.exception();
+            if (exception.isPresent()) {
+                tasksToCloseDirtyFromStateUpdater.add(task);
+            } else {
+                tasksToCloseCleanFromStateUpdater.add(task);
+            }
+        });
+    }
+
+    private void iterateAndActOnFuture(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+                                       final java.util.function.Consumer<StateUpdater.RemovedTaskResult> action) {
+        for (final Map.Entry<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> entry : futures.entrySet()) {
+            final TaskId taskId = entry.getKey();
+            final CompletableFuture<StateUpdater.RemovedTaskResult> future = entry.getValue();
+            try {
+                final StateUpdater.RemovedTaskResult removedTaskResult = waitForFuture(taskId, future);
+                action.accept(removedTaskResult);
+            } catch (final ExecutionException executionException) {
+                log.warn("An exception happened when removing task {} from the state updater. The exception will be handled later: ",
+                    taskId, executionException);
+            } catch (final InterruptedException ignored) { }

Review Comment: Let me check... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]
cadonna commented on code in PR #15870: URL: https://github.com/apache/kafka/pull/15870#discussion_r1591204373

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
## @@ -602,6 +608,47 @@ private void removeUnusedTaskFromStateUpdater(final TaskId taskId) {
         tasks.addPendingTaskToCloseClean(taskId);
     }

+    private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+                                   final Set<Task> tasksToCloseCleanFromStateUpdater,
+                                   final Set<Task> tasksToCloseDirtyFromStateUpdater) {
+        iterateAndActOnFuture(futures, removedTaskResult -> {
+            final Task task = removedTaskResult.task();
+            final Optional<RuntimeException> exception = removedTaskResult.exception();
+            if (exception.isPresent()) {
+                tasksToCloseDirtyFromStateUpdater.add(task);

Review Comment: I thought that these tasks are lost anyways, so why should we bother throwing an exception. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]
cadonna commented on code in PR #15870: URL: https://github.com/apache/kafka/pull/15870#discussion_r1591198311

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
## @@ -71,6 +73,10 @@
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;

 public class TaskManager {
+
+    private final static String BUG_ERROR_MESSAGE = "This indicates a bug. " +

Review Comment: No, we do not have that yet, but I had the same thought. I will look for a good place after these PRs are merged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15804: Close SocketServer channels when calling shutdown before enableRequestProcessing [kafka]
gharris1727 commented on PR #14729: URL: https://github.com/apache/kafka/pull/14729#issuecomment-2096338791 Hi @C0urante could you take another pass on this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16307: fix coordinator thread idle ratio [kafka]
jeffkbkim commented on code in PR #15835: URL: https://github.com/apache/kafka/pull/15835#discussion_r1591194245

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java:
## @@ -208,8 +209,8 @@ public void recordEventQueueTime(long durationMs) { }
     public void recordEventQueueProcessingTime(long durationMs) { }

     @Override
-    public void recordThreadIdleRatio(double ratio) {
-        threadIdleRatioSensor.record(ratio);
+    public synchronized void recordThreadIdleTime(long idleTimeMs, long currentTimeMs) {
+        threadIdleTimeRate.record(metrics.config(), idleTimeMs, currentTimeMs);

Review Comment: Thanks for the correction. Added the rate to the sensor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
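The switch above, from recording a precomputed ratio to recording raw idle time, matters because a rate metric accumulates time and divides by the elapsed window, instead of averaging per-event ratios of unequal length. A dependency-free sketch of that arithmetic (the `IdleRatio` class is illustrative, not Kafka's actual `Rate` implementation):

```java
public class IdleRatio {

    private final long windowStartMs;
    private long idleMs;

    public IdleRatio(long nowMs) {
        this.windowStartMs = nowMs;
    }

    // Record a chunk of time the thread spent idle.
    public void recordIdleTime(long idleTimeMs) {
        idleMs += idleTimeMs;
    }

    // Ratio of accumulated idle time to elapsed wall-clock time since the
    // window started. Because each recorded chunk is weighted by its actual
    // duration, a long busy stretch is not averaged away by many short idle
    // polls, which is the flaw of averaging per-poll ratios.
    public double measure(long nowMs) {
        long elapsed = nowMs - windowStartMs;
        return elapsed <= 0 ? 0.0 : Math.min(1.0, (double) idleMs / elapsed);
    }
}
```

For example, 300 ms of recorded idle time over a 1000 ms window measures as 0.3, regardless of how many poll iterations contributed to it.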
Re: [PR] KAFKA-16665: Allow to initialize newly assigned partition's positions without allowing fetching while callback runs [kafka]
lianetm commented on PR #15856: URL: https://github.com/apache/kafka/pull/15856#issuecomment-2096322959 Thanks @lucasbru, conflicts solved. Good pointer about the new file for callback tests, I created https://issues.apache.org/jira/browse/KAFKA-16675 assigned to me to make sure I move the test as soon as that one gets merged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16675) Move rebalance callback test for positions to callbacks test file
Lianet Magrans created KAFKA-16675: -- Summary: Move rebalance callback test for positions to callbacks test file Key: KAFKA-16675 URL: https://issues.apache.org/jira/browse/KAFKA-16675 Project: Kafka Issue Type: Task Components: consumer Reporter: Lianet Magrans Assignee: Lianet Magrans Integration test testGetPositionOfNewlyAssignedPartitionFromPartitionsAssignedCallback was added to the PlaintextConsumerTest.scala in this PR https://github.com/apache/kafka/pull/15856, as there was no specific file for testing callbacks at the moment. Another PR is in-flight, adding the file for callback-related tests, https://github.com/apache/kafka/pull/15408. Once 15408 gets merged, we should move testGetPositionOfNewlyAssignedPartitionFromPartitionsAssignedCallback to it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16665: Allow to initialize newly assigned partition's positions without allowing fetching while callback runs [kafka]
lucasbru commented on PR #15856: URL: https://github.com/apache/kafka/pull/15856#issuecomment-2096266336 @lianetm some conflicts need to be resolved -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]
lianetm commented on PR #15844: URL: https://github.com/apache/kafka/pull/15844#issuecomment-2096232639 Did we consider the approach of simply decoupling the request timeout from the application event timeout? We could issue the fetch request without a time boundary (max value probably), and get the application event result with the time boundary ([here](https://github.com/apache/kafka/blob/097522abd6b51bca2407ea0de7009ed6a2d970b4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1672-L1676)). Expressing the intention when creating the request and event seems clearer and brings what we want: fetch requests would remain in the background thread until they get a response or timeout, so they could be reused by a following fetch application event (for the same partitions). Then we could keep the manager logic simple and consistent around how inflights are maintained (removed when they get a response or expire, as it is now). I may be missing something, thoughts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
mumrah commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1591129328

## core/src/main/scala/kafka/zk/KafkaZkClient.scala:
## @@ -467,13 +474,48 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param rootEntityType entity type
    * @param sanitizedEntityName entity name
    * @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
    */
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+    val controllerZkVersionOpt: Option[Int] = if (!enableEntityConfigNoController) {

Review Comment: The name "enableEntityConfigNoController" is meant to convey "Enable setting entity configs even when there is no controller". But even as I've been coding this, I've mixed up the meaning more than once. I'll go with `enableEntityConfigControllerCheck` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
mumrah commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1591129328

## core/src/main/scala/kafka/zk/KafkaZkClient.scala:
## @@ -467,13 +474,48 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param rootEntityType entity type
    * @param sanitizedEntityName entity name
    * @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
    */
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+    val controllerZkVersionOpt: Option[Int] = if (!enableEntityConfigNoController) {

Review Comment: The name "enableEntityConfigNoController" is meant to convey "Enable setting entity configs even when there is no controller". But even as I've been coding this, I've mixed up the meaning more than once. How about: `requireEntityConfigControllerCheck`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Various cleanups in clients tests [kafka]
mimaison opened a new pull request, #15877: URL: https://github.com/apache/kafka/pull/15877 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]
lucasbru commented on code in PR #15870: URL: https://github.com/apache/kafka/pull/15870#discussion_r1591121181 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -602,6 +608,47 @@ private void removeUnusedTaskFromStateUpdater(final TaskId taskId) { tasks.addPendingTaskToCloseClean(taskId); } +private void addToTasksToClose(final Map> futures, + final Set tasksToCloseCleanFromStateUpdater, + final Set tasksToCloseDirtyFromStateUpdater) { +iterateAndActOnFuture(futures, removedTaskResult -> { +final Task task = removedTaskResult.task(); +final Optional exception = removedTaskResult.exception(); +if (exception.isPresent()) { +tasksToCloseDirtyFromStateUpdater.add(task); +} else { +tasksToCloseCleanFromStateUpdater.add(task); +} +}); +} + +private void iterateAndActOnFuture(final Map> futures, + final java.util.function.Consumer action) { +for (final Map.Entry> entry : futures.entrySet()) { +final TaskId taskId = entry.getKey(); +final CompletableFuture future = entry.getValue(); +try { +final StateUpdater.RemovedTaskResult removedTaskResult = waitForFuture(taskId, future); +action.accept(removedTaskResult); +} catch (final ExecutionException executionException) { +log.warn("An exception happened when removing task {} from the state updater. The exception will be handled later: ", +taskId, executionException); +} catch (final InterruptedException ignored) { } Review Comment: Can we just ignore this? I see other classes in the package rethrowing it ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -71,6 +73,10 @@ import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName; public class TaskManager { + +private final static String BUG_ERROR_MESSAGE = "This indicates a bug. " + Review Comment: I see that's already defined in a couple of places. Did you check if there is a good utility class where this could be defined? 
## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
@@ -602,6 +608,47 @@ private void removeUnusedTaskFromStateUpdater(final TaskId taskId) {
         tasks.addPendingTaskToCloseClean(taskId);
     }

+    private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+                                   final Set<Task> tasksToCloseCleanFromStateUpdater,
+                                   final Set<Task> tasksToCloseDirtyFromStateUpdater) {
+        iterateAndActOnFuture(futures, removedTaskResult -> {
+            final Task task = removedTaskResult.task();
+            final Optional<RuntimeException> exception = removedTaskResult.exception();
+            if (exception.isPresent()) {
+                tasksToCloseDirtyFromStateUpdater.add(task);

Review Comment: Is the exception already logged somewhere else? We are just dropping it here.

## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
@@ -1421,15 +1422,20 @@ public void shouldRemoveAllActiveTasksFromStateUpdaterOnPartitionLost() {
         .withInputPartitions(taskId02Partitions).build();
     final TasksRegistry tasks = mock(TasksRegistry.class);
     final TaskManager taskManager = setupForRevocationAndLost(mkSet(task1, task2, task3), tasks);
+    final CompletableFuture<StateUpdater.RemovedTaskResult> future1 = new CompletableFuture<>();

Review Comment: Do we need a test that covers the part where we get an exception?
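The pattern under review here — draining a map of per-task futures and sorting each completed task into a clean-close or dirty-close set depending on whether its result carries an exception — can be sketched standalone. `RemovedResult` and the string task ids below are illustrative stand-ins, not the actual Streams types:

```java
import java.util.*;
import java.util.concurrent.*;

public class FutureDrainSketch {
    // Illustrative stand-in for StateUpdater.RemovedTaskResult: a task id plus an optional failure.
    record RemovedResult(String taskId, Optional<RuntimeException> exception) {}

    // Drain every future, classifying results into clean/dirty close sets. An
    // ExecutionException is only logged (to be handled later), and an
    // InterruptedException is swallowed, mirroring the PR's catch clauses.
    static void addToTasksToClose(Map<String, CompletableFuture<RemovedResult>> futures,
                                  Set<String> closeClean,
                                  Set<String> closeDirty) {
        for (Map.Entry<String, CompletableFuture<RemovedResult>> entry : futures.entrySet()) {
            try {
                RemovedResult result = entry.getValue().get();
                if (result.exception().isPresent()) {
                    closeDirty.add(result.taskId());
                } else {
                    closeClean.add(result.taskId());
                }
            } catch (ExecutionException e) {
                System.err.println("removal of " + entry.getKey() + " failed: " + e.getCause());
            } catch (InterruptedException ignored) { }
        }
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<RemovedResult>> futures = new HashMap<>();
        futures.put("0_0", CompletableFuture.completedFuture(new RemovedResult("0_0", Optional.empty())));
        futures.put("0_1", CompletableFuture.completedFuture(
            new RemovedResult("0_1", Optional.of(new RuntimeException("flush failed")))));
        Set<String> clean = new HashSet<>(), dirty = new HashSet<>();
        addToTasksToClose(futures, clean, dirty);
        System.out.println(clean); // [0_0]
        System.out.println(dirty); // [0_1]
    }
}
```

Note the sketch exhibits both concerns from the review: the swallowed `InterruptedException` and the dropped per-task exception (only its presence is used for classification).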
Re: [PR] [KAFKA-16646] Don't run cve scan job on forks [kafka]
omkreddy merged PR #15831: URL: https://github.com/apache/kafka/pull/15831
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
ashmeet13 commented on PR #12988: URL: https://github.com/apache/kafka/pull/12988#issuecomment-2096203457 Fixing the failing build
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
mumrah commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1591110984

## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
@@ -1037,24 +1104,35 @@ class ZkMigrationIntegrationTest {
     admin.alterUserScramCredentials(alterations)
   }

-  def verifyTopicConfigs(zkClient: KafkaZkClient): Unit = {
-    TestUtils.retry(1) {
+  def verifyTopicConfigs(zkClient: KafkaZkClient, shouldRetry: Boolean): Unit = {
+    maybeRetry(shouldRetry, 1) {
       val propsAfter = zkClient.getEntityConfigs(ConfigType.TOPIC, "test")
       assertEquals("204800", propsAfter.getProperty(TopicConfig.SEGMENT_BYTES_CONFIG))
       assertFalse(propsAfter.containsKey(TopicConfig.SEGMENT_MS_CONFIG))
     }
   }

-  def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {
-    TestUtils.retry(1) {
-      assertEquals("1000", zkClient.getEntityConfigs(ConfigType.USER, Sanitizer.sanitize("user@1")).getProperty("consumer_byte_rate"))
-      assertEquals("900", zkClient.getEntityConfigs(ConfigType.USER, "").getProperty("consumer_byte_rate"))
-      assertEquals("800", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("consumer_byte_rate"))
-      assertEquals("100", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("producer_byte_rate"))
-      assertEquals("10", zkClient.getEntityConfigs(ConfigType.IP, "8.8.8.8").getProperty("connection_creation_rate"))
+  def verifyBrokerConfigs(zkClient: KafkaZkClient, shouldRetry: Boolean): Unit = {
+    maybeRetry(shouldRetry, 1) {
+      val defaultBrokerProps = zkClient.getEntityConfigs(ConfigType.BROKER, "")
+      assertEquals("8640", defaultBrokerProps.getProperty(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
+
+      val broker0Props = zkClient.getEntityConfigs(ConfigType.BROKER, "0")
+      assertEquals("4320", broker0Props.getProperty(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
+
+      val broker1Props = zkClient.getEntityConfigs(ConfigType.BROKER, "1")
+      assertEquals("4320", broker1Props.getProperty(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
     }
   }

+  def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {

Review Comment: Ah good catch! I did not mean to change this logic. Will fix.
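The `maybeRetry(shouldRetry, waitTime)` helper in the diff wraps an assertion block in an optional retry loop. A minimal Java sketch of that pattern (the helper name and signature here are illustrative; the real test uses Scala's `TestUtils.retry`):

```java
public class MaybeRetrySketch {
    // Run the check once, or keep re-running it until it stops throwing
    // or the deadline passes (illustrative stand-in for the test helper).
    static void maybeRetry(boolean shouldRetry, long maxWaitMs, Runnable check) {
        if (!shouldRetry) {
            check.run();
            return;
        }
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            try {
                check.run();
                return;
            } catch (AssertionError e) {
                if (System.currentTimeMillis() >= deadline) throw e; // out of time: surface the failure
                try { Thread.sleep(10); } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // A check that fails twice before succeeding: retried transparently.
        maybeRetry(true, 5_000, () -> {
            attempts[0]++;
            if (attempts[0] < 3) throw new AssertionError("not yet");
        });
        System.out.println(attempts[0]); // 3
    }
}
```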
Re: [PR] KAFKA-14588 [2/N] ConfigCommandTest rewritten in java [kafka]
nizhikov commented on PR #15873: URL: https://github.com/apache/kafka/pull/15873#issuecomment-2096190310 Hello @chia7712. The second part of the `ConfigCommandTest` refactoring is ready for review. Please take a look.
Re: [PR] John confluent unclean recovery elect leader request [kafka]
mannoopj commented on PR #15876: URL: https://github.com/apache/kafka/pull/15876#issuecomment-2096185204 WIP
[jira] [Assigned] (KAFKA-16673) Optimize toTopicPartitions with ConsumerProtocolSubscription
[ https://issues.apache.org/jira/browse/KAFKA-16673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongnuo Lyu reassigned KAFKA-16673:
-----------------------------------

    Assignee: Dongnuo Lyu

> Optimize toTopicPartitions with ConsumerProtocolSubscription
>
>         Key: KAFKA-16673
>         URL: https://issues.apache.org/jira/browse/KAFKA-16673
>     Project: Kafka
>  Issue Type: Sub-task
>    Reporter: Dongnuo Lyu
>    Assignee: Dongnuo Lyu
>    Priority: Major
>
> https://github.com/apache/kafka/pull/15798#discussion_r1582981154

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16674) Adjust classicGroupJoinToConsumerGroup to add subscription model
[ https://issues.apache.org/jira/browse/KAFKA-16674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongnuo Lyu reassigned KAFKA-16674:
-----------------------------------

    Assignee: Dongnuo Lyu

> Adjust classicGroupJoinToConsumerGroup to add subscription model
>
>         Key: KAFKA-16674
>         URL: https://issues.apache.org/jira/browse/KAFKA-16674
>     Project: Kafka
>  Issue Type: Sub-task
>    Reporter: Dongnuo Lyu
>    Assignee: Dongnuo Lyu
>    Priority: Major
>
> https://github.com/apache/kafka/pull/15785 adds subscription model to the group state that affects `classicGroupJoinToConsumerGroup`. We'll need to make adjustments to comply with the change once #15785 is merged.
[jira] [Created] (KAFKA-16674) Adjust classicGroupJoinToConsumerGroup to add subscription model
Dongnuo Lyu created KAFKA-16674:
-----------------------------------

    Summary: Adjust classicGroupJoinToConsumerGroup to add subscription model
        Key: KAFKA-16674
        URL: https://issues.apache.org/jira/browse/KAFKA-16674
    Project: Kafka
 Issue Type: Sub-task
   Reporter: Dongnuo Lyu

https://github.com/apache/kafka/pull/15785 adds subscription model to the group state that affects `classicGroupJoinToConsumerGroup`. We'll need to make adjustments to comply with the change once #15785 is merged.
Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]
lianetm commented on PR #15844: URL: https://github.com/apache/kafka/pull/15844#issuecomment-2096155474

One concern on the [comment](https://github.com/apache/kafka/pull/15844#discussion_r1591065112) above about how we identify this situation (inflight fetch requests that we shouldn't delete too soon), and another about where we consider the situation. Inflight requests are removed in two places: the direct call to fetch (handled in this PR), but also from the commit manager poll. The commit manager (like the other managers) has logic for removing all expired requests in its poll loop, when calling [failAndRemoveExpiredFetchRequests](https://github.com/apache/kafka/blob/42754336e1ff35cb45661f1a906fc24b761b27cf/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L1182). Shouldn't we consider that too?
[jira] [Created] (KAFKA-16673) Optimize toTopicPartitions with ConsumerProtocolSubscription
Dongnuo Lyu created KAFKA-16673:
-----------------------------------

    Summary: Optimize toTopicPartitions with ConsumerProtocolSubscription
        Key: KAFKA-16673
        URL: https://issues.apache.org/jira/browse/KAFKA-16673
    Project: Kafka
 Issue Type: Sub-task
   Reporter: Dongnuo Lyu

https://github.com/apache/kafka/pull/15798#discussion_r1582981154
Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]
lianetm commented on code in PR #15844: URL: https://github.com/apache/kafka/pull/15844#discussion_r1591065112

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
@@ -1145,14 +1141,42 @@ private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetch
             inflightOffsetFetches.stream().filter(r -> r.sameRequest(request)).findAny();

         if (dupe.isPresent() || inflight.isPresent()) {
-            log.info("Duplicated OffsetFetchRequest: " + request.requestedPartitions);
-            dupe.orElseGet(inflight::get).chainFuture(request.future);
+            log.info("Duplicate OffsetFetchRequest found for partitions: {}", request.requestedPartitions);
+            OffsetFetchRequestState originalRequest = dupe.orElseGet(inflight::get);
+            originalRequest.chainFuture(request.future);
         } else {
             this.unsentOffsetFetches.add(request);
         }
         return request.future;
     }

+    /**
+     * Remove the {@link OffsetFetchRequestState request} from the inflight requests queue iff
+     * both of the following are true:
+     *
+     * <ul>
+     *     <li>The request completed with a null {@link Throwable error}</li>
+     *     <li>The request is not {@link OffsetFetchRequestState#isExpired expired}</li>
+     * </ul>
+     *
+     * <p>
+     * In some cases, even though an offset fetch request may complete without an error, technically
+     * the request took longer than the user's provided timeout. In that case, the application thread will
+     * still receive a timeout error, and will shortly try to fetch these offsets again. Keeping the result
+     * of the current attempt will enable the next attempt to use that result and return
+     * almost immediately.
+     */
+    private void maybeRemoveInflightOffsetFetch(OffsetFetchRequestState fetchRequest, Throwable error) {
+        if (error == null && !fetchRequest.isExpired) {

Review Comment: this line implies a big change in the current logic, and I wonder if we're taking it too far. Agreed on not removing the expired requests (that's the root cause of the problem we have), but why put all errors (not only timeouts) in the same bucket? With this new check, how are we ensuring that fetch requests that fail fatally are removed from the inflight queue?
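The condition under review — only evicting an inflight request when it completed both successfully and within its deadline — can be sketched standalone. Note the sketch mirrors the PR's proposed check, including the behavior lianetm questions: a request that completes with any error is never removed here either. All names are illustrative stand-ins, not the actual `CommitRequestManager` types:

```java
import java.util.*;

public class InflightCacheSketch {
    // Illustrative stand-in for an inflight offset-fetch request.
    static class FetchRequest {
        final Set<String> partitions;
        boolean expired; // the user's timeout elapsed before the response arrived
        FetchRequest(Set<String> partitions) { this.partitions = partitions; }
    }

    final List<FetchRequest> inflight = new ArrayList<>();

    // Only drop the request when it truly finished: successfully and within its deadline.
    // An expired-but-successful result stays cached so the application's imminent retry
    // can reuse it instead of going back to the broker. A failed request is also kept
    // here, which is exactly the reviewer's concern about fatal errors.
    void maybeRemoveInflight(FetchRequest request, Throwable error) {
        if (error == null && !request.expired) {
            inflight.remove(request);
        }
    }

    public static void main(String[] args) {
        InflightCacheSketch cache = new InflightCacheSketch();
        FetchRequest ok = new FetchRequest(Set.of("t-0"));
        FetchRequest late = new FetchRequest(Set.of("t-1"));
        late.expired = true;
        cache.inflight.add(ok);
        cache.inflight.add(late);
        cache.maybeRemoveInflight(ok, null);   // completed in time: dropped
        cache.maybeRemoveInflight(late, null); // completed but expired: kept for the retry
        System.out.println(cache.inflight.size()); // 1
    }
}
```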
Re: [PR] Kafka-16668: improve cluster test [kafka]
johnnychhsu commented on PR #15861: URL: https://github.com/apache/kafka/pull/15861#issuecomment-2096037200 Updated, and `./gradlew clean core:test --tests ClusterTestExtensionsTest --tests ClusterConfigTest` passed.
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dajac commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1590935155

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
@@ -203,6 +189,22 @@ public ConsumerGroupMember build() {
         return member;
     }

+    private boolean ownsRevokedPartitions(

Review Comment: nit: Should we add some javadoc?

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java:
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class NoOpPartitionAssignor implements PartitionAssignor {
+    static final String NAME = "no-op";
+    @Override

Review Comment: nit: Let's add an empty line before this one.

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
@@ -203,6 +189,22 @@ public ConsumerGroupMember build() {
         return member;
     }

+    private boolean ownsRevokedPartitions(
+        Map<Uuid, Set<Integer>> assignment
+    ) {
+        if (ownedTopicPartitions == null) return true;
+
+        for (ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions : ownedTopicPartitions) {
+            for (Integer partitionId : topicPartitions.partitions()) {
+                if (assignment.getOrDefault(topicPartitions.topicId(), Collections.emptySet()).contains(partitionId)) {

Review Comment: nit: There is a small optimization here. We could do `assignment.getOrDefault(topicPartitions.topicId(), Collections.emptySet())` before looping on the partitions.
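dajac's last nit — hoisting the `getOrDefault` lookup out of the inner loop so it runs once per topic rather than once per partition — looks like this in isolation (illustrative names, with `String` topic ids standing in for the real topic id type):

```java
import java.util.*;

public class HoistLookupSketch {
    // The suggested shape: fetch the per-topic partition set once per topic,
    // then test each owned partition against it.
    static boolean ownsAnyPartition(Map<String, Set<Integer>> assignment,
                                    Map<String, List<Integer>> owned) {
        for (Map.Entry<String, List<Integer>> topic : owned.entrySet()) {
            // Hoisted out of the inner loop: one map lookup per topic.
            Set<Integer> assigned = assignment.getOrDefault(topic.getKey(), Collections.emptySet());
            for (Integer partition : topic.getValue()) {
                if (assigned.contains(partition)) return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> assignment = Map.of("topicA", Set.of(0, 1));
        Map<String, List<Integer>> owned = Map.of("topicA", List.of(1, 2), "topicB", List.of(0));
        System.out.println(ownsAnyPartition(assignment, owned)); // true
    }
}
```

The behavior is unchanged; only the number of `Map` lookups drops from one per partition to one per topic.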
Re: [PR] John confluent unclean recovery elect leader request [kafka]
mannoopj commented on PR #15876: URL: https://github.com/apache/kafka/pull/15876#issuecomment-2096008537 WIP
[PR] MINOR: Correct connector scheduled rebalance logs [kafka]
yuz10 opened a new pull request, #15875: URL: https://github.com/apache/kafka/pull/15875

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[PR] MINOR: Remove dev_version parameter from streams tests [kafka]
lucasbru opened a new pull request, #15874: URL: https://github.com/apache/kafka/pull/15874

In two streams tests, we are using the current snapshot version as a test parameter `to_version`, but as the only option. We can hardcode it. This simplifies testing downstream, since the test parameters do not change with every version. In particular, some tests downstream are blacklisted because they do not work with ARM. These lists need to be updated every time `DEV_VERSION` is bumped.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Re: [PR] Kafka-16668: improve cluster test [kafka]
johnnychhsu commented on code in PR #15861: URL: https://github.com/apache/kafka/pull/15861#discussion_r1590943709

## core/src/test/java/kafka/test/annotation/ClusterTest.java:
@@ -44,4 +44,5 @@
     String listener() default "";
     MetadataVersion metadataVersion() default MetadataVersion.IBP_3_8_IV0;
     ClusterConfigProperty[] serverProperties() default {};
+    Tags[] tags() default {};

Review Comment: I feel key-value pairs are more flexible: users can choose what to add for display by key, which makes the output easy to format and understand. But I am also open to the string-array idea; both work for me.
Re: [PR] Kafka-16668: improve cluster test [kafka]
johnnychhsu commented on code in PR #15861: URL: https://github.com/apache/kafka/pull/15861#discussion_r1590940053

## core/src/test/java/kafka/test/ClusterConfig.java:
@@ -83,6 +81,7 @@ private ClusterConfig(Type type, int brokers, int controllers, int disksPerBroke
     this.saslServerProperties = Objects.requireNonNull(saslServerProperties);
     this.saslClientProperties = Objects.requireNonNull(saslClientProperties);
     this.perBrokerOverrideProperties = Objects.requireNonNull(perBrokerOverrideProperties);
+    this.tags = Objects.requireNonNull(extendTags(tags));

Review Comment: I see — you mean that besides the `nameTags`, we also keep the original tags and add a getter for them.
Re: [PR] MINOR: Remove `ConsumerGroupPartitionMetadataValue.Epoch` field [kafka]
dajac merged PR #15854: URL: https://github.com/apache/kafka/pull/15854
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1590911279

## core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
@@ -108,12 +117,12 @@ public void testClusterTests() {
     }

     @ClusterTests({
-        @ClusterTest(clusterType = Type.ZK),
-        @ClusterTest(clusterType = Type.ZK, disksPerBroker = 2),
-        @ClusterTest(clusterType = Type.KRAFT),
-        @ClusterTest(clusterType = Type.KRAFT, disksPerBroker = 2),
-        @ClusterTest(clusterType = Type.CO_KRAFT),
-        @ClusterTest(clusterType = Type.CO_KRAFT, disksPerBroker = 2)
+        @ClusterTest(clusterType = Type.ZK),

Review Comment: ditto

## core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
@@ -70,28 +79,28 @@ public void testClusterTest(ClusterInstance clusterInstance) {
     @ClusterTemplate("generate1")
     public void testClusterTemplate() {
         Assertions.assertEquals(ClusterInstance.ClusterType.ZK, clusterInstance.clusterType(),
-            "generate1 provided a Zk cluster, so we should see that here");
+            "generate1 provided a Zk cluster, so we should see that here");

Review Comment: ditto

## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java:
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.consumer.group;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+import static java.util.Collections.singleton;
+
+class ConsumerGroupExecutor {

Review Comment: Other tests (for example: #15872) need both this and `generator`. How about renaming this to `ConsumerGroupCommandTestUtils` and moving the implementation of `generator` to it?

## core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
@@ -135,4 +144,49 @@ public void testNoAutoStart() {
     public void testDefaults(ClusterInstance clusterInstance) {
         Assertions.assertEquals(MetadataVersion.IBP_3_8_IV0, clusterInstance.config().metadataVersion());
     }
+
+    @ClusterTests({
+        @ClusterTest(name = "enable-new-coordinator", clusterType = Type.ALL, serverProperties = {
+            @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
+        }),
+        @ClusterTest(name = "enable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+            @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
+        }),
+        @ClusterTest(name = "enable-new-coordinator-and-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+            @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
+            @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
+        }),
+        @ClusterTest(name = "enable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+            @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
+            @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
+        }),
+        @ClusterTest(name = "disable-new-coordinator-and-enable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+            @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
+            @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
+        }),
+
[PR] KAFKA-14588 [2/N] ConfigCommandTest rewritten in java [kafka]
nizhikov opened a new pull request, #15873: URL: https://github.com/apache/kafka/pull/15873

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Resolved] (KAFKA-16393) SslTransportLayer doesn't implement write(ByteBuffer[], int, int) correctly
[ https://issues.apache.org/jira/browse/KAFKA-16393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai resolved KAFKA-16393.
------------------------------------
    Fix Version/s: 3.8.0
       Resolution: Fixed

> SslTransportLayer doesn't implement write(ByteBuffer[], int, int) correctly
>
>         Key: KAFKA-16393
>         URL: https://issues.apache.org/jira/browse/KAFKA-16393
>     Project: Kafka
>  Issue Type: Improvement
>    Reporter: Haruki Okada
>    Assignee: Haruki Okada
>    Priority: Minor
>     Fix For: 3.8.0
>
> As of Kafka 3.7.0, SslTransportLayer.write(ByteBuffer[], int, int) is implemented like below:
> {code:java}
> public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
>     ...
>     int i = offset;
>     while (i < length) {
>         if (srcs[i].hasRemaining() || hasPendingWrites()) {
>     ...
> {code}
> The loop index starts at `offset` and ends with `length`. However, this isn't correct because the end index should be `offset + length`.
> Let's say we have an array of ByteBuffer with length = 5 and try calling this method with offset = 3, length = 1. In the current code, `write(srcs, 3, 1)` doesn't attempt any write because the loop condition is immediately false.
> For now, this method is only called with offset = 0, length = srcs.length in the Kafka code base, so it isn't causing any problem. We should still fix it, because it could introduce a subtle bug if the method is ever used with different args in the future.
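The off-by-one is easy to demonstrate in isolation. A minimal sketch (not the actual `SslTransportLayer` code) contrasting the buggy loop bound with the correct one from the `GatheringByteChannel` contract, which addresses buffers `srcs[offset]` through `srcs[offset + length - 1]`:

```java
import java.nio.ByteBuffer;

public class GatherWriteBounds {
    // Buggy variant: the loop ends at `length`, so for offset=3, length=1 the
    // condition 3 < 1 is immediately false and no buffer is ever visited.
    static int visitedBuggy(ByteBuffer[] srcs, int offset, int length) {
        int visited = 0;
        for (int i = offset; i < length; i++) visited++; // stands in for the write attempt
        return visited;
    }

    // Fixed variant: iterate srcs[offset] .. srcs[offset + length - 1].
    static int visitedFixed(ByteBuffer[] srcs, int offset, int length) {
        int visited = 0;
        for (int i = offset; i < offset + length; i++) visited++;
        return visited;
    }

    public static void main(String[] args) {
        ByteBuffer[] srcs = new ByteBuffer[5];
        for (int i = 0; i < srcs.length; i++) srcs[i] = ByteBuffer.allocate(4);
        System.out.println(visitedBuggy(srcs, 3, 1)); // 0 — the write is silently skipped
        System.out.println(visitedFixed(srcs, 3, 1)); // 1
    }
}
```

As the ticket notes, the bug was latent because all existing callers pass offset = 0, where both bounds coincide.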
Re: [PR] KAFKA-16393 read/write sequence of buffers correctly [kafka]
chia7712 merged PR #15571: URL: https://github.com/apache/kafka/pull/15571
Re: [PR] MINOR: use classic consumer with ZK mode for DeleteOffsetsConsumerGroupCommandIntegrationTest [kafka]
chia7712 commented on code in PR #15872: URL: https://github.com/apache/kafka/pull/15872#discussion_r1590841207

## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
@@ -88,7 +113,7 @@ public void testDeleteOffsetsNonExistingGroup() {
     }
 }

-@ClusterTest
+@ClusterTemplate("generator")
 public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() {
     for (Map consumerConfig: consumerConfigs) {

Review Comment: `consumerConfigs` needs to be rewritten by `ClusterInstance#supportedGroupProtocols`; see https://github.com/apache/kafka/pull/15766/files#r1590222192
Re: [PR] MINOR: use classic consumer with ZK mode for DeleteOffsetsConsumerGroupCommandIntegrationTest [kafka]
chia7712 commented on PR #15872: URL: https://github.com/apache/kafka/pull/15872#issuecomment-2095735857 Maybe we should merge `DeleteOffsetsConsumerGroupCommandIntegrationTest` and `DeleteConsumerGroupsTest`, since both are used to test delete-related commands.
Re: [PR] KAFKA-8721: Kafka metrics improvements [kafka]
jsto commented on PR #7121: URL: https://github.com/apache/kafka/pull/7121#issuecomment-2095734292 Hello. We are also being flagged with metrics-core EOL. Do we have any update on this task?
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
satishd commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1590586656

## storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java:
@@ -55,7 +57,7 @@ public class TieredStorageTestUtils {
     // Log cleanup interval is configured to be 500 ms. We need to wait at least that amount of time before
     // segments eligible for deletion gets physically removed.
-    public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5;
+    public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 10;

Review Comment: Why is it increased to 10 secs?
Re: [PR] [1/N] ConfigCommandTest rewritten in java [kafka]
chia7712 merged PR #15850: URL: https://github.com/apache/kafka/pull/15850