Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1590222768 ## tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java: ## @@ -430,7 +430,7 @@ private void printStates(Map states) { String format = "\n%" + -coordinatorColLen + "s %-25s %-20s %-15s %s"; System.out.printf(format, "GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"); -System.out.printf(format, state.group, coordinator, state.assignmentStrategy, state.state, state.numMembers); +System.out.printf(format, state.group, coordinator, state.assignmentStrategy, state.state.name(), state.numMembers); Review Comment: Sorry that we should use `toString()` instead of `name()` ## core/src/test/java/kafka/test/ClusterInstance.java: ## @@ -145,4 +153,18 @@ default Admin createAdminClient() { void startBroker(int brokerId); void waitForReadyBrokers() throws InterruptedException; + +default Set supportedGroupProtocols() { Review Comment: Could you please add test for it? `ClusterTestExtensionsTest` is a goo place :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16393 read/write sequence of buffers correctly [kafka]
ocadaruma commented on PR #15571: URL: https://github.com/apache/kafka/pull/15571#issuecomment-2094651947 @chia7712 Thank you for pointing out. Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16393 read/write sequence of buffers correctly [kafka]
ocadaruma commented on code in PR #15571: URL: https://github.com/apache/kafka/pull/15571#discussion_r1590220877 ## clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java: ## @@ -1558,4 +1561,68 @@ public void testSSLEngineCloseInboundInvokedOnClose() throws IOException { verify(sslEngine, times(1)).closeInbound(); verifyNoMoreInteractions(sslEngine); } + +public void testGatheringWrite() throws IOException { Review Comment: Oh good catch. Let me fix -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16393 read/write sequence of buffers correctly [kafka]
chia7712 commented on code in PR #15571: URL: https://github.com/apache/kafka/pull/15571#discussion_r1590217930 ## clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java: ## @@ -1558,4 +1561,68 @@ public void testSSLEngineCloseInboundInvokedOnClose() throws IOException { verify(sslEngine, times(1)).closeInbound(); verifyNoMoreInteractions(sslEngine); } + +public void testGatheringWrite() throws IOException { Review Comment: please add `@Test` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Replaced Utils.join() with JDK API. [kafka]
chia7712 commented on code in PR #15823: URL: https://github.com/apache/kafka/pull/15823#discussion_r1590216139 ## tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java: ## @@ -536,7 +537,7 @@ private static ArgumentParser argParser() { .setDefault(ConsumerConfig.DEFAULT_GROUP_PROTOCOL) .metavar("GROUP_PROTOCOL") .dest("groupProtocol") -.help(String.format("Group protocol (must be one of %s)", Utils.join(GroupProtocol.values(), ", "))); +.help(String.format("Group protocol (must be one of %s)", String.join(", ", Arrays.toString(GroupProtocol.values(); Review Comment: `Arrays.stream(GroupProtocol.values()).map(Object::toString).collect(Collectors.joining(", "))` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1590215434 ## core/src/test/java/kafka/test/ClusterInstance.java: ## @@ -145,4 +152,21 @@ default Admin createAdminClient() { void startBroker(int brokerId); void waitForReadyBrokers() throws InterruptedException; + +default Set supportedGroupProtocols() { Review Comment: It would be great to remove unmodified collection. Also, we need to add comments to explain the reason of checking two configs. ```java default Set supportedGroupProtocols() { Set supportedGroupProtocols = new HashSet<>(); supportedGroupProtocols.add(CLASSIC); // KafkaConfig#isNewGroupCoordinatorEnabled check both NEW_GROUP_COORDINATOR_ENABLE_CONFIG and GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG if (config().serverProperties().getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "").equals("true") || config().serverProperties().getOrDefault(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "").contains("consumer")) { supportedGroupProtocols.add(CONSUMER); } return Collections.unmodifiableSet(supportedGroupProtocols); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on PR #15766: URL: https://github.com/apache/kafka/pull/15766#issuecomment-2094642579 https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java#L433 @frankvicky Could you change `state.state` to `state.state.name()` for same output. Also, please fix the build error -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14785: Connect offset read REST API [kafka]
chia7712 commented on code in PR #13434: URL: https://github.com/apache/kafka/pull/13434#discussion_r1590212654 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java: ## @@ -141,24 +145,13 @@ private static String noClientId() { protected KafkaBasedLog offsetLog; // Visible for testing final HashMap data = new HashMap<>(); +private final Map>> connectorPartitions = new HashMap<>(); +private Converter keyConverter; private final Supplier topicAdminSupplier; private final Supplier clientIdBase; private SharedTopicAdmin ownTopicAdmin; protected boolean exactlyOnce; -/** - * Create an {@link OffsetBackingStore} backed by a Kafka topic. This constructor will cause the - * store to instantiate and close its own {@link TopicAdmin} during {@link #configure(WorkerConfig)} - * and {@link #stop()}, respectively. - * - * @deprecated use {@link #KafkaOffsetBackingStore(Supplier, Supplier)} instead - */ -@Deprecated -public KafkaOffsetBackingStore() { Review Comment: Sorry for making noise on this PR. out of curiosity, should we remove deprecated constructors from `KafkaStatusBackingStore` and `KafkaConfigBackingStore` too? not sure whether those internal classes need the deprecation cycle? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: remove redundant check in KafkaClusterTestKit [kafka]
chia7712 merged PR #15858: URL: https://github.com/apache/kafka/pull/15858 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15739: KRaft support in ResetConsumerGroupOffsetTest [kafka]
github-actions[bot] commented on PR #14686: URL: https://github.com/apache/kafka/pull/14686#issuecomment-2094567758 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
frankvicky commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1590198922 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,374 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterConfig; +import kafka.test.ClusterGenerator; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTemplate; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Objects; -import java.util.Optional; -import java.util.Properties; import java.util.Set; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; +import static kafka.test.annotation.Type.CO_KRAFT; +import static kafka.test.annotation.Type.KRAFT; +import static kafka.test.annotation.Type.ZK; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.common.ConsumerGroupState.EMPTY; +import static org.apache.kafka.common.ConsumerGroupState.STABLE; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; -assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); -} - -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String missingGroup = "missing.group"; -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; -ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); +@ExtendWith(value = ClusterTestExtensions.class) +public class DeleteConsumerGroupsTest { +private final ClusterInstance cluster; -String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups); -assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +public DeleteConsumerGroupsTest(ClusterInstance cluster) { +this.cluster = cluster; } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String missingGroup = "missing.group"; - -// note the group to be deleted is a dif
[jira] [Assigned] (KAFKA-16617) Add KRaft info for the `advertised.listeners` doc description
[ https://issues.apache.org/jira/browse/KAFKA-16617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Owen C.H. Leung reassigned KAFKA-16617: --- Assignee: Owen C.H. Leung > Add KRaft info for the `advertised.listeners` doc description > - > > Key: KAFKA-16617 > URL: https://issues.apache.org/jira/browse/KAFKA-16617 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Assignee: Owen C.H. Leung >Priority: Major > Labels: newbie, newbie++ > > Currently, we only write ZK handler in the `advertised.listeners` doc > description: > > Listeners to publish to ZooKeeper for clients to use, if different than the > > listeners config property. > We should also add KRaft handler info in the doc > ref: > https://kafka.apache.org/documentation/#brokerconfigs_advertised.listeners -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16617 [kafka]
Owen-CH-Leung opened a new pull request, #15860: URL: https://github.com/apache/kafka/pull/15860 As per [KAFKA-16617](https://issues.apache.org/jira/browse/KAFKA-16617), this PR revised the documentation for `advertised.listeners` to include kraft handler. ### Committer Checklist (excluded from commit message) - [x] Verify design and implementation - [x] Verify test coverage and CI build status - [x] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16526; Add quorum state v1 [kafka]
jsancio closed pull request #15781: KAFKA-16526; Add quorum state v1 URL: https://github.com/apache/kafka/pull/15781 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16526; Quorum state data version 1 [kafka]
jsancio opened a new pull request, #15859: URL: https://github.com/apache/kafka/pull/15859 DRAFT ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16624) Don't generate useless PartitionChangeRecord on older MV
[ https://issues.apache.org/jira/browse/KAFKA-16624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez resolved KAFKA-16624. - Resolution: Fixed > Don't generate useless PartitionChangeRecord on older MV > > > Key: KAFKA-16624 > URL: https://issues.apache.org/jira/browse/KAFKA-16624 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Minor > > Fix a case where we could generate useless PartitionChangeRecords on metadata > versions older than 3.6-IV0. This could happen in the case where we had an > ISR with only one broker in it, and we were trying to go down to a fully > empty ISR. In this case, PartitionChangeBuilder would block the record to > going down to a fully empty ISR (since that is not valid in these pre-KIP-966 > metadata versions), but it would still emit the record, even though it had no > effect. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio merged PR #15671: URL: https://github.com/apache/kafka/pull/15671 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
chia7712 commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1590078150 ## core/src/main/scala/kafka/server/FetchSession.scala: ## @@ -690,6 +702,10 @@ class FetchSessionCache(private val maxEntries: Int, * 2. B is considered "stale" because it has been inactive for a long time, or * 3. A contains more partitions than B, and B is not recently created. * +* Prior to KAFKA-9401, the session cache was not sharded and we looked at all Review Comment: This docs is great. Could you please update this also? https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/main/scala/kafka/server/KafkaConfig.scala#L182 ## core/src/main/scala/kafka/server/FetchSession.scala: ## @@ -787,9 +803,37 @@ class FetchSessionCache(private val maxEntries: Int, } } } +object FetchSessionCache { + private[server] val metricsGroup = new KafkaMetricsGroup(classOf[FetchSessionCache]) + private val counter = new AtomicLong(0) +} + +class FetchSessionCache(private val cacheShards: Seq[FetchSessionCacheShard]) { + // Set up metrics. + FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => cacheShards.map(_.size).sum) + FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => cacheShards.map(_.totalPartitions).sum) + + def getCacheShard(sessionId: Int): FetchSessionCacheShard = { +val shard = sessionId / cacheShards.head.sessionIdRange Review Comment: It assumes the `cacheShards` is sorted by the `shardNum`, right? If so, could you please add comments for it? ## core/src/main/scala/kafka/server/FetchSession.scala: ## @@ -787,9 +803,37 @@ class FetchSessionCache(private val maxEntries: Int, } } } +object FetchSessionCache { + private[server] val metricsGroup = new KafkaMetricsGroup(classOf[FetchSessionCache]) + private val counter = new AtomicLong(0) +} + +class FetchSessionCache(private val cacheShards: Seq[FetchSessionCacheShard]) { + // Set up metrics. + FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => cacheShards.map(_.size).sum) + FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => cacheShards.map(_.totalPartitions).sum) + + def getCacheShard(sessionId: Int): FetchSessionCacheShard = { +val shard = sessionId / cacheShards.head.sessionIdRange +cacheShards(shard) + } + + // Returns the shard in round-robin + def getNextCacheShard: FetchSessionCacheShard = { +val shardNum = (FetchSessionCache.counter.getAndIncrement() % size).toInt Review Comment: As `int` is enough to this case, maybe we can use `AtomicInteger`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16668) Enable to set tags by `ClusterTest`
[ https://issues.apache.org/jira/browse/KAFKA-16668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16668: -- Assignee: Johnny Hsu (was: Chia-Ping Tsai) > Enable to set tags by `ClusterTest` > > > Key: KAFKA-16668 > URL: https://issues.apache.org/jira/browse/KAFKA-16668 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Johnny Hsu >Priority: Minor > > Currently, the display name can be customized by only `name` > (https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/test/annotation/ClusterTest.java#L42). > However, the "key" is hard-code to "name=xxx". Also, it is impossible to set > more "tags" for display name. > https://github.com/apache/kafka/pull/15766 is a example that we want to add > "xxx=bbb" to display name. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16668) Enable to set tags by `ClusterTest`
[ https://issues.apache.org/jira/browse/KAFKA-16668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843482#comment-17843482 ] Johnny Hsu commented on KAFKA-16668: hi [~chia7712] may I know if you are working on this? if not I am willing to help, thanks! > Enable to set tags by `ClusterTest` > > > Key: KAFKA-16668 > URL: https://issues.apache.org/jira/browse/KAFKA-16668 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > Currently, the display name can be customized by only `name` > (https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/test/annotation/ClusterTest.java#L42). > However, the "key" is hard-code to "name=xxx". Also, it is impossible to set > more "tags" for display name. > https://github.com/apache/kafka/pull/15766 is a example that we want to add > "xxx=bbb" to display name. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16668) Enable to set tags by `ClusterTest`
[ https://issues.apache.org/jira/browse/KAFKA-16668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843481#comment-17843481 ] Chia-Ping Tsai commented on KAFKA-16668: Personally, introducing a new annotation "Tag" is a solution. "Tag[] tags()" > Enable to set tags by `ClusterTest` > > > Key: KAFKA-16668 > URL: https://issues.apache.org/jira/browse/KAFKA-16668 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > Currently, the display name can be customized by only `name` > (https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/test/annotation/ClusterTest.java#L42). > However, the "key" is hard-code to "name=xxx". Also, it is impossible to set > more "tags" for display name. > https://github.com/apache/kafka/pull/15766 is a example that we want to add > "xxx=bbb" to display name. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1590061307 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,381 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterConfig; +import kafka.test.ClusterGenerator; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTemplate; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Objects; -import java.util.Optional; -import java.util.Properties; import java.util.Set; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static kafka.test.annotation.Type.CO_KRAFT; +import static kafka.test.annotation.Type.KRAFT; +import static kafka.test.annotation.Type.ZK; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC; +import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER; +import static org.apache.kafka.common.ConsumerGroupState.EMPTY; +import static org.apache.kafka.common.ConsumerGroupState.STABLE; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; -assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); -} - -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String missingGroup = "missing.group"; -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; -ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); +@ExtendWith(value = ClusterTestExtensions.class) +public class DeleteConsumerGroupsTest { +private final ClusterInstance cluster; +private final Iterable groupProtocols; -String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups); -assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +public DeleteConsumerGroupsTest(ClusterInstance cluster) { +this.cluster = cluster; +this.groupProtocols = cluster.config().serverProperti
[jira] [Created] (KAFKA-16668) Enable to set tags by `ClusterTest`
Chia-Ping Tsai created KAFKA-16668: -- Summary: Enable to set tags by `ClusterTest` Key: KAFKA-16668 URL: https://issues.apache.org/jira/browse/KAFKA-16668 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai Currently, the display name can be customized by only `name` (https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/test/annotation/ClusterTest.java#L42). However, the "key" is hard-code to "name=xxx". Also, it is impossible to set more "tags" for display name. https://github.com/apache/kafka/pull/15766 is a example that we want to add "xxx=bbb" to display name. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16667) KRaftMigrationDriver gets stuck after successive failovers
[ https://issues.apache.org/jira/browse/KAFKA-16667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-16667: - Description: This is a continuation of KAFKA-16171. It turns out that the active KRaftMigrationDriver can get a stale read from ZK after becoming the active controller in ZK (i.e., writing to "/controller"). Because ZooKeeper only offers linearizability on writes to a given ZNode, it is possible that we get a stale read on the "/migration" ZNode after writing to "/controller" (and "/controller_epoch") when becoming active. The history looks like this: # Node B becomes leader in the Raft layer. KRaftLeaderEvents are enqueued on all KRaftMigrationDriver # Node A writes some state to ZK, updates "/migration", and checks "/controller_epoch" in one transaction. This happens before B claims controller leadership in ZK. The "/migration" state is updated from X to Y # Node B claims leadership by updating "/controller" and "/controller_epoch". Leader B reads "/migration" state X # Node A tries to write some state, fails on "/controller_epoch" check op. # Node A processes new leader and becomes inactive This does not violate consistency guarantees made by ZooKeeper. > Write operations in ZooKeeper are {_}linearizable{_}. In other words, each > {{write}} will appear to take effect atomically at some point between when > the client issues the request and receives the corresponding response. and > Read operations in ZooKeeper are _not linearizable_ since they can return > potentially stale data. This is because a {{read}} in ZooKeeper is not a > quorum operation and a server will respond immediately to a client that is > performing a {{{}read{}}}. --- The impact of this stale read is the same as KAFKA-16171. The KRaftMigrationDriver never gets past SYNC_KRAFT_TO_ZK because it has a stale zkVersion for the "/migration" ZNode. The result is brokers never learn about the new controller and cannot update any partition state. The workaround for this bug is to re-elect the controller by shutting down the active KRaft controller. This bug was found during a migration where the KRaft controller was rapidly failing over due to an excess of metadata. was: This is a continuation of KAFKA-16171. It turns out that the active KRaftMigrationDriver can get a stale read from ZK after becoming the active controller in ZK (i.e., writing to "/controller"). Because ZooKeeper only offers linearizability on writes to a given ZNode, it is possible that we get a stale read on the "/migration" ZNode after writing to "/controller" (and "/controller_epoch") when becoming active. The history looks like this: # Node B becomes leader in the Raft layer. KRaftLeaderEvents are enqueued on all KRaftMigrationDriver-s # Node A writes some state to ZK, updates "/migration", and checks "/controller_epoch" in one transaction. This happens before B claims controller leadership in ZK. The "/migration" state is updated from X to Y # Node B claims leadership by updating "/controller" and "/controller_epoch". Leader B reads "/migration" state X # Node A tries to write some state, fails on "/controller_epoch" check op. # Node A processes new leader and becomes inactive This does not violate consistency guarantees made by ZooKeeper. > Write operations in ZooKeeper are {_}linearizable{_}. In other words, each > {{write}} will appear to take effect atomically at some point between when > the client issues the request and receives the corresponding response. and > Read operations in ZooKeeper are _not linearizable_ since they can return > potentially stale data. This is because a {{read}} in ZooKeeper is not a > quorum operation and a server will respond immediately to a client that is > performing a {{{}read{}}}. --- The impact of this stale read is the same as KAFKA-16171. The KRaftMigrationDriver never gets past SYNC_KRAFT_TO_ZK because it has a stale zkVersion for the "/migration" ZNode. The result is brokers never learn about the new controller and cannot update any partition state. The workaround for this bug is to re-elect the controller by shutting down the active KRaft controller. This bug was found during a migration where the KRaft controller was rapidly failing over due to an excess of metadata. > KRaftMigrationDriver gets stuck after successive failovers > -- > > Key: KAFKA-16667 > URL: https://issues.apache.org/jira/browse/KAFKA-16667 > Project: Kafka > Issue Type: Bug > Components: controller, migration >Reporter: David Arthur >Assignee: David Arthur >Priority: Major > > This is a continuation of KAFKA-16171. > It turns out that the active KRaftMigrationDriver can get a stale read from > ZK af
[jira] [Assigned] (KAFKA-16667) KRaftMigrationDriver gets stuck after successive failovers
[ https://issues.apache.org/jira/browse/KAFKA-16667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur reassigned KAFKA-16667: Assignee: David Arthur > KRaftMigrationDriver gets stuck after successive failovers > -- > > Key: KAFKA-16667 > URL: https://issues.apache.org/jira/browse/KAFKA-16667 > Project: Kafka > Issue Type: Bug > Components: controller, migration >Reporter: David Arthur >Assignee: David Arthur >Priority: Major > > This is a continuation of KAFKA-16171. > It turns out that the active KRaftMigrationDriver can get a stale read from > ZK after becoming the active controller in ZK (i.e., writing to > "/controller"). > Because ZooKeeper only offers linearizability on writes to a given ZNode, it > is possible that we get a stale read on the "/migration" ZNode after writing > to "/controller" (and "/controller_epoch") when becoming active. > > The history looks like this: > # Node B becomes leader in the Raft layer. KRaftLeaderEvents are enqueued on > all KRaftMigrationDriver-s > # Node A writes some state to ZK, updates "/migration", and checks > "/controller_epoch" in one transaction. This happens before B claims > controller leadership in ZK. The "/migration" state is updated from X to Y > # Node B claims leadership by updating "/controller" and > "/controller_epoch". Leader B reads "/migration" state X > # Node A tries to write some state, fails on "/controller_epoch" check op. > # Node A processes new leader and becomes inactive > > This does not violate consistency guarantees made by ZooKeeper. > > > Write operations in ZooKeeper are {_}linearizable{_}. In other words, each > > {{write}} will appear to take effect atomically at some point between when > > the client issues the request and receives the corresponding response. > and > > Read operations in ZooKeeper are _not linearizable_ since they can return > > potentially stale data. This is because a {{read}} in ZooKeeper is not a > > quorum operation and a server will respond immediately to a client that is > > performing a {{{}read{}}}. > > --- > > The impact of this stale read is the same as KAFKA-16171. The > KRaftMigrationDriver never gets past SYNC_KRAFT_TO_ZK because it has a stale > zkVersion for the "/migration" ZNode. The result is brokers never learn about > the new controller and cannot update any partition state. > The workaround for this bug is to re-elect the controller by shutting down > the active KRaft controller. > This bug was found during a migration where the KRaft controller was rapidly > failing over due to an excess of metadata. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16667) KRaftMigrationDriver gets stuck after successive failovers
David Arthur created KAFKA-16667: Summary: KRaftMigrationDriver gets stuck after successive failovers Key: KAFKA-16667 URL: https://issues.apache.org/jira/browse/KAFKA-16667 Project: Kafka Issue Type: Bug Components: controller, migration Reporter: David Arthur This is a continuation of KAFKA-16171. It turns out that the active KRaftMigrationDriver can get a stale read from ZK after becoming the active controller in ZK (i.e., writing to "/controller"). Because ZooKeeper only offers linearizability on writes to a given ZNode, it is possible that we get a stale read on the "/migration" ZNode after writing to "/controller" (and "/controller_epoch") when becoming active. The history looks like this: # Node B becomes leader in the Raft layer. KRaftLeaderEvents are enqueued on all KRaftMigrationDriver-s # Node A writes some state to ZK, updates "/migration", and checks "/controller_epoch" in one transaction. This happens before B claims controller leadership in ZK. The "/migration" state is updated from X to Y # Node B claims leadership by updating "/controller" and "/controller_epoch". Leader B reads "/migration" state X # Node A tries to write some state, fails on "/controller_epoch" check op. # Node A processes new leader and becomes inactive This does not violate consistency guarantees made by ZooKeeper. > Write operations in ZooKeeper are {_}linearizable{_}. In other words, each > {{write}} will appear to take effect atomically at some point between when > the client issues the request and receives the corresponding response. and > Read operations in ZooKeeper are _not linearizable_ since they can return > potentially stale data. This is because a {{read}} in ZooKeeper is not a > quorum operation and a server will respond immediately to a client that is > performing a {{{}read{}}}. --- The impact of this stale read is the same as KAFKA-16171. The KRaftMigrationDriver never gets past SYNC_KRAFT_TO_ZK because it has a stale zkVersion for the "/migration" ZNode. The result is brokers never learn about the new controller and cannot update any partition state. The workaround for this bug is to re-elect the controller by shutting down the active KRaft controller. This bug was found during a migration where the KRaft controller was rapidly failing over due to an excess of metadata. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on PR #15766: URL: https://github.com/apache/kafka/pull/15766#issuecomment-2094325675 @frankvicky Please avoid "force push". It can eliminate the conversions ... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
frankvicky commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1589996273 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,381 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterConfig; +import kafka.test.ClusterGenerator; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTemplate; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Objects; -import java.util.Optional; -import java.util.Properties; import java.util.Set; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static kafka.test.annotation.Type.CO_KRAFT; +import static kafka.test.annotation.Type.KRAFT; +import static kafka.test.annotation.Type.ZK; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC; +import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER; +import static org.apache.kafka.common.ConsumerGroupState.EMPTY; +import static org.apache.kafka.common.ConsumerGroupState.STABLE; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; -assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); -} - -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String missingGroup = "missing.group"; -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; -ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); +@ExtendWith(value = ClusterTestExtensions.class) +public class DeleteConsumerGroupsTest { +private final ClusterInstance cluster; +private final Iterable groupProtocols; -String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups); -assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +public DeleteConsumerGroupsTest(ClusterInstance cluster) { +this.cluster = cluster; +this.groupProtocols = cluster.config().serverProper
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
frankvicky commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1589995300 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,381 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterConfig; +import kafka.test.ClusterGenerator; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTemplate; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Objects; -import java.util.Optional; -import java.util.Properties; import java.util.Set; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static kafka.test.annotation.Type.CO_KRAFT; +import static kafka.test.annotation.Type.KRAFT; +import static kafka.test.annotation.Type.ZK; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC; +import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER; +import static org.apache.kafka.common.ConsumerGroupState.EMPTY; +import static org.apache.kafka.common.ConsumerGroupState.STABLE; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; -assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); -} - -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String missingGroup = "missing.group"; -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; -ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); +@ExtendWith(value = ClusterTestExtensions.class) +public class DeleteConsumerGroupsTest { +private final ClusterInstance cluster; +private final Iterable groupProtocols; -String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups); -assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +public DeleteConsumerGroupsTest(ClusterInstance cluster) { +this.cluster = cluster; +this.groupProtocols = cluster.config().serverProper
Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]
AndrewJSchofield commented on code in PR #15844: URL: https://github.com/apache/kafka/pull/15844#discussion_r1589983081 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -1145,14 +1141,42 @@ private CompletableFuture> addOffsetFetch inflightOffsetFetches.stream().filter(r -> r.sameRequest(request)).findAny(); if (dupe.isPresent() || inflight.isPresent()) { -log.info("Duplicated OffsetFetchRequest: " + request.requestedPartitions); -dupe.orElseGet(inflight::get).chainFuture(request.future); +log.info("Duplicate OffsetFetchRequest found for partitions: {}", request.requestedPartitions); +OffsetFetchRequestState originalRequest = dupe.orElseGet(inflight::get); +originalRequest.chainFuture(request.future); } else { this.unsentOffsetFetches.add(request); } return request.future; } +/** + * Remove the {@link OffsetFetchRequestState request} from the inflight requests queue iff + * both of the following are true: + * + * + * The request completed with a null {@link Throwable error} + * The request is not {@link OffsetFetchRequestState#isExpired expired} + * + * + * + * + * In some cases, even though an offset fetch request may complete without an error, technically + * the request took longer than the user's provided timeout. In that case, the application thread will + * still receive a timeout error, and will shortly try to fetch these offsets again. Keeping the result + * of the current attempt will enable the next attempt to use that result and return Review Comment: I think the emphasis might have got a bit mismatched here. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -1145,14 +1141,42 @@ private CompletableFuture> addOffsetFetch inflightOffsetFetches.stream().filter(r -> r.sameRequest(request)).findAny(); if (dupe.isPresent() || inflight.isPresent()) { -log.info("Duplicated OffsetFetchRequest: " + request.requestedPartitions); -dupe.orElseGet(inflight::get).chainFuture(request.future); +log.info("Duplicate OffsetFetchRequest found for partitions: {}", request.requestedPartitions); +OffsetFetchRequestState originalRequest = dupe.orElseGet(inflight::get); +originalRequest.chainFuture(request.future); } else { this.unsentOffsetFetches.add(request); } return request.future; } +/** + * Remove the {@link OffsetFetchRequestState request} from the inflight requests queue iff + * both of the following are true: + * + * + * The request completed with a null {@link Throwable error} Review Comment: Isn't this more succinctly described as "The request completed without an error"? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14579) Move DumpLogSegments to tools
[ https://issues.apache.org/jira/browse/KAFKA-14579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843422#comment-17843422 ] Chia-Ping Tsai commented on KAFKA-14579: {quote} currently only DumpLogSegments is using Decoder, if it's removed then Decoder should be safe to be deprecated since no one will be using that anymore {quote} According to Kafka compatibility rule, command line tools belong to public interfaces. Hence, we need a replacement for `kafka.serializer.Decoder` and then deprecate `kafka.serializer.Decoder`. Also, the replacement should be written by Java and put into tools-api module. > Move DumpLogSegments to tools > - > > Key: KAFKA-14579 > URL: https://issues.apache.org/jira/browse/KAFKA-14579 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Alexandre Dupriez >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14579) Move DumpLogSegments to tools
[ https://issues.apache.org/jira/browse/KAFKA-14579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843421#comment-17843421 ] Johnny Hsu edited comment on KAFKA-14579 at 5/4/24 10:36 AM: - currently only DumpLogSegments is using Decoder, if it's removed then Decoder should be safe to be deprecated since no one will be using that anymore. I am willing to work on the KIP for this was (Author: JIRAUSER304478): currently only DumpLogSegments is using Decoder, if it's removed then Decoder should be safe to be deprecated since no one will be using that anymore. I am willing to work on the KIP for this :) > Move DumpLogSegments to tools > - > > Key: KAFKA-14579 > URL: https://issues.apache.org/jira/browse/KAFKA-14579 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Alexandre Dupriez >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14579) Move DumpLogSegments to tools
[ https://issues.apache.org/jira/browse/KAFKA-14579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843421#comment-17843421 ] Johnny Hsu edited comment on KAFKA-14579 at 5/4/24 10:36 AM: - currently only DumpLogSegments is using Decoder, if it's removed then Decoder should be safe to be deprecated since no one will be using that anymore. I am willing to work on the KIP for this :) was (Author: JIRAUSER304478): currently only DumpLogSegments is using Decoder, if it's removed then Decoder should be safe to be deprecated since no one will be using that anymore > Move DumpLogSegments to tools > - > > Key: KAFKA-14579 > URL: https://issues.apache.org/jira/browse/KAFKA-14579 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Alexandre Dupriez >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
gaurav-narula commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1589956639 ## core/src/main/scala/kafka/server/BrokerServer.scala: ## @@ -389,9 +389,17 @@ class BrokerServer( authorizer = config.createNewAuthorizer() authorizer.foreach(_.configure(config.originals)) - val fetchManager = new FetchManager(Time.SYSTEM, -new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots, - KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)) + // The FetchSessionCache is divided into config.numIoThreads shards, each responsible + // for Math.max(1, shardNum * sessionIdRange) <= sessionId < (shardNum + 1) * sessionIdRange + val sessionIdRange = Int.MaxValue / NumFetchSessionCacheShards + val fetchSessionCacheShards = (0 until NumFetchSessionCacheShards) +.map(shardNum => new FetchSessionCacheShard( + config.maxIncrementalFetchSessionCacheSlots / NumFetchSessionCacheShards, Review Comment: That's a great point and it's quite subtle. I reckon this may happen because the cacheShards are picked randomly and it can be avoided by picking shards in round-robin. I'll make this change along with addressing the other comments 👍 Some subtle differences cannot be avoided, particularly around eviction. The [KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability#KIP227:IntroduceIncrementalFetchRequeststoIncreasePartitionScalability-FetchSessionCaching) considers all existing sessions when considering a session for eviction while this change would consider only existing sessions **within** a shard for eviction. I'll update the documentation to call out the difference. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14579) Move DumpLogSegments to tools
[ https://issues.apache.org/jira/browse/KAFKA-14579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843421#comment-17843421 ] Johnny Hsu commented on KAFKA-14579: currently only DumpLogSegments is using Decoder, if it's removed then Decoder should be safe to be deprecated since no one will be using that anymore > Move DumpLogSegments to tools > - > > Key: KAFKA-14579 > URL: https://issues.apache.org/jira/browse/KAFKA-14579 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Alexandre Dupriez >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
AyoubOm commented on code in PR #15790: URL: https://github.com/apache/kafka/pull/15790#discussion_r1589953917 ## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java: ## @@ -173,21 +172,15 @@ protected Serde prepareValueSerdeForStore(final Serde valueSerde, final Se protected void initStoreSerde(final ProcessorContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); -serdes = new StateSerdes<>( -changelogTopic, -prepareKeySerde(keySerde, new SerdeGetter(context)), -prepareValueSerdeForStore(valueSerde, new SerdeGetter(context)) -); +serdes = StoreSerdeInitializer.prepareStoreSerde( +context, storeName, changelogTopic, keySerde, valueSerde, this::prepareValueSerdeForStore); } protected void initStoreSerde(final StateStoreContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); -serdes = new StateSerdes<>( -changelogTopic, -prepareKeySerde(keySerde, new SerdeGetter(context)), -prepareValueSerdeForStore(valueSerde, new SerdeGetter(context)) -); +serdes = StoreSerdeInitializer.prepareStoreSerde( +context, storeName, changelogTopic, keySerde, valueSerde, this::prepareValueSerdeForStore); Review Comment: I added a parameter function prepareValueSerde to be able to use the correct function in children TimestampedStore classes. These don't directly use `WrappingNullableUtils.prepareValueSerde` by overriding the behavior of prepareValueSerdeForStore -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: remove redundant check in KafkaClusterTestKit [kafka]
johnnychhsu opened a new pull request, #15858: URL: https://github.com/apache/kafka/pull/15858 ## Context This check is always false, and in `ControllerServer` it cleanup itself if exception happens. Thus, we don't need this check and cleanup here. ## Solution Remove this check ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
AyoubOm commented on code in PR #15790: URL: https://github.com/apache/kafka/pull/15790#discussion_r1589952122 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ## @@ -58,8 +60,14 @@ public void addChild(final ProcessorNode child) { public void init(final InternalProcessorContext context) { super.init(context); this.context = context; -keySerializer = prepareKeySerializer(keySerializer, context, this.name()); -valSerializer = prepareValueSerializer(valSerializer, context, this.name()); +try { +keySerializer = prepareKeySerializer(keySerializer, context, this.name()); +valSerializer = prepareValueSerializer(valSerializer, context, this.name()); +} catch (final ConfigException e) { +throw new ConfigException(String.format("Failed to initialize serdes for sink node %s", name()), e); Review Comment: Good idea, I moved the exception handling in a separate class to share the code between the store classes, with corresponding unit tests -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-12317: fix documentation [kafka]
florin-akermann commented on PR #15689: URL: https://github.com/apache/kafka/pull/15689#issuecomment-2094077287 Thanks @AyoubOm & @mjsax > I agree with @AyoubOm that both statements you intend to remove are still correct? Indeed, yes. > It seems https://github.com/apache/kafka/pull/14107 remove it incorrectly for inner join instead of left join? Removed from left join and added old statement again to inner join. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-16174) Flaky test: testDescribeQuorumStatusSuccessful – org.apache.kafka.tools.MetadataQuorumCommandTest
[ https://issues.apache.org/jira/browse/KAFKA-16174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843403#comment-17843403 ] Johnny Hsu edited comment on KAFKA-16174 at 5/4/24 8:21 AM: the exception is from [https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/main/scala/kafka/server/BrokerServer.scala#L474] when the cluster starts, [https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java#L426] tries to init broker, but it failed to get the response from controller was (Author: JIRAUSER304478): the exception is from [https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/main/scala/kafka/server/BrokerServer.scala#L474] when the cluster starts, [https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java#L426] tries to init broker, but it failed. > Flaky test: testDescribeQuorumStatusSuccessful – > org.apache.kafka.tools.MetadataQuorumCommandTest > - > > Key: KAFKA-16174 > URL: https://issues.apache.org/jira/browse/KAFKA-16174 > Project: Kafka > Issue Type: Test >Reporter: Apoorv Mittal >Assignee: Johnny Hsu >Priority: Major > Labels: flaky-test > > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15190/3/tests/] > > {code:java} > Errorjava.util.concurrent.ExecutionException: java.lang.RuntimeException: > Received a fatal error while waiting for the controller to acknowledge that > we are caught upStacktracejava.util.concurrent.ExecutionException: > java.lang.RuntimeException: Received a fatal error while waiting for the > controller to acknowledge that we are caught up at > java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191) at > kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:421) > at > kafka.test.junit.RaftClusterInvocationContext.lambda$getAdditionalExtensions$5(RaftClusterInvocationContext.java:116) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeTestExecutionCallbacks$5(TestMethodTestDescriptor.java:192) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeMethodsOrCallbacksUntilExceptionOccurs$6(TestMethodTestDescriptor.java:203) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeMethodsOrCallbacksUntilExceptionOccurs(TestMethodTestDescriptor.java:203) >at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeTestExecutionCallbacks(TestMethodTestDescriptor.java:191) >at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69) >at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) >at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) >at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) >at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) > at > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35) >at > org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:226) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:204) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16174) Flaky test: testDescribeQuorumStatusSuccessful – org.apache.kafka.tools.MetadataQuorumCommandTest
[ https://issues.apache.org/jira/browse/KAFKA-16174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843403#comment-17843403 ] Johnny Hsu commented on KAFKA-16174: the exception is from [https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/main/scala/kafka/server/BrokerServer.scala#L474] when the cluster starts, [https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java#L426] tries to init broker, but it failed. > Flaky test: testDescribeQuorumStatusSuccessful – > org.apache.kafka.tools.MetadataQuorumCommandTest > - > > Key: KAFKA-16174 > URL: https://issues.apache.org/jira/browse/KAFKA-16174 > Project: Kafka > Issue Type: Test >Reporter: Apoorv Mittal >Assignee: Johnny Hsu >Priority: Major > Labels: flaky-test > > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15190/3/tests/] > > {code:java} > Errorjava.util.concurrent.ExecutionException: java.lang.RuntimeException: > Received a fatal error while waiting for the controller to acknowledge that > we are caught upStacktracejava.util.concurrent.ExecutionException: > java.lang.RuntimeException: Received a fatal error while waiting for the > controller to acknowledge that we are caught up at > java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191) at > kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:421) > at > kafka.test.junit.RaftClusterInvocationContext.lambda$getAdditionalExtensions$5(RaftClusterInvocationContext.java:116) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeTestExecutionCallbacks$5(TestMethodTestDescriptor.java:192) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeMethodsOrCallbacksUntilExceptionOccurs$6(TestMethodTestDescriptor.java:203) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeMethodsOrCallbacksUntilExceptionOccurs(TestMethodTestDescriptor.java:203) >at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeTestExecutionCallbacks(TestMethodTestDescriptor.java:191) >at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69) >at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) >at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) >at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) >at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) > at > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35) >at > org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:226) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:204) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]
showuon commented on code in PR #15817: URL: https://github.com/apache/kafka/pull/15817#discussion_r1589925627 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1245,19 +1260,27 @@ public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata return false; } -// Segment's first epoch's offset should be more than or equal to the respective leader epoch's offset. -if (epoch == segmentFirstEpoch && offset < leaderEpochs.get(epoch)) { -LOGGER.debug("Segment {} first epoch {} offset is less than leader epoch offset {}.", -segmentMetadata.remoteLogSegmentId(), epoch, leaderEpochs.get(epoch)); +// Two cases: +// case-1: When the segment-first-epoch equals to the first-epoch in the leader-epoch-lineage, then the +// offset value can lie anywhere between 0 to (next-epoch-start-offset - 1) is valid. +// case-2: When the segment-first-epoch is not equal to the first-epoch in the leader-epoch-lineage, then +// the offset value should be between (current-epoch-start-offset) to (next-epoch-start-offset - 1). +if (epoch == segmentFirstEpoch && leaderEpochs.lowerKey(epoch) != null && offset < leaderEpochs.get(epoch)) { +LOGGER.debug("Segment {} first-valid epoch {} offset is less than leader epoch offset {}." + Review Comment: nit: is less than "first" leader epoch offset... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]
florin-akermann commented on code in PR #14107: URL: https://github.com/apache/kafka/pull/14107#discussion_r1589921037 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java: ## @@ -109,108 +111,102 @@ public void init(final ProcessorContext> context) { @Override public void process(final Record> record) { +// clear cashed hash from previous record +recordHash = null; // drop out-of-order records from versioned tables (cf. KIP-914) if (useVersionedSemantics && !record.value().isLatest) { LOG.info("Skipping out-of-order record from versioned table while performing table-table join."); droppedRecordsSensor.record(); return; } +if (leftJoin) { +leftJoinInstructions(record); +} else { +defaultJoinInstructions(record); +} +} -final long[] currentHash = record.value().newValue == null ? -null : -Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue)); - -final int partition = context().recordMetadata().get().partition(); +private void leftJoinInstructions(final Record> record) { if (record.value().oldValue != null) { final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue); +final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue); +if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) { +forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE); +} +forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE); Review Comment: @mjsax omg, thanks for the flag! Looks like @AyoubOm is addressing it in https://issues.apache.org/jira/browse/KAFKA-16394 already? Else i'll adress it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Window not honored? Why is X record not dropped? [kafka]
florin-akermann closed pull request #15314: Window not honored? Why is X record not dropped? URL: https://github.com/apache/kafka/pull/15314 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]
florin-akermann commented on code in PR #14107: URL: https://github.com/apache/kafka/pull/14107#discussion_r1589921037 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java: ## @@ -109,108 +111,102 @@ public void init(final ProcessorContext> context) { @Override public void process(final Record> record) { +// clear cashed hash from previous record +recordHash = null; // drop out-of-order records from versioned tables (cf. KIP-914) if (useVersionedSemantics && !record.value().isLatest) { LOG.info("Skipping out-of-order record from versioned table while performing table-table join."); droppedRecordsSensor.record(); return; } +if (leftJoin) { +leftJoinInstructions(record); +} else { +defaultJoinInstructions(record); +} +} -final long[] currentHash = record.value().newValue == null ? -null : -Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue)); - -final int partition = context().recordMetadata().get().partition(); +private void leftJoinInstructions(final Record> record) { if (record.value().oldValue != null) { final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue); +final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue); +if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) { +forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE); +} +forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE); Review Comment: @mjsax omg, thanks for the flag! Looks like @AyoubOm is addressing it in https://issues.apache.org/jira/browse/KAFKA-16394 already? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16659: KafkaConsumer#position() does not respect wakup when group protocol is CONSUMER [kafka]
chia7712 commented on code in PR #15853: URL: https://github.com/apache/kafka/pull/15853#discussion_r1589913151 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1703,7 +1704,9 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer timer) { new FetchCommittedOffsetsEvent( initializingPartitions, timer); +wakeupTrigger.setActiveTask(event.future()); final Map offsets = applicationEventHandler.addAndGet(event, timer); +wakeupTrigger.clearTask(); Review Comment: Please use try-finally ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -872,4 +873,38 @@ class PlaintextConsumerTest extends BaseConsumerTest { // local metadata. However, it should give up after the user-supplied timeout has past. assertThrows(classOf[TimeoutException], () => consumer.position(topicPartition, Duration.ofSeconds(3))) } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + @Timeout(15) + def testPositionRespectsWakeup(quorum: String, groupProtocol: String): Unit = { +val topicPartition = new TopicPartition(topic, 15) +val consumer = createConsumer() +consumer.assign(List(topicPartition).asJava) + +CompletableFuture.runAsync { () => + TimeUnit.SECONDS.sleep(1) + consumer.wakeup() +} + +assertThrows(classOf[WakeupException], () => consumer.position(topicPartition, Duration.ofSeconds(3))) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + @Timeout(15) + def testPositionWithErrorConnectionRespectsWakeup(quorum: String, groupProtocol: String): Unit = { +val topicPartition = new TopicPartition(topic, 15) +val properties = new Properties() +properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345") Review Comment: Please add comments to say this connection in un-connectable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org