Re: [PR] MINOR: Replaced Utils.join() with JDK API. [kafka]
chia7712 commented on code in PR #15823: URL: https://github.com/apache/kafka/pull/15823#discussion_r1585914471 ## clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java: ## @@ -25,21 +25,21 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData; import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; +import java.util.LinkedHashMap; Review Comment: Could you reduce the changes to the imports? Also, the new order looks odd to me, as it is not lexicographical. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
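[Editor's note: for context, the change under review swaps Kafka's internal join helper for the standard JDK API. A minimal sketch of that kind of replacement, assuming the usual `Utils.join(collection, separator)` shape; the exact call sites in the PR may differ:]

```java
import java.util.Arrays;
import java.util.List;

public class JoinExample {
    public static void main(String[] args) {
        List<String> topics = Arrays.asList("alpha", "beta", "gamma");

        // Before: Kafka's internal utility
        // String joined = Utils.join(topics, ", ");

        // After: the equivalent JDK API, no Kafka dependency needed
        String joined = String.join(", ", topics);
        System.out.println(joined); // alpha, beta, gamma
    }
}
```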
Re: [PR] KAFKA-16649: Remove lock from DynamicBrokerConfig.removeReconfigurable [kafka]
chia7712 commented on code in PR #15838: URL: https://github.com/apache/kafka/pull/15838#discussion_r1585911858 ## core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala: ## @@ -1541,6 +1541,36 @@ class KRaftClusterTest { cluster.close() } } + + @Test + def testReduceNumNetworkThreads(): Unit = { +val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). +setNumBrokerNodes(1). +setNumControllerNodes(1).build()). + setConfigProp(KafkaConfig.NumNetworkThreadsProp, "4"). + build() +try { + cluster.format() + cluster.startup() + cluster.waitForReadyBrokers() + val admin = Admin.create(cluster.clientProperties()) + try { +admin.incrementalAlterConfigs( + Collections.singletonMap(new ConfigResource(Type.BROKER, ""), +Collections.singletonList(new AlterConfigOp( + new ConfigEntry(KafkaConfig.NumNetworkThreadsProp, "2"), OpType.SET.all().get() +val newTopic = Collections.singletonList(new NewTopic("test-topic", 1, 1.toShort)) +val createTopicResult = admin.createTopics(newTopic) +createTopicResult.all().get() Review Comment: It seems that reducing the network threads causes a disconnection, which triggers a retry of the forwarded request. Hence, that can make this test unstable. Maybe we can increase the network threads instead? ## core/src/main/scala/kafka/server/DynamicBrokerConfig.scala: ## @@ -303,17 +303,17 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging addBrokerReconfigurable(controller.socketServer) } - def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) { + def addReconfigurable(reconfigurable: Reconfigurable): Unit = { verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala) Review Comment: It seems `verifyReconfigurableConfigs` does not need the lock either, since it just checks the input strings. ## core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala: ## @@ -1541,6 +1541,36 @@ class KRaftClusterTest { cluster.close() } } + + @Test + def testReduceNumNetworkThreads(): Unit = { Review Comment: Pardon me, is this test case related to the deadlock? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
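[Editor's note: for readers following along, the reviewer's suggestion would flip the test to grow the thread pool rather than shrink it. A minimal Java sketch of that admin call, assuming a cluster started with `num.network.threads=4`; illustrative only, the actual test is the Scala code quoted above:]

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;

public class IncreaseNetworkThreads {
    public static void increase(Admin admin) throws Exception {
        // An empty resource name targets the cluster-wide (default) broker config.
        ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "");
        AlterConfigOp op = new AlterConfigOp(
            new ConfigEntry("num.network.threads", "8"), // grow 4 -> 8 instead of shrinking
            AlterConfigOp.OpType.SET);
        admin.incrementalAlterConfigs(
                Collections.singletonMap(broker, Collections.singletonList(op)))
            .all().get(); // block until the reconfiguration is applied
    }
}
```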
Re: [PR] KAFKA-16394: Fix null propagation in foreign key join result [kafka]
mjsax commented on PR #15607: URL: https://github.com/apache/kafka/pull/15607#issuecomment-2088000194 Can we update the existing tests to use a `List` instead of a `Map` (as I mentioned on the duplicate ticket https://issues.apache.org/jira/browse/KAFKA-16644), which should expose the bug? I need to think about the fix a little more (and refresh my memory on the different "subscription types"). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete
[ https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-16644. - Resolution: Duplicate > FK join emits duplicate tombstone on left-side delete > - > > Key: KAFKA-16644 > URL: https://issues.apache.org/jira/browse/KAFKA-16644 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Matthias J. Sax >Priority: Major > > We introduced a regression bug in 3.7.0 release via KAFKA-14748. When a > left-hand side record is deleted, the join now emits two tombstone records > instead of one. > The problem was not detected via unit test, because the tests use a `Map` > instead of a `List` when verifying the result topic records > ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).] > We should update all test cases to use `List` when reading from the output > topic, and of course fix the introduced bug: The > `SubscriptionSendProcessorSupplier` is sending two subscription records > instead of just a single one: > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
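[Editor's note: to make the testing gap concrete, here is a minimal, self-contained sketch (not the actual integration test) of why verifying with a `Map` hides the duplicate tombstone while a `List` exposes it:]

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

public class DuplicateTombstoneDemo {
    public static void main(String[] args) {
        // Simulated output topic: the bug emits the tombstone (key "A", null) twice.
        List<Entry<String, String>> output = new ArrayList<>();
        output.add(new SimpleEntry<>("A", null));
        output.add(new SimpleEntry<>("A", null)); // duplicate tombstone

        // Verifying with a Map collapses both records into one entry,
        // so the duplicate goes unnoticed and the test passes.
        Map<String, String> asMap = new HashMap<>();
        for (Entry<String, String> record : output) {
            asMap.put(record.getKey(), record.getValue());
        }
        System.out.println(asMap.size());  // 1 -- bug hidden

        // Verifying with a List preserves every record, so an assertion
        // on the record count (or sequence) catches the regression.
        System.out.println(output.size()); // 2 -- bug exposed
    }
}
```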
Re: [PR] KIP-759 Mark as Partitioned [kafka]
mjsax commented on code in PR #15740: URL: https://github.com/apache/kafka/pull/15740#discussion_r1585886754 ## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ## @@ -685,6 +685,41 @@ KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper, final Named named); +/** + * Marking the {@code KStream} as partitioned signals the stream is partitioned as intended, + * and does not require further repartitioning by downstream key changing operations. Review Comment: ```suggestion * and does not require further repartitioning by downstream key dependent operations. ``` ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java: ## @@ -222,21 +226,21 @@ public KStream selectKey(final KeyValueMapper selectKeyProcessorNode = internalSelectKey(mapper, new NamedInternal(named)); -selectKeyProcessorNode.keyChangingOperation(true); +selectKeyProcessorNode.keyChangingOperation(repartitionRequired); builder.addGraphNode(graphNode, selectKeyProcessorNode); // key serde cannot be preserved return new KStreamImpl<>( -selectKeyProcessorNode.nodeName(), -null, -valueSerde, -subTopologySourceNodes, -true, -selectKeyProcessorNode, -builder); +selectKeyProcessorNode.nodeName(), Review Comment: nit: avoid unnecessary reformatting (i.e., indentation in this case) -- I assume you have some "auto format" feature enabled in your IDE. I would recommend disabling it, or adjusting the setting, to avoid noise like this. ## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ## @@ -685,6 +685,41 @@ KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper, final Named named); +/** + * Marking the {@code KStream} as partitioned signals the stream is partitioned as intended, + * and does not require further repartitioning by downstream key changing operations. + * + * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with interactive query(IQ) or {@link KStream#join}. + * For reasons that when repartitions happen, records are physically shuffled by a composite key defined in the stateful operation. + * However, if the repartitions were cancelled, records stayed in their original partition by its original key. IQ or joins + * assumes and uses the composite key instead of the original key. + * + * This method will overwrite a default behavior as described below. + * By default, Kafka Streams always automatically repartition the records to prepare for a stateful operation, + * however, it is not always required when input stream is partitioned as intended. As an example, + * if an input stream is partitioned by a String key1, calling the below function will trigger a repartition: + * + * {@code + * KStream inputStream = builder.stream("topic"); + * stream + * .selectKey( ... => (key1, metric)) + * .groupByKey() + * .aggregate() + * } + * + * You can then overwrite the default behavior by calling this method: + * {@code + * stream + * .selectKey( ... => (key1, metric)) + * .markAsPartitioned() + * .groupByKey() + * .aggregate() + * } + * Review Comment: Do we need this tag? ## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ## @@ -685,6 +685,41 @@ KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper, final Named named); +/** + * Marking the {@code KStream} as partitioned signals the stream is partitioned as intended, + * and does not require further repartitioning by downstream key changing operations.
+ * + * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with interactive query(IQ) or {@link KStream#join}. + * For reasons that when repartitions happen, records are physically shuffled by a composite key defined in the stateful operation. + * However, if the repartitions were cancelled, records stayed in their original partition by its original key. IQ or joins + * assumes and uses the composite key instead of the original key. Review Comment: Can you refresh my memory about joins? I cannot remember the details. We should add a section to the `docs/streams/developer-guide/dsl-api.html` and explain the "do" and "donts" of this operation. ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java: ## @@ -222,21 +226,21 @@ public KStream selectKey(final
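[Editor's note: to illustrate the caveat discussed in this thread, a sketch against the KIP-759 API under review here, so names may still change. After `markAsPartitioned()` the records remain in the partitions chosen by the original key, so the operator is only safe when the new key preserves partitioning; the topic and key shapes below are hypothetical:]

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class MarkAsPartitionedSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Hypothetical stream of order events, keyed by userId.
        KStream<String, String> orders = builder.stream("orders");

        // The new key userId#order is "finer" than userId: all records with
        // the same composite key already live in the same partition, so the
        // shuffle can safely be skipped.
        KTable<String, Long> counts = orders
            .selectKey((userId, order) -> userId + "#" + order)
            .markAsPartitioned()   // KIP-759: suppress the automatic repartition
            .groupByKey()
            .count();

        // Caution (the point raised above): joins and IQ assume records are
        // physically partitioned by the *current* key. Since no shuffle
        // happened, using the result in a join or an IQ lookup keyed by the
        // composite key may consult the wrong partition.
    }
}
```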
[jira] [Assigned] (KAFKA-16650) add integration test for Admin#abortTransaction
[ https://issues.apache.org/jira/browse/KAFKA-16650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16650: -- Assignee: Kuan Po Tseng (was: Chia-Ping Tsai) > add integration test for Admin#abortTransaction > --- > > Key: KAFKA-16650 > URL: https://issues.apache.org/jira/browse/KAFKA-16650 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Kuan Po Tseng >Priority: Minor > > It seems there are only a few unit tests. We should add integration tests > covering ZK, KRaft, and the new group coordinator. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16650) add integration test for Admin#abortTransaction
[ https://issues.apache.org/jira/browse/KAFKA-16650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842542#comment-17842542 ] Kuan Po Tseng commented on KAFKA-16650: --- May I take over this issue? :) > add integration test for Admin#abortTransaction > --- > > Key: KAFKA-16650 > URL: https://issues.apache.org/jira/browse/KAFKA-16650 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > It seems there are only a few unit tests. We should add integration tests > covering ZK, KRaft, and the new group coordinator. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]
AndrewJSchofield commented on code in PR #15803: URL: https://github.com/apache/kafka/pull/15803#discussion_r1585852808 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -1789,6 +1791,33 @@ public void testProcessBackgroundEventsTimesOut() throws Exception { } } +/** + * Tests that calling {@link Thread#interrupt()} before {@link KafkaConsumer#poll(Duration)} + * causes {@link InterruptException} to be thrown. + */ +@Test +public void testPollThrowsInterruptExceptionIfInterrupted() { +consumer = newConsumer(); +final String topicName = "foo"; +final int partition = 3; +final TopicPartition tp = new TopicPartition(topicName, partition); + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); +Map offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1))); +completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +consumer.assign(singleton(tp)); + +// interrupt the thread and call poll +try { +Thread.currentThread().interrupt(); +assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO)); +} finally { +// clear interrupted state again since this thread may be reused by JUnit Review Comment: By calling `Thread.interrupted()`, the code is ensuring that the test does not exit with the thread still in an interrupted state. I have updated the comment accordingly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
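[Editor's note: the distinction the comment relies on, in a minimal runnable sketch: `Thread.interrupted()` reads *and clears* the interrupt flag, while `isInterrupted()` only reads it, which is why the test's `finally` block uses the former:]

```java
public class InterruptFlagDemo {
    public static void main(String[] args) {
        Thread.currentThread().interrupt();

        // isInterrupted() observes the flag without changing it.
        System.out.println(Thread.currentThread().isInterrupted()); // true
        System.out.println(Thread.currentThread().isInterrupted()); // still true

        // Thread.interrupted() returns the flag AND clears it, leaving the
        // (possibly reused) thread in a clean state for the next test.
        System.out.println(Thread.interrupted()); // true
        System.out.println(Thread.interrupted()); // false -- flag was cleared
    }
}
```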
[jira] [Commented] (KAFKA-16223) Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest
[ https://issues.apache.org/jira/browse/KAFKA-16223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842538#comment-17842538 ] Chia-Ping Tsai commented on KAFKA-16223: [~cmukka20] Could you take over the remaining tasks? KafkaConfigBackingStoreTest is the last one which uses PowerMock. It is time to close out this migration party :) > Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest > --- > > Key: KAFKA-16223 > URL: https://issues.apache.org/jira/browse/KAFKA-16223 > Project: Kafka > Issue Type: Sub-task > Components: connect >Reporter: Hector Geraldino >Assignee: Hector Geraldino >Priority: Minor > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16027) Refactor MetadataTest#testUpdatePartitionLeadership
[ https://issues.apache.org/jira/browse/KAFKA-16027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842537#comment-17842537 ] Johnny Hsu commented on KAFKA-16027: Hey [~alexanderaghili], may I know if we have any updates on this? I am happy to help if you are busy with something else :) > Refactor MetadataTest#testUpdatePartitionLeadership > --- > > Key: KAFKA-16027 > URL: https://issues.apache.org/jira/browse/KAFKA-16027 > Project: Kafka > Issue Type: Improvement >Reporter: Philip Nee >Assignee: Alexander Aghili >Priority: Minor > Labels: newbie > > MetadataTest#testUpdatePartitionLeadership is extremely long. I think it is > pretty close to the 160-line method limit - I tried to modify it but it > would hit the limit when I tried to break things into separate lines. > The test also contains two tests, so it is best to split it into two separate > tests. > We should also move this to ConsumerMetadata.java -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]
FrankYang0529 commented on PR #15745: URL: https://github.com/apache/kafka/pull/15745#issuecomment-2087904101 Hi @chia7712, thanks for the review. I addressed all the comments and added some test cases for ClusterConfig. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1580155319 ## clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java: ## @@ -44,11 +44,15 @@ public enum ControlRecordType { ABORT((short) 0), COMMIT((short) 1), -// Raft quorum related control messages. +// KRaft quorum related control messages LEADER_CHANGE((short) 2), SNAPSHOT_HEADER((short) 3), SNAPSHOT_FOOTER((short) 4), +// KRaft membership changes messages +KRAFT_VERSION((short) 5), +VOTERS((short) 6), Review Comment: Sounds good. Fixed for KRAFT_VOTERS. I'll fix the rest in another PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16637) KIP-848 does not work well
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842530#comment-17842530 ] Kirk True commented on KAFKA-16637: --- [~chickenchickenlove], thanks for filing this. There are two existing improvements (KAFKA-15974 and KAFKA-16200) that fix timing issues in the new consumer. However, even when testing your case on a temporary branch that includes fixes for both of those issues, the problem still showed up. This issue is related to an optimization for the offset fetch logic. When a user calls {{Consumer.poll()}}, among other things, the consumer performs a network request to fetch any previously-committed offsets so it can determine from where to start fetching new records. When the user passes in a timeout of zero, it's almost always the case that the offset fetch network request will not be performed within 0 milliseconds. However, the consumer still sends out the request and handles the response when it is received, usually a few milliseconds later. In this first attempt, the lookup fails and the {{poll()}} loops back around. Given that this timeout is the common case, the consumer caches the offset fetch response/result from the first attempt (even though it timed out) because it knows that the _next_ call to {{poll()}} is going to attempt the exact same operation. When it is later attempted a second time, the response is already there from the first attempt such that the consumer doesn't need to perform a network request. The existing consumer has implemented this caching in [PendingCommittedOffsetRequest|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L132]. The new consumer has implemented it in [CommitRequestManager|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L510]. The core issue is that the new consumer implementation is clearing out the first attempt's cached result too aggressively. The effect is that the second (and subsequent) attempts fail to find any previous attempt's cached result, and all submit network requests, which all fail. Thus the consumer never makes any headway. > KIP-848 does not work well > -- > > Key: KAFKA-16637 > URL: https://issues.apache.org/jira/browse/KAFKA-16637 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: sanghyeok An >Assignee: Kirk True >Priority: Minor > Labels: kip-848-client-support > Fix For: 3.8.0 > > Attachments: image-2024-04-30-08-33-06-367.png, > image-2024-04-30-08-33-50-435.png > > > I want to test the next generation of the consumer rebalance protocol > ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)] > > However, it does not work well. > You can check my setup.
> > *Docker-compose.yaml* > [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml] > > *Consumer Code* > [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java] > > *Consumer logs* > [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector > - initializing Kafka metrics collector > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: > 2ae524ed625438c5 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: > 1714309299215 > [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - > [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1 > [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - > [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk) > Stuck In here... > > *Broker logs* > broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for > Set(__consumer_offsets) to
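[Editor's note: a simplified, self-contained sketch of the caching pattern described in the comment above; the type and method names are hypothetical, not the actual `CommitRequestManager` code. The first `poll(0)` starts the fetch and caches the in-flight future; later polls reuse it instead of issuing a new request; the bug pattern is clearing that cache before a matching attempt has consumed the result:]

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical holder for an in-flight committed-offset fetch.
class PendingOffsetFetch {
    private Set<String> requestedPartitions;
    private CompletableFuture<long[]> inflight;

    CompletableFuture<long[]> fetchCommittedOffsets(Set<String> partitions) {
        // Reuse the cached in-flight request if it matches this attempt;
        // a poll(0) caller almost never sees it complete on the first try.
        if (inflight != null && requestedPartitions.equals(partitions)) {
            return inflight;
        }
        requestedPartitions = partitions;
        inflight = sendOffsetFetchRequest(partitions);
        // Bug pattern: clearing `inflight` here (or whenever an attempt times
        // out) forces every subsequent poll to start over, so a zero-timeout
        // caller never makes progress. Clear the cache only once a matching
        // attempt has actually received the result.
        return inflight;
    }

    private CompletableFuture<long[]> sendOffsetFetchRequest(Set<String> partitions) {
        return new CompletableFuture<>(); // network request elided in this sketch
    }
}
```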
Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]
pasharik commented on code in PR #15830: URL: https://github.com/apache/kafka/pull/15830#discussion_r1585713585 ## core/src/main/scala/kafka/admin/AclCommand.scala: ## @@ -115,8 +115,6 @@ object AclCommand extends Logging { val aclBindings = acls.map(acl => new AclBinding(resource, acl)).asJavaCollection adminClient.createAcls(aclBindings).all().get() } - -listAcls(adminClient) Review Comment: Restored this print for now. For me, this print, together with the test checking for its output, causes the KRaft tests to be flaky. E.g. ``` ./gradlew core:test --tests "kafka.admin.AclCommandTest.testAclCliWithAdminAPI" --rerun ``` It always passes for `zk`, but quite often fails for `kraft`. As I understand it, `listAcls()` can be served by any broker in KRaft mode, not just by the controller. So potentially some brokers may not have the most up-to-date copy of the metadata right after the `createAcls()` call. Here there is no interval between `createAcls()` and `listAcls()`, so the probability of such a race condition is higher. From [KIP-500](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500) and the [diagram](https://cwiki.apache.org/confluence/download/attachments/123898922/b.png) it seems that brokers have to periodically pull new metadata from the KRaft controller, so metadata is not available on all brokers immediately. Please correct me if I'm wrong here. I've looked at `TopicCommand`, as another example of a class which manages metadata. It seems that it doesn't invoke `listTopics()` after creating or deleting topics, so I thought we could use a similar approach here. I tried to reproduce this flaky behavior by running a Kafka controller and broker locally with `bin/kafka-server-start.sh` and invoking `bin/kafka-acls.sh` manually, but wasn't able to reproduce the same issue. It's probably related to the test infrastructure setup. I'll try to rewrite the test in Java and use the new test infrastructure; let's see if that solves the issue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
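[Editor's note: one common way to sidestep that propagation race in tests is to poll until the metadata converges rather than asserting immediately. A generic sketch of the pattern; Kafka's own `org.apache.kafka.test.TestUtils.waitForCondition` provides the same idea, and the lookup call below is a hypothetical placeholder:]

```java
import java.util.function.BooleanSupplier;

public final class WaitFor {
    /** Polls `condition` until it holds or `timeoutMs` elapses. */
    public static void waitForCondition(BooleanSupplier condition, long timeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("condition not met within " + timeoutMs + " ms");
            }
            Thread.sleep(100); // give brokers time to pull fresh metadata
        }
    }
}

// Usage in the flaky scenario: create the ACLs, then wait until listAcls()
// (which may be served by any broker) reflects them, instead of asserting
// right away:
//   WaitFor.waitForCondition(() -> listAclsOutput().contains(expectedAcl), 15_000);
```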
Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]
pasharik commented on code in PR #15830: URL: https://github.com/apache/kafka/pull/15830#discussion_r1585704666 ## core/src/main/scala/kafka/admin/AclCommand.scala: ## @@ -115,8 +115,6 @@ object AclCommand extends Logging { val aclBindings = acls.map(acl => new AclBinding(resource, acl)).asJavaCollection adminClient.createAcls(aclBindings).all().get() } - -listAcls(adminClient) Review Comment: Restored this print for now. For me, this print, together with the test checking for its output, causes the KRaft tests to be flaky. E.g. ``` ./gradlew core:test --tests "kafka.admin.AclCommandTest.testAclCliWithAdminAPI" --rerun ``` It always passes for `zk`, but quite often fails for `kraft`. As I understand it, `listAcls()` can be served by any broker in KRaft mode, not just by the controller. So potentially some brokers may not have the most up-to-date copy of the metadata right after the `createAcls()` call. Here there is no interval between `createAcls()` and `listAcls()`, so the probability of such a race condition is higher. From [KIP-500](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500) and the [diagram](https://cwiki.apache.org/confluence/download/attachments/123898922/b.png) it seems that brokers have to periodically pull new metadata from the KRaft controller, so metadata is not available on all brokers immediately. Please correct me if I'm wrong here. I've looked at `TopicCommand`, as another example of a class which manages metadata. It seems that it doesn't invoke `listTopics()` after creating or deleting topics, so I thought we could use a similar approach here. I tried to reproduce this flaky behavior by running a Kafka controller and broker locally with `bin/kafka-server-start.sh` and invoking `bin/kafka-acls.sh` manually, but wasn't able to reproduce the same issue. It's probably related to the test infrastructure setup. I'll try to rewrite the test in Java and use the new test infrastructure; let's see if that solves the issue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]
pasharik commented on PR #15830: URL: https://github.com/apache/kafka/pull/15830#issuecomment-2087745006 > Hi @pasharik. Thanks for the change. > > > In the original implementation, listAcls() method was called directly from addAcls() and removeAcls() methods, which caused a race condition in KRaft mode, so the test become flaky > > Can you share a bit more detail on this? How specifically does `listAcls()` contribute to flakiness? Thanks Replied in the [above thread](https://github.com/apache/kafka/pull/15830#discussion_r1584190323) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete
[ https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842514#comment-17842514 ] Matthias J. Sax commented on KAFKA-16644: - Sorry. Wrong link. Fixed -> https://issues.apache.org/jira/browse/KAFKA-14748 > FK join emits duplicate tombstone on left-side delete > - > > Key: KAFKA-16644 > URL: https://issues.apache.org/jira/browse/KAFKA-16644 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Matthias J. Sax >Priority: Major > > We introduced a regression bug in 3.7.0 release via KAFKA-14748. When a > left-hand side record is deleted, the join now emits two tombstone records > instead of one. > The problem was not detected via unit test, because the tests use a `Map` > instead of a `List` when verifying the result topic records > ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).] > We should update all test cases to use `List` when reading from the output > topic, and of course fix the introduced bug: The > `SubscriptionSendProcessorSupplier` is sending two subscription records > instead of just a single one: > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete
[ https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16644: Description: We introduced a regression bug in 3.7.0 release via KAFKA-14748. When a left-hand side record is deleted, the join now emits two tombstone records instead of one. The problem was not detected via unit test, because the tests use a `Map` instead of a `List` when verifying the result topic records ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).] We should update all test cases to use `List` when reading from the output topic, and of course fix the introduced bug: The `SubscriptionSendProcessorSupplier` is sending two subscription records instead of just a single one: [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136] was: We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a left-hand side record is deleted, the join now emits two tombstone records instead of one. The problem was not detected via unit test, because the tests use a `Map` instead of a `List` when verifying the result topic records ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).] We should update all test cases to use `List` when reading from the output topic, and of course fix the introduced bug: The `SubscriptionSendProcessorSupplier` is sending two subscription records instead of just a single one: [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136] > FK join emits duplicate tombstone on left-side delete > - > > Key: KAFKA-16644 > URL: https://issues.apache.org/jira/browse/KAFKA-16644 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Matthias J. Sax >Priority: Major > > We introduced a regression bug in 3.7.0 release via KAFKA-14748. When a > left-hand side record is deleted, the join now emits two tombstone records > instead of one. > The problem was not detected via unit test, because the tests use a `Map` > instead of a `List` when verifying the result topic records > ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).] > We should update all test cases to use `List` when reading from the output > topic, and of course fix the introduced bug: The > `SubscriptionSendProcessorSupplier` is sending two subscription records > instead of just a single one: > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16475: add more tests to TopicImageNodeTest [kafka]
cmccabe merged PR #15735: URL: https://github.com/apache/kafka/pull/15735 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
linu-shibu commented on PR #15620: URL: https://github.com/apache/kafka/pull/15620#issuecomment-2087468237 @gharris1727 I do not have write access to merge the PR. Will I be granted permission to merge? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
linu-shibu commented on PR #15620: URL: https://github.com/apache/kafka/pull/15620#issuecomment-2087467721 > Test failures appear unrelated, there's a targeted RemoteLogMetadataSerdeTest for this logic, and the storage tests appear to pass for me locally. Yes, the tests are passing for me locally as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
philipnee commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1585558712 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java: ## @@ -30,12 +29,7 @@ public abstract class CommitEvent extends CompletableApplicationEvent { */ private final Map offsets; -protected CommitEvent(final Type type, final Map offsets, final Timer timer) { -super(type, timer); -this.offsets = validate(offsets); -} - -protected CommitEvent(final Type type, final Map offsets, final long deadlineMs) { +public CommitEvent(final Type type, Map offsets, final long deadlineMs) { Review Comment: Ditto: we probably don't need the `final`, but it would be good to be consistent. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
philipnee commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1585179214 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1273,6 +1228,22 @@ private void close(Duration timeout, boolean swallowException) { if (applicationEventHandler != null) closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException); closeTimer.update(); + +// close() can be called from inside one of the constructors. In that case, it's possible that neither +// the reaper nor the background event queue were constructed, so check them first to avoid NPE. +if (backgroundEventReaper != null && backgroundEventQueue != null) { +// Copy over the completable events to a separate list, then reap any incomplete +// events on that list. +LinkedList allEvents = new LinkedList<>(); Review Comment: Any specific reason for using the `LinkedList` implementation? ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java: ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.internals.ConsumerUtils; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class CompletableEventReaperTest { + +private final LogContext logContext = new LogContext(); +private final Time time = new MockTime(0, 0, 0); Review Comment: The test should work without setting the current time to 0, so I think `new MockTime(0)` should be fine. ## clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java: ## @@ -987,6 +987,7 @@ public void testResetUsingAutoResetPolicy(GroupProtocol groupProtocol) { @ParameterizedTest @EnumSource(GroupProtocol.class) public void testOffsetIsValidAfterSeek(GroupProtocol groupProtocol) { +Time time = new MockTime(1); Review Comment: Can we remove this? I think the test works without it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
hachikuji commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1583487039 ## core/src/main/scala/kafka/raft/RaftManager.scala: ## @@ -181,20 +181,12 @@ class KafkaRaftManager[T]( private val clientDriver = new KafkaRaftClientDriver[T](client, threadNamePrefix, fatalFaultHandler, logContext) def startup(): Unit = { -// Update the voter endpoints (if valid) with what's in RaftConfig -val voterAddresses: util.Map[Integer, AddressSpec] = controllerQuorumVotersFuture.get() -for (voterAddressEntry <- voterAddresses.entrySet.asScala) { - voterAddressEntry.getValue match { -case spec: InetAddressSpec => - netChannel.updateEndpoint(voterAddressEntry.getKey, spec) -case _: UnknownAddressSpec => - info(s"Skipping channel update for destination ID: ${voterAddressEntry.getKey} " + -s"because of non-routable endpoint: ${NON_ROUTABLE_ADDRESS.toString}") -case invalid: AddressSpec => - warn(s"Unexpected address spec (type: ${invalid.getClass}) for channel update for " + -s"destination ID: ${voterAddressEntry.getKey}") - } -} +client.initialize( + controllerQuorumVotersFuture.get(), + config.controllerListenerNames.head, + new FileBasedStateStore(new File(dataDir, "quorum-state")), Review Comment: Maybe we can have a constant for the file name. Not sure if it is used elsewhere, but it would be nice to have a nice descriptive name. ## raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java: ## @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft.internals; + +import java.util.Arrays; +import java.util.Optional; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.KRaftVersionRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.raft.MockLog; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.snapshot.RecordsSnapshotWriter; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; + +final class KRaftControlRecordStateMachineTest { +private static final RecordSerde STRING_SERDE = new StringSerde(); + +private static MockLog buildLog() { +return new MockLog(new TopicPartition("partition", 0), Uuid.randomUuid(), new LogContext()); +} + +private static KRaftControlRecordStateMachine buildPartitionListener(MockLog log, Optional staticVoterSet) { +return new KRaftControlRecordStateMachine( +staticVoterSet, +log, +STRING_SERDE, +BufferSupplier.NO_CACHING, +1024, +new LogContext() +); +} + +@Test +void testEmptyParition() { Review Comment: nit: typo Par**t**ition ## raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java: ## @@ -206,8 +151,16 @@ private static Integer parseVoterId(String idString) { } } -public static Map parseVoterConnections(List voterEntries) { -Map voterMap = new HashMap<>(); +public static Map parseVoterConnections(List voterEntries) { +return parseVoterConnections(voterEntries, true); +} + +public static Set parseVoterIds(List voterEntries) { +return parseVoterConnections(voterEntries, false).keySet(); +} + +private static Map parseVoterConnections(List voterEntries, boolean routableOnly) { Review Comment: nit: I think `requireRoutableAddresses` might convey the expectation more clearly. ## raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java: ## @@ -213,20 +213,21 @@ private void completeCurrentBatch() { * * @param valueCreator a function that uses the passed buffer to create the control *batch that will be appended. The memory records returned must
[jira] [Created] (KAFKA-16650) add integration test for Admin#abortTransaction
Chia-Ping Tsai created KAFKA-16650: -- Summary: add integration test for Admin#abortTransaction Key: KAFKA-16650 URL: https://issues.apache.org/jira/browse/KAFKA-16650 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai It seems there are only a few unit tests. We should add integration tests covering ZK, KRaft, and the new group coordinator. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
lianetm commented on PR #15640: URL: https://github.com/apache/kafka/pull/15640#issuecomment-2087359722 Hey @kirktrue, thanks a lot for the PR; this is a big piece! I completed a pass over all the non-test files and left some comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
lianetm commented on PR #15640: URL: https://github.com/apache/kafka/pull/15640#issuecomment-2087355371 > > Here I have a comment, I could not put at the right location in the code: > > > > On line 1362, in commitSync() the consumer waits on the commitFuture with a timer. I think, it should not wait on a timer there since we already wait on a timer in the background thread. > > I agree. What about the timed wait in awaitPendingAsyncCommitsAndExecuteCommitCallbacks()? I agree we should not wait on the `commitFuture` with a timer, because the deadline is contained in the event we submitted and is already enforced by the reaper. I'm not clear on what the proposed relationship with `awaitPendingAsyncCommitsAndExecuteCommitCallbacks` is, though. I would expect we only need to call `ConsumerUtils.getResult(commitFuture);`, and that is consistent with how we get results for all other completable events now: - we create an event with a deadline - we call `applicationEventHandler.addAndGet(event)` For the commit case that flow has a different shape just because we use `applicationEventHandler.add(event)` [here](https://github.com/apache/kafka/blob/097522abd6b51bca2407ea0de7009ed6a2d970b4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L775), to cater for both commit sync and async, but we should still apply the same approach and just call get without any time boundary, I would say. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
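[Editor's note: a trimmed-down sketch of the flow being agreed on here; the names follow the quoted discussion (`applicationEventHandler.add`, `ConsumerUtils.getResult`), but the classes below are hypothetical simplifications, not the actual consumer code. The deadline travels inside the event, the background reaper expires it, and the caller blocks on the future with no timer of its own:]

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical, trimmed-down commit event for illustration.
class CommitSyncEvent {
    final CompletableFuture<Void> future = new CompletableFuture<>();
    final long deadlineMs; // the ONLY timeout; enforced by the background reaper

    CommitSyncEvent(long deadlineMs) {
        this.deadlineMs = deadlineMs;
    }
}

class CommitFlowSketch {
    void commitSync(long timeoutMs) throws ExecutionException, InterruptedException {
        CommitSyncEvent event = new CommitSyncEvent(System.currentTimeMillis() + timeoutMs);
        submitToBackgroundThread(event);
        // No timed wait here: if the deadline passes, the reaper completes the
        // future exceptionally (e.g. with a TimeoutException), unblocking this call.
        event.future.get();
    }

    private void submitToBackgroundThread(CommitSyncEvent event) {
        // corresponds to applicationEventHandler.add(event) in the real code path
    }
}
```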
[PR] KAFKA-16649: Remove lock from DynamicBrokerConfig.removeReconfigurable [kafka]
cmccabe opened a new pull request, #15838: URL: https://github.com/apache/kafka/pull/15838 Do not acquire the DynamicBrokerConfig lock in DynamicBrokerConfig.removeReconfigurable. It's not necessary, because the list that these functions are modifying is a thread-safe CopyOnWriteArrayList. In DynamicBrokerConfig.reloadUpdatedFilesWithoutConfigChange, I changed the code to use a simple Java forEach rather than a Scala conversion, in order to feel more confident that concurrent modifications to the List would not have any bad effects here. (forEach is always safe on CopyOnWriteArrayList.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
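[Editor's note: the thread-safety argument in the PR description, as a runnable sketch: `forEach` on a `CopyOnWriteArrayList` iterates over an immutable snapshot of the backing array, so a concurrent-style `remove` neither throws nor corrupts the traversal. A minimal demo, not the broker code:]

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CowListDemo {
    public static void main(String[] args) {
        List<String> reconfigurables = new CopyOnWriteArrayList<>();
        reconfigurables.add("socketServer");
        reconfigurables.add("logCleaner");
        reconfigurables.add("metrics");

        // forEach iterates a snapshot of the underlying array; removing an
        // element mid-iteration is safe -- no ConcurrentModificationException,
        // and this pass still sees the snapshot it started with.
        reconfigurables.forEach(r -> {
            if (r.equals("logCleaner")) {
                reconfigurables.remove(r); // mutation during iteration
            }
            System.out.println(r); // prints all three elements
        });

        System.out.println(reconfigurables); // [socketServer, metrics]
    }
}
```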
Re: [PR] KAFKA-14588 UserScramCredentialsCommandTest rewritten in java [kafka]
chia7712 commented on code in PR #15832: URL: https://github.com/apache/kafka/pull/15832#discussion_r1585527416 ## core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java: ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin; + +import kafka.server.BaseRequestTest; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.ClusterTests; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import kafka.utils.Exit; +import org.apache.kafka.test.NoRetryException; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.extension.ExtendWith; +import scala.Console; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.OptionalInt; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@SuppressWarnings("dontUseSystemExit") +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults Review Comment: `@ClusterTestDefaults(clusterType = Type.ALL)` Let's test all types ## core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java: ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.admin; + +import kafka.server.BaseRequestTest; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.ClusterTests; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import kafka.utils.Exit; +import org.apache.kafka.test.NoRetryException; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.extension.ExtendWith; +import scala.Console; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.OptionalInt; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@SuppressWarnings("dontUseSystemExit") +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults +public class UserScramCredentialsCommandTest extends BaseRequestTest { +private static final String USER1 = "user1"; +private static final String USER2 = "user2"; + +@Override +public int brokerCount() { +return 1; +} + +static class ConfigCommandResult { +public final String stdout; +public final OptionalInt exitStatus; + +public ConfigCommandResult(String stdout) { +this(stdout, OptionalInt.empty()); +} + +public ConfigCommandResult(String stdout, OptionalInt exitStatus) { +this.stdout = stdout; +this.exitStatus = exitStatus; +} +} + +private ConfigCommandResult
[jira] [Updated] (KAFKA-16649) Remove lock from DynamicBrokerConfig.removeReconfigurable
[ https://issues.apache.org/jira/browse/KAFKA-16649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-16649: - Summary: Remove lock from DynamicBrokerConfig.removeReconfigurable (was: Fix potential deadlock in DynamicBrokerConfig) > Remove lock from DynamicBrokerConfig.removeReconfigurable > - > > Key: KAFKA-16649 > URL: https://issues.apache.org/jira/browse/KAFKA-16649 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16649) Remove lock from DynamicBrokerConfig.removeReconfigurable
[ https://issues.apache.org/jira/browse/KAFKA-16649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-16649: - Description: Do not acquire the DynamicBrokerConfig lock in DynamicBrokerConfig.removeReconfigurable. It's not necessary, because the list that these functions are modifying is a thread-safe CopyOnWriteArrayList. > Remove lock from DynamicBrokerConfig.removeReconfigurable > - > > Key: KAFKA-16649 > URL: https://issues.apache.org/jira/browse/KAFKA-16649 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Priority: Major > > Do not acquire the DynamicBrokerConfig lock in > DynamicBrokerConfig.removeReconfigurable. It's not necessary, because the > list that these functions are modifying is a thread-safe CopyOnWriteArrayList. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16649) Fix potential deadlock in DynamicBrokerConfig
Colin McCabe created KAFKA-16649: Summary: Fix potential deadlock in DynamicBrokerConfig Key: KAFKA-16649 URL: https://issues.apache.org/jira/browse/KAFKA-16649 Project: Kafka Issue Type: Bug Reporter: Colin McCabe -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete
[ https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842504#comment-17842504 ]

A. Sophie Blee-Goldman edited comment on KAFKA-16644 at 4/30/24 8:57 PM:
--------------------------------------------------------------------------

[~mjsax] is KAFKA-14778 the correct issue that introduced a regression? That seems to link to an unrelated (and also unresolved) ticket

was (Author: ableegoldman):
[~mjsax] is KAFKA-14778 the correct issue? It seems to link to an unrelated (and also unresolved) ticket

> FK join emits duplicate tombstone on left-side delete
> ------------------------------------------------------
>
>                 Key: KAFKA-16644
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16644
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.7.0
>            Reporter: Matthias J. Sax
>            Priority: Major
>
> We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a
> left-hand side record is deleted, the join now emits two tombstone records
> instead of one.
> The problem was not detected via unit test, because the tests use a `Map`
> instead of a `List` when verifying the result topic records
> ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]
> We should update all test cases to use `List` when reading from the output
> topic, and of course fix the introduced bug: The
> `SubscriptionSendProcessorSupplier` is sending two subscription records
> instead of just a single one:
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]
>
[jira] [Commented] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete
[ https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842504#comment-17842504 ]

A. Sophie Blee-Goldman commented on KAFKA-16644:
------------------------------------------------

[~mjsax] is KAFKA-14778 the correct issue? It seems to link to an unrelated (and also unresolved) ticket

> FK join emits duplicate tombstone on left-side delete
> ------------------------------------------------------
>
>                 Key: KAFKA-16644
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16644
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.7.0
>            Reporter: Matthias J. Sax
>            Priority: Major
>
> We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a
> left-hand side record is deleted, the join now emits two tombstone records
> instead of one.
> The problem was not detected via unit test, because the tests use a `Map`
> instead of a `List` when verifying the result topic records
> ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]
> We should update all test cases to use `List` when reading from the output
> topic, and of course fix the introduced bug: The
> `SubscriptionSendProcessorSupplier` is sending two subscription records
> instead of just a single one:
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]
>
Re: [PR] KAFKA-15307: Update/errors for deprecated config [kafka]
Cerchie commented on PR #14448:
URL: https://github.com/apache/kafka/pull/14448#issuecomment-2087251352

   tagging @mjsax in for re-review
Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]
chia7712 commented on code in PR #15745:
URL: https://github.com/apache/kafka/pull/15745#discussion_r1585496531

## core/src/test/java/kafka/test/ClusterConfig.java: ##
@@ -319,7 +302,10 @@ public Builder setPerBrokerProperties(Map> perBroke
         }

         public ClusterConfig build() {
-            return new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName,
+            if (brokers <= 0 || controllers <= 0 || disksPerBroker <= 0) {

Review Comment:
   We should enable setting `brokers` to 0, as it is valid to have a cluster with only quorum nodes.

## core/src/test/java/kafka/test/junit/ClusterTestExtensions.java: ##
@@ -202,8 +151,19 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu
             for (ClusterConfigProperty property : annot.serverProperties()) {
                 serverProperties.put(property.key(), property.value());
             }
-            configBuilder.setServerProperties(serverProperties);
-            type.invocationContexts(context.getRequiredTestMethod().getName(), configBuilder.build(), testInvocations);
+            ClusterConfig config = ClusterConfig.builder()
+                .setType(type)
+                .setBrokers(annot.brokers() == 0 ? defaults.brokers() : annot.brokers())
+                .setControllers(annot.controllers() == 0 ? defaults.controllers() : annot.controllers())
+                .setDisksPerBroker(annot.disksPerBroker() == 0 ? defaults.disksPerBroker() : annot.disksPerBroker())
+                .setAutoStart(annot.autoStart() == AutoStart.DEFAULT ? defaults.autoStart() : annot.autoStart() == AutoStart.YES)
+                .setName(annot.name().isEmpty() ? null : annot.name())

Review Comment:
   maybe `annot.name().trim().isEmpty()` is more suitable.

## core/src/test/java/kafka/test/junit/ClusterTestExtensions.java: ##
@@ -202,8 +151,19 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu
+                .setListenerName(annot.listener().isEmpty() ? null : annot.listener())

Review Comment:
   ditto

## core/src/test/java/kafka/test/ClusterConfig.java: ##
@@ -319,7 +302,10 @@ public Builder setPerBrokerProperties(Map> perBroke
+            if (brokers <= 0 || controllers <= 0 || disksPerBroker <= 0) {

Review Comment:
   btw, those checks should be moved to the constructor of `ClusterConfig`
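The suggestion to move the bounds check into the constructor is a standard builder-pattern refinement: the builder stays a plain accumulator and the constructor becomes the single place where invariants are enforced, so every construction path is validated. A hedged sketch of the idea with trimmed-down stand-in types (this is illustrative, not the actual ClusterConfig code):

```java
// Illustrative only: a minimal config class with the invariant checks in the
// constructor, so builder and non-builder construction are both validated.
public final class MiniClusterConfig {
    private final int brokers;
    private final int controllers;
    private final int disksPerBroker;

    private MiniClusterConfig(int brokers, int controllers, int disksPerBroker) {
        // brokers == 0 stays legal: a cluster may consist of quorum nodes only.
        if (brokers < 0 || controllers <= 0 || disksPerBroker <= 0) {
            throw new IllegalArgumentException("invalid cluster sizing: brokers=" + brokers
                    + ", controllers=" + controllers + ", disksPerBroker=" + disksPerBroker);
        }
        this.brokers = brokers;
        this.controllers = controllers;
        this.disksPerBroker = disksPerBroker;
    }

    public static class Builder {
        private int brokers = 1;
        private int controllers = 1;
        private int disksPerBroker = 1;

        public Builder setBrokers(int v) { this.brokers = v; return this; }
        public Builder setControllers(int v) { this.controllers = v; return this; }
        public Builder setDisksPerBroker(int v) { this.disksPerBroker = v; return this; }

        public MiniClusterConfig build() {
            // No checks here: the constructor owns the invariants.
            return new MiniClusterConfig(brokers, controllers, disksPerBroker);
        }
    }
}
```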
Re: [PR] KAFKA-16614:Disallow @ClusterTemplate("") [kafka]
chia7712 commented on code in PR #15800:
URL: https://github.com/apache/kafka/pull/15800#discussion_r1585492894

## core/src/test/java/kafka/test/junit/ClusterTestExtensions.java: ##
@@ -91,9 +91,6 @@ public Stream provideTestTemplateInvocationContex
         ClusterTemplate clusterTemplateAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTemplate.class);
         if (clusterTemplateAnnot != null) {
             processClusterTemplate(context, clusterTemplateAnnot, generatedContexts::add);
-            if (generatedContexts.isEmpty()) {
-                throw new IllegalStateException("ClusterConfig generator method should provide at least one config");

Review Comment:
   > try to add unit test for it.

   nice. It is ok to file a jira to log it and we can merge this PR first.
Re: [PR] KAFKA-15307: update/note deprecated configs [kafka]
Cerchie commented on PR #14360:
URL: https://github.com/apache/kafka/pull/14360#issuecomment-2087132901

   tagging @mjsax here, made some edits in response to the last round
Re: [PR] KAFKA-15307: update/note deprecated configs [kafka]
Cerchie commented on code in PR #14360:
URL: https://github.com/apache/kafka/pull/14360#discussion_r1585489940

## docs/streams/developer-guide/config-streams.html: ##
@@ -257,7 +258,12 @@ num.standby.replicas
         The maximum number of records to buffer per partition.
         1000
-        cache.max.bytes.buffering
+        statestore.cache.max.bytes
+        Medium
+        Maximum number of memory bytes to be used for record caches across all threads. Note that at the debug level you can use cache.size to monitor the actual size of the Kafka Streams cache.
+        10485760
+
+        cache.max.bytes.buffering (Deprecated. Use cache.max.bytes instead.)

Review Comment:
   went through and did it manually, I think there was more than just this instance
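For readers following the deprecation being documented in this thread, the application-side switch is a one-line config change. A hedged sketch (the config keys are the ones named in the diff above; the application id and broker address are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class CacheConfigExample {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cache-config-demo"); // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        // New-style key: total bytes for record caches across all threads.
        props.put("statestore.cache.max.bytes", 10 * 1024 * 1024L);
        // Deprecated equivalent, kept here only for comparison:
        // props.put("cache.max.bytes.buffering", 10 * 1024 * 1024L);
        return props;
    }
}
```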
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
chia7712 commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1585489570

## core/src/main/scala/kafka/server/FetchSession.scala: ##
@@ -603,14 +619,16 @@ class FetchSessionCache(private val maxEntries: Int,
   // A map containing sessions which can be evicted by privileged sessions.
   private val evictableByPrivileged = new util.TreeMap[EvictableKey, FetchSession]

+  private val metricTag = Map("shard" -> s"$shardNum").asJava
+
   // Set up metrics.
-  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS)
-  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => FetchSessionCache.this.size)
-  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED)
-  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => FetchSessionCache.this.totalPartitions)
-  metricsGroup.removeMetric(FetchSession.INCREMENTAL_FETCH_SESSIONS_EVICTIONS_PER_SEC)
+  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, metricTag)
+  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => FetchSessionCache.this.size, metricTag)

Review Comment:
   not sure whether this is allowed. It seems to break the compatibility of metrics as it adds new tags. It means Kafka users who monitor these metrics need to update their queries.

## core/src/main/scala/kafka/server/BrokerServer.scala: ##
@@ -389,9 +389,17 @@ class BrokerServer(
       authorizer = config.createNewAuthorizer()
       authorizer.foreach(_.configure(config.originals))

-      val fetchManager = new FetchManager(Time.SYSTEM,
-        new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-          KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+      // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+      // for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange)
+      val sessionIdRange = Int.MaxValue / config.numIoThreads

Review Comment:
   pardon me. what happens when users update `numIoThreads` dynamically?
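To make the sharding scheme under discussion concrete: each cache shard owns a contiguous sessionId range of width Int.MaxValue / numShards, so a session routes to its shard with a single division. A hedged Java sketch of that routing, with the shard count fixed at construction (the dynamic-resize question raised in the review is exactly what this simplification ignores):

```java
import java.util.concurrent.ThreadLocalRandom;

public final class ShardedSessionRouter {
    private final int numShards;
    private final int sessionIdRange;

    public ShardedSessionRouter(int numShards) {
        this.numShards = numShards;
        this.sessionIdRange = Integer.MAX_VALUE / numShards;
    }

    // Shard n owns sessionIds in [max(1, n * sessionIdRange), (n + 1) * sessionIdRange).
    public int shardFor(int sessionId) {
        return Math.min(sessionId / sessionIdRange, numShards - 1);
    }

    // Allocate a random sessionId owned by the given shard; id 0 is reserved here,
    // mirroring the "Max(1, ...)" lower bound in the quoted comment above.
    public int newSessionId(int shard) {
        int lower = Math.max(1, shard * sessionIdRange);
        int upper = (shard + 1) * sessionIdRange;
        return ThreadLocalRandom.current().nextInt(lower, upper);
    }

    public static void main(String[] args) {
        ShardedSessionRouter router = new ShardedSessionRouter(4);
        int id = router.newSessionId(2);
        System.out.println(id + " -> shard " + router.shardFor(id)); // always shard 2
    }
}
```

Since each shard keeps its own lock and its own metrics gauge, contention drops from one global lock to one lock per shard, which is the motivation behind the per-shard metric tag being debated.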
Re: [PR] KAFKA-16614:Disallow @ClusterTemplate("") [kafka]
TaiJuWu commented on code in PR #15800:
URL: https://github.com/apache/kafka/pull/15800#discussion_r1585486875

## core/src/test/java/kafka/test/junit/ClusterTestExtensions.java: ##
@@ -91,9 +91,6 @@ public Stream provideTestTemplateInvocationContex
         ClusterTemplate clusterTemplateAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTemplate.class);
         if (clusterTemplateAnnot != null) {
             processClusterTemplate(context, clusterTemplateAnnot, generatedContexts::add);
-            if (generatedContexts.isEmpty()) {
-                throw new IllegalStateException("ClusterConfig generator method should provide at least one config");

Review Comment:
   If we move the check to `processClusterTemplate`, `generatedContexts` would also need to be exposed to `processClusterTemplate`, so I prefer to keep this version, revert the change, and try to add a unit test for it.
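The trade-off being debated is where the "generator produced at least one config" check can live without leaking `generatedContexts` into `processClusterTemplate`. One way to keep the check local is to wrap the downstream consumer and count what flows through it. A hypothetical sketch with illustrative names (this is not the actual extension code, just the shape of the idea):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class GeneratorCheckSketch {
    // Wraps the downstream consumer and counts accepted items, so the callee can
    // assert non-emptiness without ever seeing the underlying collection.
    static <T> Consumer<T> counting(Consumer<T> downstream, AtomicInteger counter) {
        return item -> {
            counter.incrementAndGet();
            downstream.accept(item);
        };
    }

    static <T> void processTemplate(Consumer<T> sink, Iterable<T> generated) {
        AtomicInteger produced = new AtomicInteger();
        Consumer<T> counted = counting(sink, produced);
        generated.forEach(counted); // stand-in for invoking the generator method
        if (produced.get() == 0) {
            throw new IllegalStateException("ClusterConfig generator method should provide at least one config");
        }
    }
}
```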
[jira] [Comment Edited] (KAFKA-16637) KIP-848 does not work well
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842470#comment-17842470 ]

Lianet Magrans edited comment on KAFKA-16637 at 4/30/24 8:25 PM:
------------------------------------------------------------------

Hey [~chickenchickenlove], just to rule out the basics, did you make sure to produce messages to the same test-topic1? No other consumers subscribed to it that could be owning the partition?

I tried your client code again, locally starting a broker in kraft mode, with the default config, only adding `group.coordinator.rebalance.protocols=consumer,classic`. 1 topic, 1 partition, 1 instance of your consumer app running with the poll duration of 1s, and was able to consume messages as expected. I only changed to StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

bq. for x in {1..10}; do echo "Test message $x"; done | ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Then the consumer app with your code printed the 10 messages as expected. If after double checking that you're still facing issues, I would suggest taking a look and share the broker logs to understand more about what's going on on your setup. If all looks good there maybe provide a ConsumerRebalanceListener to the call to subscribe, just to check/print the partitions assigned to your consumer on the onPartitionsAssigned callback. Hope it helps!

was (Author: JIRAUSER300183):
Hey [~chickenchickenlove], just to rule out the basics, did you make sure to produce messages to the same test-topic1? No other consumers subscribed to it that could be owning the partition?

I tried your code again, 1 topic, 1 partition, 1 instance of your consumer app running with the poll duration of 1s, and was able to consume messages as expected. I only changed to StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

bq. for x in {1..10}; do echo "Test message $x"; done | ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Then the consumer app with your code printed the 10 messages as expected. If after double checking that you're still facing issues, I would suggest to provide a ConsumerRebalanceListener to the call to subscribe, just to check/print the partitions assigned to your consumer on the onPartitionsAssigned callback, and also taking a look and share the broker logs to understand more about what's going on on your setup. Hope it helps!

> KIP-848 does not work well
> ---------------------------
>
>                 Key: KAFKA-16637
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16637
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: sanghyeok An
>            Assignee: Kirk True
>            Priority: Minor
>              Labels: kip-848-client-support
>             Fix For: 3.8.0
>
>         Attachments: image-2024-04-30-08-33-06-367.png, image-2024-04-30-08-33-50-435.png
>
> I want to test next generation of the consumer rebalance protocol
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)]
>
> However, it does not works well. You can check my condition.
>
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>
> *Broker logs*
> broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
> broker |
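Lianet's suggestion to pass a ConsumerRebalanceListener to subscribe is easy to wire in for debugging assignment problems like this one. A minimal hedged sketch (topic, group id, and bootstrap address follow the values in the thread; GROUP_PROTOCOL_CONFIG assumes a 3.7+ client where the KIP-848 protocol is available):

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RebalanceDebugConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "1");
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"); // KIP-848 protocol
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic1"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // An empty collection here means the consumer owns nothing yet.
                    System.out.println("Assigned: " + partitions);
                }

                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    System.out.println("Revoked: " + partitions);
                }
            });
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    System.out.println(record.value());
                }
            }
        }
    }
}
```

If `onPartitionsAssigned` never fires, the problem is on the assignment path (broker config, group coordinator); if it fires but nothing is consumed, the problem is on the fetch/produce side.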
Re: [PR] KAFKA-15307: update/note deprecated configs [kafka]
Cerchie commented on code in PR #14360:
URL: https://github.com/apache/kafka/pull/14360#discussion_r1585481048

## docs/streams/developer-guide/config-streams.html: ##
@@ -257,7 +258,12 @@ num.standby.replicas
         The maximum number of records to buffer per partition.
         1000
-        cache.max.bytes.buffering
+        statestore.cache.max.bytes
+        Medium
+        Maximum number of memory bytes to be used for record caches across all threads. Note that at the debug level you can use cache.size to monitor the actual size of the Kafka Streams cache.

Review Comment:
   found it in some confluent docs -- looks like it's been partially implemented. https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390 will remove
Re: [PR] KAFKA-16027: MINOR Refactor MetadataTest#testUpdatePartitionLeadership [kafka]
chia7712 commented on PR #15055:
URL: https://github.com/apache/kafka/pull/15055#issuecomment-2087068199

   > Okay I messed up the git a little here (still have much to learn), the current trunk does exist on my [Kafka-16027](https://issues.apache.org/jira/browse/KAFKA-16027) branch but I had to redo it with a force push, which led to auto-closing this branch. Any recommendation on what to do next?

   that is totally bad news. Maybe you can take a look at this post (https://stackoverflow.com/questions/3973994/how-can-i-recover-from-an-erronous-git-push-f-origin-master)?
Re: [PR] MINOR: Clean up TestUtils.scala [kafka]
chia7712 merged PR #15808:
URL: https://github.com/apache/kafka/pull/15808
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585466853

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java: ##
@@ -16,9 +16,118 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;

+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+import java.time.Duration;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.Objects.requireNonNull;

+/**
+ * {@code CompletableEvent} is an interface that is used by both {@link CompletableApplicationEvent} and
+ * {@link CompletableBackgroundEvent} for common processing and logic. A {@code CompletableEvent} is one that
+ * allows the caller to get the {@link #future() future} related to the event and the event's
+ * {@link #deadlineMs() expiration timestamp}.
+ *
+ * @param Return type for the event when completed
+ */
 public interface CompletableEvent {

+    /**
+     * Returns the {@link CompletableFuture future} associated with this event. Any event will have some related
+     * logic that is executed on its behalf. The event can complete in one of the following ways:
+     *
+     * Success: when the logic for the event completes successfully, the data generated by that event
+     * (if applicable) is passed to {@link CompletableFuture#complete(Object)}. In the case where the generic
+     * bound type is specified as {@link Void}, {@code null} is provided.
+     *
+     * Error: when the the event logic generates an error, the error is passed to
+     * {@link CompletableFuture#completeExceptionally(Throwable)}.
+     *
+     * Timeout: when the time spent executing the event logic exceeds the {@link #deadlineMs() deadline}, an
+     * instance of {@link TimeoutException} should be created and passed to
+     * {@link CompletableFuture#completeExceptionally(Throwable)}.
+     *
+     * Cancelled: when an event remains incomplete when the consumer closes, the future will be
+     * {@link CompletableFuture#cancel(boolean) cancelled}. Attempts to {@link Future#get() get the result}

Review Comment:
   The reaper actually calls `completeExceptionally` with a `CancellationException` instead of calling `CompletableFuture#cancel(boolean)`. Unless I'm missing a subtle semantic diff they should achieve the same, but still, adding a link to `cancel` here would not be accurate I would say.
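The "subtle semantic diff" is worth pinning down, and for `CompletableFuture` there essentially is none: `cancel(boolean)` completes the future with a `CancellationException`, and `completeExceptionally(new CancellationException())` makes `isCancelled()` report true and `get()`/`join()` throw `CancellationException` directly, just like a cancelled future. A standalone JDK demo (plain java.util.concurrent, no Kafka classes):

```java
import java.util.Arrays;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

public class CancelSemanticsDemo {
    public static void main(String[] args) {
        CompletableFuture<String> cancelled = new CompletableFuture<>();
        cancelled.cancel(true); // mayInterruptIfRunning is ignored by CompletableFuture

        CompletableFuture<String> completedExceptionally = new CompletableFuture<>();
        completedExceptionally.completeExceptionally(new CancellationException("reaped"));

        // Both paths leave the future "cancelled" from the caller's point of view.
        System.out.println(cancelled.isCancelled());              // true
        System.out.println(completedExceptionally.isCancelled()); // true

        for (CompletableFuture<String> f : Arrays.asList(cancelled, completedExceptionally)) {
            try {
                f.join();
            } catch (CancellationException e) {
                System.out.println("join() threw CancellationException"); // printed for both
            }
        }
    }
}
```

The one practical difference is intent: `completeExceptionally` lets the caller attach a message (here, why the reaper gave up), which `cancel(boolean)` cannot.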
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1585460093

## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ##
@@ -17,279 +17,332 @@
 package org.apache.kafka.tools.consumer.group;

 import joptsimple.OptionException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;

 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;

+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
+import static org.apache.kafka.common.ConsumerGroupState.STABLE;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;

-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteWithTopicOption(String quorum) {
-        createOffsetsTopic(listenerName(), new Properties());
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
-        assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
-    }
-
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteCmdNonExistingGroup(String quorum) {
-        createOffsetsTopic(listenerName(), new Properties());
-        String missingGroup = "missing.group";
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
-        ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
-
-        String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
-        assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-            "The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+    @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+    @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+    @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true")

Review Comment:
   @frankvicky @FrankYang0529 Could you please address @lianetm comments? This class and `DeleteOffsetsConsumerGroupCommandIntegrationTest` need to test "LegacyConsumer + NEW_GROUP_COORDINATOR_ENABLE_CONFIG=false (`GroupCoordinatorAdapter`)"

   We can address that by `ClusterTemplate`. For example:
   ```java
   private static void generator(ClusterGenerator clusterGenerator) {
       Map serverProperties = new HashMap<>();
       serverProperties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
       serverProperties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
   ```
[jira] [Updated] (KAFKA-9401) High lock contention for kafka.server.FetchManager.newContext
[ https://issues.apache.org/jira/browse/KAFKA-9401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gaurav Narula updated KAFKA-9401:
---------------------------------
    Fix Version/s: 3.8.0
                   3.7.1

> High lock contention for kafka.server.FetchManager.newContext
> --------------------------------------------------------------
>
>                 Key: KAFKA-9401
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9401
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>            Reporter: Lucas Bradstreet
>            Assignee: Gaurav Narula
>            Priority: Major
>             Fix For: 3.8.0, 3.7.1
>
> kafka.server.FetchManager.newContext takes out what is essentially a global
> fetch lock on kafka.server.FetchSessionCache, for updates to not only the
> FetchSessionCache but the also update the fetch sessions stored with in it.
> This causes a high amount of lock contention for fetches, as every fetch
> request must go through this lock.
> I have taken an async-profiler lock profile on a high throughput cluster, and
> I see around 25s of waiting on this lock for a sixty second profile.
> {noformat}
> *--- 25818577497 ns (20.84%), 5805 samples
>   [ 0] kafka.server.FetchSessionCache
>   [ 1] kafka.server.FetchManager.newContext
>   [ 2] kafka.server.KafkaApis.handleFetchRequest
>   [ 3] kafka.server.KafkaApis.handle
>   [ 4] kafka.server.KafkaRequestHandler.run
>   [ 5] java.lang.Thread.run
> {noformat}
> FetchSession.scala:
> {code:java}
> cache.synchronized {
>   cache.get(reqMetadata.sessionId) match {
>     case None => {
>       debug(s"Session error for ${reqMetadata.sessionId}: no such session ID found.")
>       new SessionErrorContext(Errors.FETCH_SESSION_ID_NOT_FOUND, reqMetadata)
>     }
>     case Some(session) => session.synchronized {
>       if (session.epoch != reqMetadata.epoch) {
>         debug(s"Session error for ${reqMetadata.sessionId}: expected epoch " +
>           s"${session.epoch}, but got ${reqMetadata.epoch} instead.");
>         new SessionErrorContext(Errors.INVALID_FETCH_SESSION_EPOCH, reqMetadata)
>       } else {
>         val (added, updated, removed) = session.update(fetchData, toForget, reqMetadata)
>         if (session.isEmpty) {
>           debug(s"Created a new sessionless FetchContext and closing session id ${session.id}, " +
>             s"epoch ${session.epoch}: after removing ${partitionsToLogString(removed)}, " +
>             s"there are no more partitions left.")
>           cache.remove(session)
>           new SessionlessFetchContext(fetchData)
>         } else {
>           cache.touch(session, time.milliseconds())
>           session.epoch = JFetchMetadata.nextEpoch(session.epoch)
>           debug(s"Created a new incremental FetchContext for session id ${session.id}, " +
>             s"epoch ${session.epoch}: added ${partitionsToLogString(added)}, " +
>             s"updated ${partitionsToLogString(updated)}, " +
>             s"removed ${partitionsToLogString(removed)}")
>           new IncrementalFetchContext(time, reqMetadata, session)
>         }
>       }
>     }
>   }
> }
> {code}
> Contention has been made worse by the solution for "KAFKA-9137: Fix incorrect
> FetchSessionCache eviction logic" ([https://github.com/apache/kafka/pull/7640),]
> as the cache is correctly touched now, whereas previously the touch was being skipped.
>
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585435927

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java: ##
@@ -30,12 +29,7 @@ public abstract class CommitEvent extends CompletableApplicationEvent {
      */
     private final Map offsets;

-    protected CommitEvent(final Type type, final Map offsets, final Timer timer) {
-        super(type, timer);
-        this.offsets = validate(offsets);
-    }
-
-    protected CommitEvent(final Type type, final Map offsets, final long deadlineMs) {
+    public CommitEvent(final Type type, Map offsets, final long deadlineMs) {

Review Comment:
   Is there a reason for losing the final on the offsets map?
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1585435466

## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ##
@@ -17,279 +17,332 @@
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+    @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+    @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+    @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true")

Review Comment:
   > Not at all, actually we want to test it, my point is just that we also should test old coordinator + LegacyConsumer, and the way to achieve that is running the LegacyConsumer + NEW_GROUP_COORDINATOR_ENABLE_CONFIG=false; Makes sense?

   You are right. I overlooked the `GroupCoordinatorAdapter` :(
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585390985

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ##
@@ -273,9 +310,18 @@ void cleanup() {
             log.error("Unexpected error during shutdown. Proceed with closing.", e);
         } finally {
             sendUnsentRequests(timer);
+
+            LinkedList allEvents = new LinkedList<>();
+            applicationEventQueue.drainTo(allEvents);
+            List> completableEvents = allEvents
+                .stream()
+                .filter(e -> e instanceof CompletableApplicationEvent)
+                .map(e -> (CompletableApplicationEvent) e)
+                .collect(Collectors.toList());

Review Comment:
   This logic is always needed whenever we `reapIncomplete`, and is currently repeated when we call it from the AsyncConsumer or here, so what about we move it into the `reapIncomplete`, make it receive a list of all events and internally filter the ones that are `CompletableEvent`?
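The refactor being proposed, a single reaper entry point that takes the raw drained events and does the instanceof filtering itself, looks roughly like the sketch below. The types and the `reapIncomplete` name follow the discussion, not the actual PR code, so treat this as shape rather than implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

public class ReaperSketch {
    interface Event { }

    interface CompletableEvent extends Event {
        CompletableFuture<?> future();
    }

    // Single entry point: callers hand over everything they drained and the
    // reaper keeps the filtering logic in one place.
    static void reapIncomplete(List<? extends Event> drained) {
        for (Event e : drained) {
            if (e instanceof CompletableEvent) {
                CompletableFuture<?> f = ((CompletableEvent) e).future();
                if (!f.isDone()) {
                    f.completeExceptionally(new CancellationException("consumer is closing"));
                }
            }
        }
    }

    static void cleanup(BlockingQueue<Event> applicationEventQueue) {
        List<Event> allEvents = new ArrayList<>();
        applicationEventQueue.drainTo(allEvents);
        reapIncomplete(allEvents); // no per-call-site stream/filter boilerplate
    }
}
```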
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
lianetm commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1585359721

## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ##
@@ -17,279 +17,332 @@
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+    @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+    @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+    @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true")

Review Comment:
   Hey @chia7712 , agree that when `NEW_GROUP_COORDINATOR_ENABLE_CONFIG` is true, kraft broker will use the new coordinator, with zk out of the pic, all good there. But

   > we still create LegacyConsumer to test the legacy coordinator by setting group.protocol=classic

   `LegacyConsumer` does not make the legacy coordinator kick in unless `isNewGroupCoordinatorEnabled` is false, because the classic protocol is supported by both coordinators. So when using the `LegacyConsumer`, it implies classic protocol (only one it supports), but the decision of which coordinator will serve the classic protocol is taken based on the broker config to enable the new coordinator or not.
[jira] [Assigned] (KAFKA-13447) Consumer should not reuse committed offset after topic recreation
[ https://issues.apache.org/jira/browse/KAFKA-13447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Philip Nee reassigned KAFKA-13447:
----------------------------------
    Assignee: Philip Nee

> Consumer should not reuse committed offset after topic recreation
> ------------------------------------------------------------------
>
>                 Key: KAFKA-13447
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13447
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: Philip Nee
>            Priority: Major
>              Labels: consumer, needs-kip
>
> KAFKA-12257 fixes an issue in which the consumer is unable to make progress
> after a topic has been recreated. The problem was that the client could not
> distinguish between stale metadata with a lower leader epoch and a recreated
> topic with a lower leader epoch. With TopicId support in KIP-516, the client
> is able to tell when a topic has been recreated since the new topic will have
> a different ID.
> However, what the patch did not fix is the potential reuse of the current
> offset position on the recreated topic. For example, say that the consumer is
> at offset N when the topic gets recreated. Currently, the consumer will
> continue fetching from offset N after detecting the recreation. The most
> likely result of this is either an offset out of range error or a log
> truncation error, but it is also possible for the offset position to remain
> valid on the recreated topic (say for a low-volume topic where the offsets is
> already low, or a case where the consumer was down for a while).
> To fix this issue completely, we need to store the topicId along with the
> committed offset in __consumer_offsets. This would allow the consumer to
> detect when the offset is no longer relevant for the current topic. We also
> need to decide how to raise this case to the user. If the user has enabled
> automatic offset reset, we can probably use that. Otherwise, we might need a
> new exception type to signal the user that the position needs to be reset.
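Until something like the topicId-in-__consumer_offsets fix described above lands, a recreation check has to happen client-side. One possible shape, sketched with today's Admin API (this assumes a client and broker recent enough that TopicDescription#topicId() returns a real Uuid, and it leaves persisting the stored ID as a placeholder):

```java
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Uuid;

public class TopicRecreationCheck {
    // Returns true if the topic's current ID differs from the one we recorded
    // when the offsets were committed, i.e. the topic was deleted and recreated.
    static boolean topicWasRecreated(Admin admin, String topic, Uuid storedTopicId)
            throws ExecutionException, InterruptedException {
        TopicDescription description = admin.describeTopics(Collections.singletonList(topic))
                .allTopicNames().get().get(topic);
        return !description.topicId().equals(storedTopicId);
    }
}
```

An application that detects a mismatch could then apply its own reset policy, for example seeking to the beginning, which mirrors the automatic-offset-reset behavior the ticket suggests reusing.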
[PR] Allowing WriteTxnMarkers API to run with AlterCluster permissions [kafka]
sidyag opened a new pull request, #15837: URL: https://github.com/apache/kafka/pull/15837 Allowing WriteTxnMarkers API to run with AlterCluster permissions https://issues.apache.org/jira/browse/KAFKA-16513 https://cwiki.apache.org/confluence/display/KAFKA/KIP-1037%3A+Allow+WriteTxnMarkers+API+with+Alter+Cluster+Permission *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
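For readers following the KIP link: the change is an authorization-check relaxation only. A hedged sketch of accepting either permission on the cluster resource (the `authorize` helper is illustrative, not the actual patch):

```java
// Illustrative only: WriteTxnMarkers historically required CLUSTER_ACTION on the
// cluster resource; KIP-1037 additionally allows ALTER on the same resource.
boolean authorized =
    authorize(context, AclOperation.CLUSTER_ACTION, ResourceType.CLUSTER)
    || authorize(context, AclOperation.ALTER, ResourceType.CLUSTER);
if (!authorized) {
    throw new ClusterAuthorizationException(
        "WriteTxnMarkers requires ClusterAction or Alter on the cluster resource");
}
```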
[jira] [Updated] (KAFKA-16110) Document and publicize performance test results for AsyncKafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-16110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16110: --- Issue Type: Task (was: New Feature) > Document and publicize performance test results for AsyncKafkaConsumer > -- > > Key: KAFKA-16110 > URL: https://issues.apache.org/jira/browse/KAFKA-16110 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, performance-benchmark > Fix For: 4.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16110) Document and publicize performance test results for AsyncKafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-16110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16110: --- Summary: Document and publicize performance test results for AsyncKafkaConsumer (was: Implement consumer performance tests) > Document and publicize performance test results for AsyncKafkaConsumer > -- > > Key: KAFKA-16110 > URL: https://issues.apache.org/jira/browse/KAFKA-16110 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, performance-benchmark > Fix For: 4.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16290) Investigate propagating subscription state updates via queues
[ https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16290: -- Priority: Major (was: Critical) > Investigate propagating subscription state updates via queues > - > > Key: KAFKA-16290 > URL: https://issues.apache.org/jira/browse/KAFKA-16290 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lucas Brutschy >Assignee: Bruno Cadonna >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support > Fix For: 4.0.0 > > > We are mostly using the queues for interaction between the application thread and > the background thread, but the subscription object is shared between the threads, > and it is updated directly without going through the queues. > The way we allow updates to the subscription state from both threads is > definitely not right, and will bring trouble. assign() is probably the most > obvious place: we send an event to the background to commit, but then update > the subscription in the foreground right away. > It seems sensible to aim for having all updates to the subscription state in > the background, triggered from the app thread via events (and I think we > already have related events for all updates, just that the subscription state > was left out in the app thread). -- This message was sent by Atlassian Jira (v8.20.10#820010)
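A minimal sketch of the direction described above, assuming a hypothetical AssignPartitionsEvent and helper; the point is that the application thread only enqueues, and the background thread becomes the single writer of the subscription state:

{code:java}
// Application thread: no direct mutation of the shared subscription object.
public void assign(Collection<TopicPartition> partitions) {
    applicationEventQueue.add(new AssignPartitionsEvent(partitions)); // hypothetical event
}

// Background thread: commits first, then performs the only write to the state.
void process(AssignPartitionsEvent event) {
    maybeAutoCommitBeforeReassignment();                              // illustrative helper
    subscriptions.assignFromUser(new HashSet<>(event.partitions()));
}
{code}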
Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]
gharris1727 commented on code in PR #14309: URL: https://github.com/apache/kafka/pull/14309#discussion_r1585294415 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -655,27 +811,38 @@ private static ConfigInfos validateClientOverrides(String connName, ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest( connName, connectorType, connectorClass, clientConfigs, clientType); List configValues = connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest); -if (configValues != null) { -for (ConfigValue validatedConfigValue : configValues) { -ConfigKey configKey = configKeys.get(validatedConfigValue.name()); -ConfigKeyInfo configKeyInfo = null; -if (configKey != null) { -if (configKey.group != null) { -groups.add(configKey.group); -} -configKeyInfo = convertConfigKey(configKey, prefix); -} -ConfigValue configValue = new ConfigValue(prefix + validatedConfigValue.name(), validatedConfigValue.value(), - validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages()); -if (!configValue.errorMessages().isEmpty()) { -errorCount++; +return prefixedConfigInfos(configDef.configKeys(), configValues, prefix); +} + +private static ConfigInfos prefixedConfigInfos(Map configKeys, List configValues, String prefix) { +int errorCount = 0; +Set groups = new LinkedHashSet<>(); +List configInfos = new ArrayList<>(); + +if (configValues == null) { Review Comment: I think this null check is only relevant when the value is coming from the overridePolicy.validate, in validateConverterConfig, I think the ConfigDef#validate call will always be non-null. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -392,6 +399,146 @@ protected Map validateSourceConnectorConfig(SourceConnector return configDef.validateAll(config); } +/** + * General-purpose validation logic for converters that are configured directly + * in a connector config (as opposed to inherited from the worker config). 
+ * @param connectorConfig the configuration for the connector; may not be null + * @param pluginConfigValue the {@link ConfigValue} for the converter property in the connector config; + * may be null, in which case no validation will be performed under the assumption that the + * connector will inherit the converter settings from the worker + * @param pluginInterface the interface for the plugin type + *(e.g., {@code org.apache.kafka.connect.storage.Converter.class}); + *may not be null + * @param configDefAccessor an accessor that can be used to retrieve a {@link ConfigDef} + * from an instance of the plugin type (e.g., {@code Converter::config}); + * may not be null + * @param pluginName a lowercase, human-readable name for the type of plugin (e.g., {@code "key converter"}); + * may not be null + * @param pluginProperty the property used to define a custom class for the plugin type + * in a connector config (e.g., {@link ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG}); + * may not be null + * @param defaultProperties any default properties to include in the configuration that will be used for + * the plugin; may be null + + * @return a {@link ConfigInfos} object containing validation results for the plugin in the connector config, + * or null if no custom validation was performed (possibly because no custom plugin was defined in the connector + * config) + + * @param <T> the plugin class to perform validation for + */ +private <T> ConfigInfos validateConverterConfig( +Map<String, String> connectorConfig, +ConfigValue pluginConfigValue, +Class<T> pluginInterface, +Function<T, ConfigDef> configDefAccessor, +String pluginName, +String pluginProperty, +Map<String, String> defaultProperties +) { +Objects.requireNonNull(connectorConfig); +Objects.requireNonNull(pluginInterface); +Objects.requireNonNull(configDefAccessor); +Objects.requireNonNull(pluginName); +Objects.requireNonNull(pluginProperty); + +String pluginClass = connectorConfig.get(pluginProperty); + +if (pluginClass == null +|| pluginConfigValue == null +||
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
gaurav-narula commented on PR #15836: URL: https://github.com/apache/kafka/pull/15836#issuecomment-2086478984 The following lock profiles, collected using async-profiler with numCacheShards = numIoThreads = 64, demonstrate a significant reduction in contention after this change. **Before** https://github.com/apache/kafka/assets/97168911/e2e1edad-7fe2-4260-908d-bc8d4395afca **After** https://github.com/apache/kafka/assets/97168911/8d926c4e-03e6-47e6-9367-cdd2ac89e3da -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16110) Implement consumer performance tests
[ https://issues.apache.org/jira/browse/KAFKA-16110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16110: -- Priority: Major (was: Blocker) > Implement consumer performance tests > > > Key: KAFKA-16110 > URL: https://issues.apache.org/jira/browse/KAFKA-16110 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, performance-benchmark > Fix For: 4.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1585308784 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,332 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.GroupProtocol; import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Properties; import java.util.Set; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC; +import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER; +import static org.apache.kafka.common.ConsumerGroupState.EMPTY; +import static org.apache.kafka.common.ConsumerGroupState.STABLE; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; -assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); -} - -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String missingGroup = "missing.group"; -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; -ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - -String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups); -assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted 
due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), +@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), +@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true") Review Comment: ### server side If `NEW_GROUP_COORDINATOR_ENABLE_CONFIG` gets true, kraft broker will create `GroupCoordinator` https://github.com/apache/kafka/blob/1e8415160f96eb579ceaa3f89b3362f1deeccf6b/core/src/main/scala/kafka/server/BrokerServer.scala#L556 It enables to handle the requests used by `AsyncConsumer`. Also, `NEW_GROUP_COORDINATOR_ENABLE_CONFIG` do nothing to zk broker. ### client side `GROUP_PROTOCOL_CONFIG` is used to pick up the impl of `Consumer` https://github.com/apache/kafka/blob/1e8415160f96eb579ceaa3f89b3362f1deeccf6b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java#L62 >
[PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
gaurav-narula opened a new pull request, #15836: URL: https://github.com/apache/kafka/pull/15836 KIP-227 introduced in-memory caching of FetchSessions. Brokers with a large number of Fetch requests suffer from contention when trying to acquire a lock on FetchSessionCache. This change aims to reduce lock contention for FetchSessionCache by sharding the cache into multiple segments, each responsible for an equal range of sessionIds. Assuming Fetch requests have a uniform distribution of sessionIds, the probability of contention on a segment is reduced by a factor of the number of segments. We ensure backwards compatibility by keeping the total number of cache entries the same as configured and by allocating sessionIds randomly. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
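A hedged sketch of the sharding idea (illustrative Java; the real cache is kafka.server.FetchSessionCache in Scala): each shard owns an equal slice of the sessionId space and has its own monitor, so with uniformly distributed sessionIds two requests contend only when they map to the same shard.

```java
// Illustrative only: per-shard locking in place of one global lock.
final class ShardedSessionCache {
    private final HashMap<Integer, Object>[] shards; // sessionId -> session, per shard

    @SuppressWarnings("unchecked")
    ShardedSessionCache(int numShards) {
        shards = new HashMap[numShards];
        for (int i = 0; i < numShards; i++) shards[i] = new HashMap<>();
    }

    Object touch(int sessionId) {
        HashMap<Integer, Object> shard = shards[Math.floorMod(sessionId, shards.length)];
        synchronized (shard) {   // contention probability is roughly 1/numShards
            return shard.get(sessionId);
        }
    }
}
```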
[jira] [Comment Edited] (KAFKA-16637) KIP-848 does not work well
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842470#comment-17842470 ] Lianet Magrans edited comment on KAFKA-16637 at 4/30/24 6:10 PM: - Hey [~chickenchickenlove], just to rule out the basics, did you make sure to produce messages to the same test-topic1? No other consumers subscribed to it that could be owning the partition? I tried your code again, 1 topic, 1 partition, 1 instance of your consumer app running with the poll duration of 1s, and was able to consume messages as expected. I only changed to StringDeserializers for simplicity: {quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());{quote} and produced a bunch of messages with: bq. for x in {1..10}; do echo "Test message $x"; done | ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1 Then the consumer app with your code printed the 10 messages as expected. If after double checking that you're still facing issues, I would suggest to provide a ConsumerRebalanceListener to the call to subscribe, just to check/print the partitions assigned to your consumer on the onPartitionsAssigned callback, and also taking a look and share the broker logs to understand more about what's going on on your setup. Hope it helps! was (Author: JIRAUSER300183): Hey [~chickenchickenlove], just to rule out the basics, did you make sure to produce messages to the same test-topic1? No other consumers subscribed to it that could be owning the partition? I tried your code again, 1 topic, 1 partition, 1 instance of your consumer app running with the poll duration of 1s, and was able to consume messages as expected. I only changed to StringDeserializers for simplicity: {quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());{quote} and produced a bunch of messages with: bq. for x in {1..10}; do echo "Test message $x"; done | ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1 Then the consumer app with your code printed the 10 messages as expected. If after double checking that you're still facing issues, I would suggest to provide a ConsumerRebalanceListener to the call to subscribe, just to check/print the partitions assigned to your consumer on the onPartitionsAssigned callback, and also taking a look and share the broker logs to understand more about what's going on on your setup. Hope it helps > KIP-848 does not work well > -- > > Key: KAFKA-16637 > URL: https://issues.apache.org/jira/browse/KAFKA-16637 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: sanghyeok An >Assignee: Kirk True >Priority: Minor > Labels: kip-848-client-support > Fix For: 3.8.0 > > Attachments: image-2024-04-30-08-33-06-367.png, > image-2024-04-30-08-33-50-435.png > > > I want to test next generation of the consumer rebalance protocol > ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)] > > However, it does not works well. > You can check my condition. 
> > *Docker-compose.yaml* > [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml] > > *Consumer Code* > [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java] > > *Consumer logs* > [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector > - initializing Kafka metrics collector > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: > 2ae524ed625438c5 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: > 1714309299215 > [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - > [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1 > [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - > [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk) > Stuck In here... > > *Broker logs* > broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) >
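The listener suggested above needs only a few lines with the public API; printing the assignment quickly confirms whether the member ever receives test-topic1's partition:

{code:java}
consumer.subscribe(Collections.singletonList("test-topic1"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Assigned: " + partitions); // expect test-topic1-0 after joining
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Revoked: " + partitions);
    }
});
{code}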
[jira] [Updated] (KAFKA-16557) Fix OffsetFetchRequestState.toString()
[ https://issues.apache.org/jira/browse/KAFKA-16557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16557: -- Priority: Minor (was: Major) > Fix OffsetFetchRequestState.toString() > -- > > Key: KAFKA-16557 > URL: https://issues.apache.org/jira/browse/KAFKA-16557 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, logging > Fix For: 3.8.0 > > > The code incorrectly overrides the {{toString()}} method instead of > overriding {{{}toStringBase(){}}}. This affects debugging and troubleshooting > consumer issues. -- This message was sent by Atlassian Jira (v8.20.10#820010)
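For readers unfamiliar with the pattern: the base class's toString() is expected to wrap toStringBase(), so subclasses should only extend the latter. A sketch of the intended shape (field names are illustrative, not the actual patch):

{code:java}
// In the subclass: extend toStringBase() instead of overriding toString().
@Override
protected String toStringBase() {
    return super.toStringBase() +                       // base fields: backoff, timers, ...
        ", requestedPartitions=" + requestedPartitions; // illustrative extra field
}
// The base class keeps the only toString(), e.g.:
// public String toString() { return getClass().getSimpleName() + "{" + toStringBase() + "}"; }
{code}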
[jira] [Updated] (KAFKA-16558) Implement HeartbeatRequestState.toStringBase()
[ https://issues.apache.org/jira/browse/KAFKA-16558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16558: -- Priority: Minor (was: Major) > Implement HeartbeatRequestState.toStringBase() > -- > > Key: KAFKA-16558 > URL: https://issues.apache.org/jira/browse/KAFKA-16558 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, logging > Fix For: 3.8.0 > > > The inner class {{HeartbeatRequestState}} does not override the > {{toStringBase()}} method. This affects debugging and troubleshooting > consumer issues. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16307: fix coordinator thread idle ratio [kafka]
jeffkbkim commented on code in PR #15835: URL: https://github.com/apache/kafka/pull/15835#discussion_r1585255667 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java: ## @@ -461,26 +459,30 @@ public void testMetrics() throws Exception { @Test public void testRecordThreadIdleRatioTwoThreads() throws Exception { GroupCoordinatorRuntimeMetrics mockRuntimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class); +Time time = Time.SYSTEM; Review Comment: I used system time here because MockTime does not simulate on a per-thread basis -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16307: fix coordinator thread idle ratio [kafka]
jeffkbkim opened a new pull request, #15835: URL: https://github.com/apache/kafka/pull/15835 This PR fixes the thread idle ratio. We take a similar approach to the kafka request handler idle ratio: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaRequestHandler.scala#L108-L117 Instead of calculating the actual ratio per thread, we record the time each thread stays idle while waiting for a new event, divided by the number of threads as an approximation. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
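A hedged sketch of the approximation (illustrative names; the shape mirrors the request-handler idle meter linked above). Each of N poller threads records its own blocking wait divided by N, so the per-thread recordings sum to an estimate of the pool-wide idle time:

```java
// Inside the poller loop; take() may throw InterruptedException in real code.
long idleStartMs = time.milliseconds();
CoordinatorEvent event = queue.take();            // the thread is idle while blocked here
long idleTimeMs = time.milliseconds() - idleStartMs;
// Dividing by the thread count avoids per-thread bookkeeping while still
// approximating the whole pool's idle ratio when summed across threads.
runtimeMetrics.recordThreadIdleTime((double) idleTimeMs / threadCount);
```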
[jira] [Assigned] (KAFKA-9401) High lock contention for kafka.server.FetchManager.newContext
[ https://issues.apache.org/jira/browse/KAFKA-9401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gaurav Narula reassigned KAFKA-9401: Assignee: Gaurav Narula > High lock contention for kafka.server.FetchManager.newContext > - > > Key: KAFKA-9401 > URL: https://issues.apache.org/jira/browse/KAFKA-9401 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Lucas Bradstreet >Assignee: Gaurav Narula >Priority: Major > > kafka.server.FetchManager.newContext takes out what is essentially a global > fetch lock on kafka.server.FetchSessionCache, for updates not only to the > FetchSessionCache but also to the fetch sessions stored within it. > This causes a high amount of lock contention for fetches, as every fetch > request must go through this lock. > I have taken an async-profiler lock profile on a high throughput cluster, and > I see around 25s of waiting on this lock for a sixty second profile. > {noformat} > *— 25818577497 ns (20.84%), 5805 samples > [ 0] kafka.server.FetchSessionCache > [ 1] kafka.server.FetchManager.newContext > [ 2] kafka.server.KafkaApis.handleFetchRequest > [ 3] kafka.server.KafkaApis.handle > [ 4] kafka.server.KafkaRequestHandler.run > [ 5] java.lang.Thread.run > {noformat} > FetchSession.scala: > {code:java} > cache.synchronized { > cache.get(reqMetadata.sessionId) match { > case None => { > debug(s"Session error for ${reqMetadata.sessionId}: no such session > ID found.") > new SessionErrorContext(Errors.FETCH_SESSION_ID_NOT_FOUND, > reqMetadata) > } > case Some(session) => session.synchronized { > if (session.epoch != reqMetadata.epoch) { > debug(s"Session error for ${reqMetadata.sessionId}: expected epoch > " + > s"${session.epoch}, but got ${reqMetadata.epoch} instead."); > new SessionErrorContext(Errors.INVALID_FETCH_SESSION_EPOCH, > reqMetadata) > } else { > val (added, updated, removed) = session.update(fetchData, toForget, > reqMetadata) > if (session.isEmpty) { > debug(s"Created a new sessionless FetchContext and closing > session id ${session.id}, " + > s"epoch ${session.epoch}: after removing > ${partitionsToLogString(removed)}, " + > s"there are no more partitions left.") > cache.remove(session) > new SessionlessFetchContext(fetchData) > } else { > cache.touch(session, time.milliseconds()) > session.epoch = JFetchMetadata.nextEpoch(session.epoch) > debug(s"Created a new incremental FetchContext for session id > ${session.id}, " + > s"epoch ${session.epoch}: added > ${partitionsToLogString(added)}, " + > s"updated ${partitionsToLogString(updated)}, " + > s"removed ${partitionsToLogString(removed)}") > new IncrementalFetchContext(time, reqMetadata, session) > } > } > } > } > } > {code} > Contention has been made worse by the solution for "KAFKA-9137: Fix incorrect > FetchSessionCache eviction logic" > ([https://github.com/apache/kafka/pull/7640),] as the cache is correctly > touched now, whereas previously the touch was being skipped. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16027: MINOR Refactor MetadataTest#testUpdatePartitionLeadership [kafka]
Alexander-Aghili commented on PR #15055: URL: https://github.com/apache/kafka/pull/15055#issuecomment-2086139400 Okay, I messed up the git history a little here (still have much to learn). The current trunk does exist on my Kafka-16027 branch, but I had to redo it with a force push, which led to auto-closing this PR. Any recommendation on what to do next? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16027: MINOR Refactor MetadataTest#testUpdatePartitionLeadership [kafka]
Alexander-Aghili closed pull request #15055: KAFKA-16027: MINOR Refactor MetadataTest#testUpdatePartitionLeadership URL: https://github.com/apache/kafka/pull/15055 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16000: Migrate MembershipManagerImplTest away from ConsumerTestBuilder [kafka]
kirktrue closed pull request #14950: KAFKA-16000: Migrate MembershipManagerImplTest away from ConsumerTestBuilder URL: https://github.com/apache/kafka/pull/14950 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16623) KafkaAsyncConsumer system tests warn about revoking partitions that weren't previously assigned
[ https://issues.apache.org/jira/browse/KAFKA-16623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16623: -- Description: When running system tests for the KafkaAsyncConsumer, we occasionally see this warning: {noformat} File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner self.run() File "/usr/lib/python3.7/threading.py", line 865, in run self._target(*self._args, **self._kwargs) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py", line 38, in _protected_worker self._worker(idx, node) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", line 304, in _worker handler.handle_partitions_revoked(event, node, self.logger) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", line 163, in handle_partitions_revoked (tp, node.account.hostname) AssertionError: Topic partition TopicPartition(topic='test_topic', partition=0) cannot be revoked from worker20 as it was not previously assigned to that consumer {noformat} In test_fencing_static_consumer, there are two sets of consumers that use group instance IDs: the initial set and the "conflict" set. It appears that one of the "conflicting" consumers hijacks the partition ownership from the coordinator's perspective when the initial consumer leaves the group. was: When running system tests for the KafkaAsyncConsumer, we occasionally see this warning: {noformat} File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner self.run() File "/usr/lib/python3.7/threading.py", line 865, in run self._target(*self._args, **self._kwargs) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py", line 38, in _protected_worker self._worker(idx, node) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", line 304, in _worker handler.handle_partitions_revoked(event, node, self.logger) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", line 163, in handle_partitions_revoked (tp, node.account.hostname) AssertionError: Topic partition TopicPartition(topic='test_topic', partition=0) cannot be revoked from worker20 as it was not previously assigned to that consumer {noformat} In > KafkaAsyncConsumer system tests warn about revoking partitions that weren't > previously assigned > --- > > Key: KAFKA-16623 > URL: https://issues.apache.org/jira/browse/KAFKA-16623 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.8.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0 > > > When running system tests for the KafkaAsyncConsumer, we occasionally see > this warning: > {noformat} > File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner > self.run() > File "/usr/lib/python3.7/threading.py", line 865, in run > self._target(*self._args, **self._kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py", > line 38, in _protected_worker > self._worker(idx, node) > File > 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", > line 304, in _worker > handler.handle_partitions_revoked(event, node, self.logger) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", > line 163, in handle_partitions_revoked > (tp, node.account.hostname) > AssertionError: Topic partition TopicPartition(topic='test_topic', > partition=0) cannot be revoked from worker20 as it was not previously > assigned to that consumer > {noformat} > In test_fencing_static_consumer, there are two sets of consumers that use > group instance IDs: the initial set and the "conflict" set. It appears that > one of the "conflicting" consumers hijacks the partition ownership from the > coordinator's perspective when the initial consumer leaves the group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16623) KafkaAsyncConsumer system tests warn about revoking partitions that weren't previously assigned
[ https://issues.apache.org/jira/browse/KAFKA-16623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842453#comment-17842453 ] Kirk True commented on KAFKA-16623: --- If I add the following code to {{test_fencing_static_consumer}} before stopping the consumer, the test runs: {code:python} # Make sure the conflicting consumers are all dead, but then go ahead and stop them all to ensure # that everything is cleanly stopped, otherwise we may get spurious errors assert len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes), "Conflicting consumers should all have received errors on startup and quit" conflict_consumer.stop_all() wait_until(lambda: len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes), timeout_sec=self.session_timeout_sec+5, err_msg="Timed out waiting for the conflict consumer to shutdown") {code} I'm still investigating, though, as I'm not sure if this is a "fix" or if it "masks" a real issue. > KafkaAsyncConsumer system tests warn about revoking partitions that weren't > previously assigned > --- > > Key: KAFKA-16623 > URL: https://issues.apache.org/jira/browse/KAFKA-16623 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.8.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0 > > > When running system tests for the KafkaAsyncConsumer, we occasionally see > this warning: > {noformat} > File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner > self.run() > File "/usr/lib/python3.7/threading.py", line 865, in run > self._target(*self._args, **self._kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py", > line 38, in _protected_worker > self._worker(idx, node) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", > line 304, in _worker > handler.handle_partitions_revoked(event, node, self.logger) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", > line 163, in handle_partitions_revoked > (tp, node.account.hostname) > AssertionError: Topic partition TopicPartition(topic='test_topic', > partition=0) cannot be revoked from worker20 as it was not previously > assigned to that consumer > {noformat} > It is unclear what is causing this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]
kirktrue commented on PR #15723: URL: https://github.com/apache/kafka/pull/15723#issuecomment-2086027819 Thanks everyone for the reviews and @lucasbru for the merge! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Replaced Utils.join() with JDK API. [kafka]
chiacyu commented on code in PR #15823: URL: https://github.com/apache/kafka/pull/15823#discussion_r1585188342 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -584,27 +584,6 @@ public static String formatBytes(long bytes) { } } -/** - * Create a string representation of an array joined by the given separator - * @param strs The array of items - * @param separator The separator - * @return The string representation. - */ -public static <T> String join(T[] strs, String separator) { -return join(Arrays.asList(strs), separator); -} - -/** - * Create a string representation of a collection joined by the given separator - * @param collection The list of items - * @param separator The separator - * @return The string representation. - */ -public static <T> String join(Collection<T> collection, String separator) { -Objects.requireNonNull(collection); -return mkString(collection.stream(), "", "", separator); -} - /** * Create a string representation of a stream surrounded by `begin` and `end` and joined by `separator`. * Review Comment: Thanks for the reply. Suggestions applied. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
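For context, the JDK offers direct equivalents to the removed helpers; a couple of hedged one-liners covering both old overloads:

```java
// CharSequence collections map directly onto String.join:
String a = String.join(", ", Arrays.asList("x", "y", "z"));
// Arbitrary element types, like the old Utils.join(Collection<T>, String):
String b = Stream.of(1, 2, 3).map(String::valueOf).collect(Collectors.joining(", "));
```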
Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]
C0urante commented on PR #14309: URL: https://github.com/apache/kafka/pull/14309#issuecomment-2085912341 @gharris1727 I've resolved the merge conflicts again; can you please take a look when you get a chance? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16647: Remove setMetadataDirectory from BrokerNode/ControllerNode [kafka]
brandboat opened a new pull request, #15833: URL: https://github.com/apache/kafka/pull/15833 related to https://issues.apache.org/jira/browse/KAFKA-16647 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]
rreddy-22 commented on code in PR #15785: URL: https://github.com/apache/kafka/pull/15785#discussion_r1585160959 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1307,13 +1307,14 @@ private CoordinatorResult consumerGr } } +// The subscription metadata is updated in two cases: +// 1) The member has updated its subscriptions; +// 2) The refresh deadline has been reached. Review Comment: I thought you wanted it above the if statement in the last comment haha, I think it was there originally -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dajac commented on PR #15798: URL: https://github.com/apache/kafka/pull/15798#issuecomment-2085866313 @dongnuo123 Be aware of https://github.com/apache/kafka/pull/15785. The PR changes code that you have refactored or reused in this one. We will need to adapt when we merge it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]
rreddy-22 commented on code in PR #15785: URL: https://github.com/apache/kafka/pull/15785#discussion_r1585158392 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1307,13 +1307,14 @@ private CoordinatorResult consumerGr } } +// The subscription metadata is updated in two cases: +// 1) The member has updated its subscriptions; +// 2) The refresh deadline has been reached. +Map subscribedTopicNamesMap = group.subscribedTopicNames(); Review Comment: we can't, because the list is also named `subscribedTopicNames`, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]
dajac commented on code in PR #15785: URL: https://github.com/apache/kafka/pull/15785#discussion_r1585155736 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -966,6 +979,55 @@ private static void maybeUpdateSubscribedTopicNames( } } +/** + * Updates the subscription count. + * + * @param oldMember The old member. + * @param newMember The new member. + * + * @return Copy of the map of topics to the count of number of subscribers. + */ +public Map computeSubscribedTopicNames( +ConsumerGroupMember oldMember, +ConsumerGroupMember newMember +) { +Map subscribedTopicNames = new HashMap<>(this.subscribedTopicNames); +maybeUpdateSubscribedTopicNames( +subscribedTopicNames, +oldMember, +newMember +); +return subscribedTopicNames; +} + +/** + * Compute the subscription type of the consumer group. + * + * If all the members are subscribed to the same set of topics, the type is homogeneous. + * Otherwise, it is heterogeneous. + * + * @param subscribedTopicNames A map of topic names to the count of members subscribed to each topic. + * @return {@link SubscriptionType#HOMOGENEOUS} if all members are subscribed to exactly the same topics; Review Comment: nit: Let's add an empty line before this one in order to match the style in the file. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1307,13 +1307,14 @@ private CoordinatorResult consumerGr } } +// The subscription metadata is updated in two cases: +// 1) The member has updated its subscriptions; +// 2) The refresh deadline has been reached. +Map subscribedTopicNamesMap = group.subscribedTopicNames(); Review Comment: nit: Should we use `subscribedTopicNames` too? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1307,13 +1307,14 @@ private CoordinatorResult consumerGr } } +// The subscription metadata is updated in two cases: +// 1) The member has updated its subscriptions; +// 2) The refresh deadline has been reached. Review Comment: nit: Could we move it to right before `subscribedTopicNamesMap = group.computeSubscribedTopicNames(member, updatedMember);`? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -966,6 +979,55 @@ private static void maybeUpdateSubscribedTopicNames( } } +/** + * Updates the subscription count. + * + * @param oldMember The old member. + * @param newMember The new member. + * + * @return Copy of the map of topics to the count of number of subscribers. + */ +public Map computeSubscribedTopicNames( +ConsumerGroupMember oldMember, +ConsumerGroupMember newMember +) { +Map subscribedTopicNames = new HashMap<>(this.subscribedTopicNames); +maybeUpdateSubscribedTopicNames( +subscribedTopicNames, +oldMember, +newMember +); +return subscribedTopicNames; +} + +/** + * Compute the subscription type of the consumer group. + * + * If all the members are subscribed to the same set of topics, the type is homogeneous. + * Otherwise, it is heterogeneous. Review Comment: nit: We could remove this as it is already in the `@return`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]
philipnee commented on PR #15723:
URL: https://github.com/apache/kafka/pull/15723#issuecomment-2085841809

Hey, sorry for the delay; the changes look good to me.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585152310

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
## @@ -1892,13 +1897,13 @@ private void subscribeInternal(Collection topics, Optional T processBackgroundEvents(EventProcessor eventProcessor,
+ T processBackgroundEvents(EventProcessor eventProcessor,

Review Comment:
Not introduced by this PR, but reviewing this processing I don't quite see the value in all [these lines](https://github.com/apache/kafka/blob/097522abd6b51bca2407ea0de7009ed6a2d970b4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1910-L1915), which are even repeated further down, just for a log, when in practice both are on the happy path that will hit [this](https://github.com/apache/kafka/blob/097522abd6b51bca2407ea0de7009ed6a2d970b4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1506) log from the unsubscribe. A one-liner with `return ConsumerUtils.getResult(future);` would achieve the same and make the function much simpler. (Even if we end up using this from a function other than the unsubscribe, it seems overkill to have all this code for something we don't need now, or don't know whether we'll ever need.)

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
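For readers following the thread: the helper referenced in the comment is a blocking unwrap of a `CompletableFuture`. A minimal sketch of what a `getResult`-style helper typically does; this is an illustration, not the actual `ConsumerUtils` implementation, which maps causes into Kafka's own exception hierarchy:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class FutureUnwrapExample {

    // Block until the future completes and rethrow the *cause* of an
    // ExecutionException, so callers see the underlying error, not the wrapper.
    static <T> T getResult(CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new RuntimeException(cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
```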
Re: [PR] MINOR: Add replayRecords to CoordinatorResult [kafka]
dajac merged PR #15818: URL: https://github.com/apache/kafka/pull/15818 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585035057

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
## @@ -1853,6 +1824,40 @@ private void subscribeInternal(Collection topics, Optional processor) {

Review Comment:
What about renaming this to be explicit about what we process here? It gets confusing given that at this consumer level we're dealing with both app events and background events. `processBackgroundEvents` feels pretty clear, and I know there is already another one called like that, but the other one is more of an `awaitFutureProcessingBackgroundEvents`, because it actually blocks for a time and is only used from the unsubscribe, so maybe rename here and there?

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585091606

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
## @@ -1892,13 +1897,13 @@ private void subscribeInternal(Collection topics, Optional T processBackgroundEvents(EventProcessor eventProcessor,
+ T processBackgroundEvents(EventProcessor eventProcessor,

Review Comment:
Also, regarding:

> as part of this rebalancing work, the {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} callback needs to be invoked

It does not need to be invoked if the unsubscribing consumer does not own any partitions, so just for accuracy in the example I would suggest extending it to "...needs to be invoked for the partitions the consumer owns".

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585083395

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
## @@ -1892,13 +1897,13 @@ private void subscribeInternal(Collection topics, Optional T processBackgroundEvents(EventProcessor eventProcessor,
+ T processBackgroundEvents(EventProcessor eventProcessor,

Review Comment:
Regarding the func doc, a typo and a clarification:

> When the application thread sees ..., it is processed, and **then a ...is then** enqueued by the application thread **on the background event queue**

The app thread enqueues the event in an **application event queue** (that the background thread consumes), right? In the doc we ended up saying that the background thread and the app thread both add to the background event queue.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16382) Kafka Streams drop NULL values after reset
[ https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842423#comment-17842423 ] Matthias J. Sax commented on KAFKA-16382: - Not yet from our side... Working on other things atm. Not sure when we will be able to pick it up, or if anybody from the community wants to take it. > Kafka Streams drop NULL values after reset > -- > > Key: KAFKA-16382 > URL: https://issues.apache.org/jira/browse/KAFKA-16382 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.1 >Reporter: Stanislav Spiridonov >Priority: Major > > Kafka Streams (KTable) drops null values after full reset. > See > [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java] > for sample topology > Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics) > # Start example - 1st round > # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull" > # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab" > # Stop application > # Run kafka-streams-application-reset > {code:java} > call bin/windows/kafka-streams-application-reset --application-id > nullproblem-example^ > --input-topics "NULL-IN,NULL-IN-AUX"^ > --bootstrap-server "localhost:9092" > {code} > # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app > running yet) > # Start example - 2nd round > # After initialization -> NULL-OUT *still contains* 2 messages "A1:anull, > A1:ab" > # Expected output *3 messages* "A1:anull, A1:ab, {*}A1:{*}" > The issue is NOT reproduced if application just restarted (skip step 5). > The issue is NOT reproduced if internal cache is disabled. -- This message was sent by Atlassian Jira (v8.20.10#820010)
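One detail in the report worth highlighting: the issue is not reproduced when the internal cache is disabled, which suggests the KTable record caches are involved in holding back the tombstone. A sketch of how to switch caching off to verify that locally; this is standard Streams configuration rather than code from the linked reproducer, and it assumes Streams 3.4+ where the non-deprecated config name applies (the application id and bootstrap server match the reset command above):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Disable record caches so every update, including tombstones, is forwarded
// downstream immediately instead of being buffered per key.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "nullproblem-example");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0L);
{code}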
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
lianetm commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1585004533 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java: ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.function.Consumer; + +/** + * The {@code CompletableEventReaper} is responsible for tracking any {@link CompletableEvent}s that were processed, Review Comment: responsible for events that "are being processed" right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585003046

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
## @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+
+/**
+ * The {@code CompletableEventReaper} is responsible for tracking any {@link CompletableEvent}s that were processed,
+ * making sure to reap them if they complete normally or pass their deadline. This is done so that we enforce an upper
+ * bound on the amount of time the event logic will execute.
+ */
+public class CompletableEventReaper<T extends CompletableEvent<?>> {
+
+    private final Logger log;
+
+    /**
+     * List of tracked events that are candidates to expire or cancel when reviewed.
+     */
+    private final List<T> tracked;
+
+    public CompletableEventReaper(LogContext logContext) {
+        this.log = logContext.logger(CompletableEventReaper.class);
+        this.tracked = new ArrayList<>();
+    }
+
+    /**
+     * Adds a new {@link CompletableEvent event} to track for later completion/expiration.
+     *
+     * @param event Event to track
+     */
+    public void add(T event) {
+        tracked.add(Objects.requireNonNull(event, "Event to track must be non-null"));
+    }
+
+    /**
+     * This method "completes" any {@link CompletableEvent}s that have either expired or completed normally. So this
+     * is a two-step process:
+     *
+     * For each tracked event which has exceeded its {@link CompletableEvent#deadlineMs() deadline}, an
+     * instance of {@link TimeoutException} is created and passed to
+     * {@link CompletableFuture#completeExceptionally(Throwable)}.
+     *
+     * For each tracked event whose {@link CompletableEvent#future() future} is already in the
+     * {@link CompletableFuture#isDone() done} state, it will be removed from the list of tracked events.
+     *
+     * This method should be called at regular intervals, based upon the needs of the resource that owns the reaper.
+     *
+     * @param currentTimeMs Current time with which to compare against the
+     *                      {@link CompletableEvent#deadlineMs() expiration time}
+     */
+    public void reapExpiredAndCompleted(long currentTimeMs) {
+        log.trace("Reaping expired events");
+
+        Consumer<CompletableEvent<?>> timeoutEvent = e -> {
+            TimeoutException error = new TimeoutException(String.format("%s could not be completed within its timeout", e.getClass().getSimpleName()));
+            long pastDueMs = currentTimeMs - e.deadlineMs();
+            log.debug("Completing event {} exceptionally since it expired {} ms ago", e, pastDueMs);
+            CompletableFuture<?> f = e.future();
+            f.completeExceptionally(error);
+        };
+
+        // First, complete (exceptionally) any events that have passed their deadline AND aren't already complete.
+        tracked.stream()
+            .filter(e -> !e.future().isDone())
+            .filter(e -> currentTimeMs > e.deadlineMs())
+            .forEach(timeoutEvent);
+        // Second, remove any events that are already complete, just to make sure we don't hold references. This will
+        // include any events that finished successfully as well as any events we just completed exceptionally above.
+        tracked.removeIf(e -> e.future().isDone());

Review Comment:
Couldn't we make it so that the same add operation removes the event whenComplete? Seems tighter that the same operation that adds the event also ensures it is removed once it completes.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
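The suggestion above, sketched out (illustrative only, not the PR's code): `add` itself registers the cleanup, so the periodic reap pass only has to expire overdue events. The caveat is that `whenComplete` callbacks run on whichever thread completes the future, so the tracked collection must be thread-safe, whereas the PR's `ArrayList` is only ever touched by the background thread's reap pass; that trade-off may be exactly why removal stays in `reapExpiredAndCompleted`.

```java
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Minimal stand-in for the PR's CompletableEvent, just enough for the sketch.
interface CompletableEvent<T> {
    CompletableFuture<T> future();
    long deadlineMs();
}

// Hypothetical variant of the reaper's add(): the event removes itself from
// the tracked set as soon as its future completes, so reaping only needs to
// time out events that are past their deadline.
final class SelfCleaningTracker<T extends CompletableEvent<?>> {

    private final Set<T> tracked = ConcurrentHashMap.newKeySet();

    void add(T event) {
        Objects.requireNonNull(event, "Event to track must be non-null");
        tracked.add(event);
        // May fire on the completing thread, hence the concurrent set above.
        event.future().whenComplete((result, error) -> tracked.remove(event));
    }
}
```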