[jira] [Assigned] (KAFKA-15038) Use topic id/name mapping from the Metadata cache in the RemoteLogManager
[ https://issues.apache.org/jira/browse/KAFKA-15038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Owen C.H. Leung reassigned KAFKA-15038:
---------------------------------------

    Assignee: Owen C.H. Leung

> Use topic id/name mapping from the Metadata cache in the RemoteLogManager
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-15038
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15038
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>            Reporter: Alexandre Dupriez
>            Assignee: Owen C.H. Leung
>            Priority: Minor
>
> Currently, the {{RemoteLogManager}} maintains its own cache of topic name to
> topic id [[1]|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138],
> populated from the information provided during leadership changes, with the
> mapping removed upon notification that a partition has stopped.
> It should be possible to reuse the mapping in the broker's metadata cache,
> removing the need for the RLM to build and update a local cache that
> duplicates the information in the metadata cache. This also preserves a
> single source of authority for the association between topic names and ids.
>
> [1] https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
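A minimal sketch of the idea in the description above: instead of the RLM owning a `ConcurrentMap` that it populates on leadership changes and prunes when a partition stops, it reads topic ids through the broker's metadata cache. `MetadataView`, `RemoteLogManagerSketch`, and `topicIdFor` are hypothetical names for illustration, not the actual Kafka interfaces.

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical stand-in for the broker's metadata cache: the single source
// of authority for the topic name -> topic id association.
interface MetadataView {
    Optional<UUID> topicId(String topicName);
}

// Sketch of an RLM that delegates id resolution to the metadata cache
// instead of keeping its own copy that must be kept in sync.
class RemoteLogManagerSketch {
    private final MetadataView metadataCache;

    RemoteLogManagerSketch(MetadataView metadataCache) {
        this.metadataCache = metadataCache;
    }

    // Reads through the cache on demand; nothing to populate or prune locally.
    Optional<UUID> topicIdFor(String topicName) {
        return metadataCache.topicId(topicName);
    }
}
```

With this shape, leadership-change and stop-partition notifications no longer need to touch any RLM-local map; staleness handling reduces to whatever guarantees the metadata cache already provides.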
[jira] [Commented] (KAFKA-15038) Use topic id/name mapping from the Metadata cache in the RemoteLogManager
[ https://issues.apache.org/jira/browse/KAFKA-15038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749058#comment-17749058 ]

Owen C.H. Leung commented on KAFKA-15038:
-----------------------------------------

[~divijvaidya] I'd like to pick this up. I've done a bit of digging and would like to check that my understanding is correct: essentially, in this ticket we want to remove the use of `ConcurrentMap topicPartitionIds` and leverage the cache available in `RemoteLogManagerConfig rlmConfig` (more specifically, use the metadata manager in `Map remoteLogMetadataManagerProps()`) to serve as the cache, which will be the single source of authority? Let me know. Thanks!
[GitHub] [kafka] showuon commented on a diff in pull request #14114: KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs
showuon commented on code in PR #14114:
URL: https://github.com/apache/kafka/pull/14114#discussion_r1278885944

## core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:

@@ -713,6 +714,99 @@ class DynamicBrokerConfigTest {
     config.updateCurrentConfig(new KafkaConfig(props))
     assertFalse(config.nonInternalValues.containsKey(KafkaConfig.MetadataLogSegmentMinBytesProp))
   }
+
+  @Test
+  def testDynamicLogLocalRetentionMsConfig(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    props.put(KafkaConfig.LogRetentionTimeMillisProp, "259200")
+    val config = KafkaConfig(props)
+    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaServer]))
+    config.dynamicConfig.initialize(None)
+    config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
+
+    val newProps = new Properties()
+    newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "216000")
+    // update default config
+    config.dynamicConfig.validate(newProps, perBrokerConfig = false)
+    config.dynamicConfig.updateDefaultConfig(newProps)
+    assertEquals(216000L, config.logLocalRetentionMs)
+
+    // update per broker config
+    config.dynamicConfig.validate(newProps, perBrokerConfig = true)
+    newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "215000")
+    config.dynamicConfig.updateBrokerConfig(0, newProps)
+    assertEquals(215000L, config.logLocalRetentionMs)
+  }
+
+  @Test
+  def testDynamicLogLocalRetentionSizeConfig(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    props.put(KafkaConfig.LogRetentionBytesProp, "4294967296")
+    val config = KafkaConfig(props)
+    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaServer]))
+    config.dynamicConfig.initialize(None)
+    config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
+
+    val newProps = new Properties()
+    newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "4294967295")
+    // update default config
+    config.dynamicConfig.validate(newProps, perBrokerConfig = false)
+    config.dynamicConfig.updateDefaultConfig(newProps)
+    assertEquals(4294967295L, config.logLocalRetentionBytes)
+
+    // update per broker config
+    config.dynamicConfig.validate(newProps, perBrokerConfig = true)
+    newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "4294967294")
+    config.dynamicConfig.updateBrokerConfig(0, newProps)
+    assertEquals(4294967294L, config.logLocalRetentionBytes)
+  }
+
+  @Test
+  def testDynamicLogLocalRetentionSkipsOnInvalidConfig(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "1000")
+    props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "1024")
+    val config = KafkaConfig(props)
+    config.dynamicConfig.initialize(None)
+
+    // Check for invalid localRetentionMs < -2
+    verifyConfigUpdateWithInvalidConfig(config, props, Map.empty, Map(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP -> "-3"))
+    // Check for invalid localRetentionBytes < -2
+    verifyConfigUpdateWithInvalidConfig(config, props, Map.empty, Map(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP -> "-3"))
+  }
+
+  @Test
+  def testDynamicLogLocalRetentionThrowsOnIncorrectConfig(): Unit = {
+    // Check for the incorrect case of logLocalRetentionMs > retentionMs
+    verifyIncorrectLogLocalRetentionProps(2000L, 2, 100, 1000L)
+    // Check for the incorrect case of logLocalRetentionBytes > retentionBytes
+    verifyIncorrectLogLocalRetentionProps(500L, 200, 100, 1000L)
+    // Check for the incorrect case of logLocalRetentionMs (-1, i.e. unlimited) > retentionMs
+    verifyIncorrectLogLocalRetentionProps(-1, 200, 100, 1000L)
+    // Check for the incorrect case of logLocalRetentionBytes (-1, i.e. unlimited) > retentionBytes
+    verifyIncorrectLogLocalRetentionProps(2000L, -1, 100, 1000L)
+  }
+
+  def verifyIncorrectLogLocalRetentionProps(logLocalRetentionMs: Long,
+                                            logLocalRetentionBytes: Long,
+                                            retentionBytes: Long,
+                                            retentionMs: Long): Unit = {

Review Comment:
   nit: It's hard to review this validation method with the current parameter order. Could you put the same type of config in pairs? e.g.:
   ```
   def verifyIncorrectLogLocalRetentionProps(logLocalRetentionMs: Long,
                                             retentionMs: Long,
                                             logLocalRetentionBytes: Long,
                                             retentionBytes: Long)
   ```

## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:

@@ -1784,4 +1788,22 @@ class KafkaConfigTest {
     props.put(KafkaConfig.ConsumerGroupHeartbeatIntervalMsProp, "25")
     assertThrows(classOf[IllegalArgumentException], ()
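The invariants these tests exercise can be sketched as a standalone check. This assumes the conventional sentinels (-1 for unlimited, -2 for "inherit the total retention"); `LocalRetentionValidator` is a hypothetical name for illustration, not the actual broker code.

```java
// Hedged sketch of the validation the tests above exercise: local retention
// must not exceed total retention, where -1 means unlimited and -2 means
// "inherit the total retention value".
class LocalRetentionValidator {
    static final long UNLIMITED = -1L;
    static final long INHERIT = -2L;

    static void validate(long localValue, long totalValue, String what) {
        if (localValue == INHERIT) {
            return; // resolves to the total value, so always consistent
        }
        boolean localUnlimited = localValue == UNLIMITED;
        boolean totalUnlimited = totalValue == UNLIMITED;
        // Unlimited local retention is only acceptable when total retention
        // is also unlimited; otherwise local must be <= total.
        if ((localUnlimited && !totalUnlimited)
                || (!localUnlimited && !totalUnlimited && localValue > totalValue)) {
            throw new IllegalArgumentException(
                "local " + what + " retention " + localValue
                    + " exceeds total retention " + totalValue);
        }
    }
}
```

Note how this mirrors the test cases: a finite local value above the total, or an unlimited (-1) local value against a finite total, both fail.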
[GitHub] [kafka] divijvaidya commented on pull request #14078: KAFKA-14780: Fix flaky test 'testSecondaryRefreshAfterElapsedDelay'
divijvaidya commented on PR #14078:
URL: https://github.com/apache/kafka/pull/14078#issuecomment-1657889077

Hi Fei, CI tests are re-triggered on new commits or when a committer restarts them. Those are the only two options right now.

Regarding other flaky tests: if you run `./gradlew unitTest` and `./gradlew integrationTest` locally, they should pass. You can mention in the description that these tests pass locally for you, and the person who merges can then determine whether the flaky tests in CI are related to your change prior to merging. You can also look at JIRA tickets; existing flaky tests usually have an open JIRA ticket.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #14078: KAFKA-14780: Fix flaky test 'testSecondaryRefreshAfterElapsedDelay'
divijvaidya commented on PR #14078:
URL: https://github.com/apache/kafka/pull/14078#issuecomment-1657889903

I have just re-triggered the CI tests for this PR at https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14078/2/pipeline
[GitHub] [kafka] AndrewJSchofield commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580
AndrewJSchofield commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1278963024

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:

@@ -518,7 +529,7 @@
             else if (!future.isRetriable())
                 return false;
         }
-        timer.sleep(rebalanceConfig.retryBackoffMs);
+        timer.sleep(retryBackoff.backoff(attempts++));

Review Comment:
   Yes, true. Good point. I'll revert the code for this case.
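For context, the `retryBackoff.backoff(attempts++)` call in the diff above follows the exponential-backoff shape proposed in KIP-580: the delay roughly doubles per attempt, is capped at a maximum, and is jittered so that retrying clients do not synchronize. The following is a simplified sketch, not the actual client implementation; the class name and the +/-20% jitter figure are assumptions here.

```java
import java.util.concurrent.ThreadLocalRandom;

// Simplified exponential backoff in the spirit of KIP-580: delay doubles
// per attempt, is capped at maxMs, and carries multiplicative jitter.
class ExponentialBackoffSketch {
    private final long initialMs;
    private final long maxMs;
    private final double jitter; // e.g. 0.2 for +/-20% (an assumed value)

    ExponentialBackoffSketch(long initialMs, long maxMs, double jitter) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
        this.jitter = jitter;
    }

    long backoff(int attempts) {
        // Doubling happens in double precision, so large attempt counts
        // saturate at maxMs instead of overflowing a long.
        double exp = Math.min(initialMs * Math.pow(2, attempts), maxMs);
        double rand = ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        return (long) Math.min(exp * rand, maxMs);
    }
}
```

The cap is applied after jitter as well, so the returned delay never exceeds the configured maximum.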
[GitHub] [kafka] kamalcph commented on a diff in pull request #14128: KAFKA-15272: Fix the logic which finds candidate log segments to upload it to tiered storage
kamalcph commented on code in PR #14128:
URL: https://github.com/apache/kafka/pull/14128#discussion_r1278963394

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

@@ -525,6 +524,26 @@ private void maybeUpdateReadOffset(UnifiedLog log) throws RemoteStorageException
     }
 }
+
+List<EnrichedLogSegment> enrichedLogSegments(UnifiedLog log, Long fromOffset, Long lastStableOffset) {
+    List<EnrichedLogSegment> enrichedLogSegments = new ArrayList<>();
+    List<LogSegment> segments = JavaConverters.seqAsJavaList(log.nonActiveLogSegmentsFrom(fromOffset).toSeq());
+    if (!segments.isEmpty()) {
+        int idx = 1;
+        for (; idx < segments.size(); idx++) {
+            LogSegment previous = segments.get(idx - 1);
+            LogSegment current = segments.get(idx);
+            enrichedLogSegments.add(new EnrichedLogSegment(previous, current.baseOffset()));
+        }
+        // LogSegment#readNextOffset() is an expensive call, so we only call it when necessary.
+        int lastIdx = idx - 1;
+        if (segments.get(lastIdx).baseOffset() < lastStableOffset) {
+            LogSegment last = segments.get(lastIdx);
+            enrichedLogSegments.add(new EnrichedLogSegment(last, last.readNextOffset()));
+        }

Review Comment:
   For a given [LogSegment](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala), we know the start offset (base offset) but not the end offset; `readNextOffset` denotes the end offset of that segment + 1.

   To exclude the active segment, we are using `log.nonActiveLogSegmentsFrom`. Because the operations are not done atomically, we cannot use the active segment's base offset (the active segment might get rotated in the meantime). If we want to avoid the `LogSegment#readNextOffset` operation altogether, we can list all the segments including the active one and discard/filter the final entry.
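The optimization discussed above, deriving each candidate segment's end offset from the next segment's base offset so the expensive `readNextOffset()` call is needed at most once, can be sketched in isolation. `Segment` and `EnrichedSegment` below are simplified stand-ins for Kafka's `LogSegment`/`EnrichedLogSegment`, not the real classes.

```java
import java.util.ArrayList;
import java.util.List;

// Every candidate segment's (exclusive) end offset is the next segment's base
// offset, so the costly per-segment lookup is only needed for the final one,
// and only if it holds data below the last stable offset.
class SegmentPairing {
    record Segment(long baseOffset, long nextOffset) {
        long readNextOffset() { return nextOffset; } // stands in for the costly call
    }

    record EnrichedSegment(Segment segment, long endOffsetExclusive) {}

    static List<EnrichedSegment> enrich(List<Segment> segments, long lastStableOffset) {
        List<EnrichedSegment> out = new ArrayList<>();
        // Pair each segment with its successor's base offset.
        for (int i = 1; i < segments.size(); i++) {
            out.add(new EnrichedSegment(segments.get(i - 1), segments.get(i).baseOffset()));
        }
        // Only the last segment needs the expensive lookup, and only when it
        // starts below the last stable offset.
        if (!segments.isEmpty()) {
            Segment last = segments.get(segments.size() - 1);
            if (last.baseOffset() < lastStableOffset) {
                out.add(new EnrichedSegment(last, last.readNextOffset()));
            }
        }
        return out;
    }
}
```

The alternative raised in the thread, listing all segments including the active one and filtering out the final entry, would remove the lookup entirely at the cost of touching the active segment's metadata.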
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14110: MINOR: Add test for describe topic with ID
divijvaidya commented on code in PR #14110:
URL: https://github.com/apache/kafka/pull/14110#discussion_r1278956423

## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:

@@ -2354,6 +2354,55 @@ public void testDeleteRecords() throws Exception {
         }
     }
+
+    @Test
+    public void testDescribeTopicsByIds() {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
+                AdminClientConfig.RETRIES_CONFIG, "2")) {

Review Comment:
   Just use `try (AdminClientUnitTestEnv env = mockClientEnv()) {` maybe? Setting the cluster to 4 nodes and 2 retries doesn't sound relevant to this test.

## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:

@@ -2354,6 +2354,55 @@ public void testDeleteRecords() throws Exception {
         }
     }
+
+    @Test
+    public void testDescribeTopicsByIds() {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
+                AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Valid ID

Review Comment:
   We have added a case for a valid id and another for an unrepresentable id; perhaps we could have one more for an id which does not exist in the broker? Can you please add the third case?

## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:

@@ -2354,6 +2354,55 @@ public void testDeleteRecords() throws Exception {
         }
     }
+
+    @Test
+    public void testDescribeTopicsByIds() {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
+                AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Valid ID
+            Uuid topicId = Uuid.randomUuid();
+            String topicName = "test-topic";
+            Node leader = env.cluster().nodes().get(0);
+            MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(
+                Errors.NONE,
+                new TopicPartition(topicName, 0),
+                Optional.of(leader.id()),
+                Optional.of(10),
+                singletonList(leader.id()),
+                singletonList(leader.id()),
+                singletonList(leader.id()));
+            env.kafkaClient().prepareResponse(RequestTestUtils
+                .metadataResponse(
+                    env.cluster().nodes(),
+                    env.cluster().clusterResource().clusterId(),
+                    env.cluster().controller().id(),
+                    singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topicName, topicId, false,
+                        singletonList(partitionMetadata), MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED;
+            TopicCollection.TopicIdCollection topicIds = TopicCollection.ofTopicIds(
+                singletonList(topicId));
+            try {
+                DescribeTopicsResult result = env.adminClient().describeTopics(
+                    TopicCollection.ofTopicIds(singletonList(topicId)));
+                Map allTopicIds = result.allTopicIds().get();
+                assertEquals(topicName, allTopicIds.get(topicId).name());
+            } catch (Exception e) {
+                fail("describe with valid topicId should not fail", e);
+            }
+
+            // Invalid ID
+            try {
+                DescribeTopicsResult result = env.adminClient().describeTopics(
+                    TopicCollection.ofTopicIds(singletonList(Uuid.ZERO_UUID)));
+                result.allTopicIds().get();

Review Comment:
   You can also choose to use `TestUtils.assertFutureError`.

## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:

@@ -2354,6 +2354,55 @@ public void testDeleteRecords() throws Exception {
         }
     }
+
+    @Test
+    public void testDescribeTopicsByIds() {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
+                AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Valid ID
+            Uuid topicId = Uuid.randomUuid();
+            String topicName = "test-topic";
+            Node leader = env.cluster().nodes().get(0);
+            MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(
+                Errors.NONE,
+                new TopicPartition(topicName, 0),
+                Optional.of(leader.id()),
+                Optional.of(10),
+                singletonList(leader.id()),
+                singletonList(leader.id()),
+                singlet
[GitHub] [kafka] showuon commented on a diff in pull request #14128: KAFKA-15272: Fix the logic which finds candidate log segments to upload it to tiered storage
showuon commented on code in PR #14128:
URL: https://github.com/apache/kafka/pull/14128#discussion_r1278970321

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

@@ -525,6 +524,26 @@ private void maybeUpdateReadOffset(UnifiedLog log) throws RemoteStorageException
     }
 }
+
+List<EnrichedLogSegment> enrichedLogSegments(UnifiedLog log, Long fromOffset, Long lastStableOffset) {
+    List<EnrichedLogSegment> enrichedLogSegments = new ArrayList<>();
+    List<LogSegment> segments = JavaConverters.seqAsJavaList(log.nonActiveLogSegmentsFrom(fromOffset).toSeq());
+    if (!segments.isEmpty()) {
+        int idx = 1;
+        for (; idx < segments.size(); idx++) {
+            LogSegment previous = segments.get(idx - 1);
+            LogSegment current = segments.get(idx);
+            enrichedLogSegments.add(new EnrichedLogSegment(previous, current.baseOffset()));
+        }
+        // LogSegment#readNextOffset() is an expensive call, so we only call it when necessary.
+        int lastIdx = idx - 1;
+        if (segments.get(lastIdx).baseOffset() < lastStableOffset) {
+            LogSegment last = segments.get(lastIdx);
+            enrichedLogSegments.add(new EnrichedLogSegment(last, last.readNextOffset()));
+        }

Review Comment:
   > If we want to avoid the `LogSegment#readNextOffset` operation altogether, we can list all the segments including the active one and discard/filter the final entry.

   Yes, that's my thought, given that `LogSegment#readNextOffset` is expensive. Any drawbacks to this solution?
[GitHub] [kafka] yashmayya commented on pull request #14102: KAFKA-13187: Replace EasyMock / PowerMock with Mockito in DistributedHerderTest
yashmayya commented on PR #14102: URL: https://github.com/apache/kafka/pull/14102#issuecomment-1657931548 Hm I just noticed that the `DistributedHerderTest` is failing on the CI run (only for JDK 11) with: ``` org.mockito.exceptions.base.MockitoException: Unable to create mock instance of type 'DistributedHerder' at app//org.apache.kafka.connect.runtime.distributed.DistributedHerderTest.setUp(DistributedHerderTest.java:300) at java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566) at app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at app//org.junit.internal.runners.statements.RunBefores.invokeMethod(RunBefores.java:33) at app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24) at app//org.mockito.internal.runners.DefaultInternalRunner$1$1.evaluate(DefaultInternalRunner.java:55) at app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at 
app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413) at app//org.mockito.internal.runners.DefaultInternalRunner$1.run(DefaultInternalRunner.java:100) at app//org.mockito.internal.runners.DefaultInternalRunner.run(DefaultInternalRunner.java:107) at app//org.mockito.internal.runners.StrictRunner.run(StrictRunner.java:41) at app//org.mockito.junit.MockitoJUnitRunner.run(MockitoJUnitRunner.java:163) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52) at jdk.internal.reflect.GeneratedMethodAccessor35.invoke(Unknown Source) at java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176) at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129) at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100) at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60) at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56) at org.gradle.process.internal.worker.child.SystemApplicationClass
```
[GitHub] [kafka] kamalcph commented on a diff in pull request #14114: KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs
kamalcph commented on code in PR #14114: URL: https://github.com/apache/kafka/pull/14114#discussion_r1278982175 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java: ## @@ -424,6 +387,18 @@ public int initFileSize() { return 0; } +public boolean remoteStorageEnable() { +return remoteLogConfig.remoteStorageEnable; +} + +public long localRetentionMs() { +return remoteLogConfig.localRetentionMs == LogConfig.DEFAULT_LOCAL_RETENTION_MS ? retentionMs : remoteLogConfig.localRetentionMs; +} + +public long localRetentionBytes() { +return remoteLogConfig.localRetentionBytes == LogConfig.DEFAULT_LOCAL_RETENTION_BYTES ? retentionSize : remoteLogConfig.localRetentionBytes; Review Comment: yes, correct. The caller should get the real local log retention values when set to -2. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
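The fallback behavior confirmed in this review thread can be sketched standalone. The class, constants, and method names below are illustrative (the real logic lives in `LogConfig`); the sentinel `-2` stands for "no topic-level local retention override was set", in which case the caller sees the overall retention value:

```java
public class LocalRetentionSketch {
    // Illustrative sentinels mirroring LogConfig.DEFAULT_LOCAL_RETENTION_MS/BYTES:
    // -2 means the topic did not override local retention.
    static final long DEFAULT_LOCAL_RETENTION_MS = -2L;
    static final long DEFAULT_LOCAL_RETENTION_BYTES = -2L;

    // When local retention is left at the -2 default, fall back to the overall
    // retention value, matching the accessors shown in the diff above.
    static long effectiveLocalRetentionMs(long localRetentionMs, long retentionMs) {
        return localRetentionMs == DEFAULT_LOCAL_RETENTION_MS ? retentionMs : localRetentionMs;
    }

    static long effectiveLocalRetentionBytes(long localRetentionBytes, long retentionBytes) {
        return localRetentionBytes == DEFAULT_LOCAL_RETENTION_BYTES ? retentionBytes : localRetentionBytes;
    }

    public static void main(String[] args) {
        // Unset (-2): the caller sees the full retention.ms value (7 days here).
        System.out.println(effectiveLocalRetentionMs(-2L, 604800000L));  // 604800000
        // An explicit override wins, even when overall retention is unlimited (-1).
        System.out.println(effectiveLocalRetentionBytes(1073741824L, -1L)); // 1073741824
    }
}
```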
[jira] [Commented] (KAFKA-8128) Dynamic delegation token change possibility for consumer/producer
[ https://issues.apache.org/jira/browse/KAFKA-8128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749087#comment-17749087 ] Gabor Somogyi commented on KAFKA-8128: -- It took some time to remember what the original issue was, since the jira reporter didn't write it down clearly 😅 Now I remember that the JAAS context is definitely not re-loaded, which is the root cause. As I remember, there is a manual JAAS context reload possibility in the JVM, but it had no effect on the producer/consumer. That last statement may or may not still be valid, since it was 4 years ago... All in all, it would be good to have an explicit API to change the token, which would solve many issues for sure. > Dynamic delegation token change possibility for consumer/producer > - > > Key: KAFKA-8128 > URL: https://issues.apache.org/jira/browse/KAFKA-8128 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.2.0 >Reporter: Gabor Somogyi >Assignee: Viktor Somogyi-Vass >Priority: Major > > Re-authentication feature on broker side is under implementation which will > enforce consumer/producer instances to re-authenticate from time to time. It would > be good to set the latest delegation token dynamically without re-creating > consumer/producer instances. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15195) Regenerate segment-aligned producer snapshots when upgrading to a Kafka version supporting Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-15195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15195: --- Priority: Major (was: Blocker) > Regenerate segment-aligned producer snapshots when upgrading to a Kafka > version supporting Tiered Storage > - > > Key: KAFKA-15195 > URL: https://issues.apache.org/jira/browse/KAFKA-15195 > Project: Kafka > Issue Type: Sub-task > Components: core >Affects Versions: 3.6.0 >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > Fix For: 3.6.0 > > > As mentioned in KIP-405: Kafka Tiered Storage#Upgrade a customer wishing to > upgrade from a Kafka version < 2.8.0 to 3.6 and turn Tiered Storage on will > have to wait for retention to clean up segments without an associated > producer snapshot. > However, in our experience, customers of Kafka expect to be able to > immediately enable tiering on a topic once their cluster upgrade is complete. > Once they do this, however, they start seeing NPEs and no data is uploaded to > Tiered Storage > (https://github.com/apache/kafka/blob/9e50f7cdd37f923cfef4711cf11c1c5271a0a6c7/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/LogSegmentData.java#L61). > To achieve this, we propose changing Kafka to retroactively create producer > snapshot files on upload whenever a segment is due to be archived and lacks > one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15195) Regenerate segment-aligned producer snapshots when upgrading to a Kafka version supporting Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-15195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749088#comment-17749088 ] Satish Duggana commented on KAFKA-15195: It is not a blocker for 3.6.0 as KIP-405 talks about supporting it from 2.8.0 or later releases. > Regenerate segment-aligned producer snapshots when upgrading to a Kafka > version supporting Tiered Storage > - > > Key: KAFKA-15195 > URL: https://issues.apache.org/jira/browse/KAFKA-15195 > Project: Kafka > Issue Type: Sub-task > Components: core >Affects Versions: 3.6.0 >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > Fix For: 3.6.0 > > > As mentioned in KIP-405: Kafka Tiered Storage#Upgrade a customer wishing to > upgrade from a Kafka version < 2.8.0 to 3.6 and turn Tiered Storage on will > have to wait for retention to clean up segments without an associated > producer snapshot. > However, in our experience, customers of Kafka expect to be able to > immediately enable tiering on a topic once their cluster upgrade is complete. > Once they do this, however, they start seeing NPEs and no data is uploaded to > Tiered Storage > (https://github.com/apache/kafka/blob/9e50f7cdd37f923cfef4711cf11c1c5271a0a6c7/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/LogSegmentData.java#L61). > To achieve this, we propose changing Kafka to retroactively create producer > snapshot files on upload whenever a segment is due to be archived and lacks > one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
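The proposal quoted above — regenerate a producer snapshot at upload time whenever a segment due for archival lacks one — can be sketched roughly as follows. The method name, snapshot naming scheme, and placeholder rebuild are hypothetical, not Kafka's actual implementation:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class SnapshotBackfillSketch {
    // Placeholder for the real rebuild, which would replay producer state from
    // the log up to the segment's end offset. Here it just creates an empty file.
    static void rebuildProducerState(Path snapshot) throws IOException {
        Files.createFile(snapshot);
    }

    // Proposed behavior: when a segment is about to be archived and has no
    // producer snapshot (segments created before the upgrade), create one
    // retroactively instead of failing with an NPE in LogSegmentData.
    static Path ensureProducerSnapshot(Path logDir, long segmentEndOffset) throws IOException {
        Path snapshot = logDir.resolve(String.format("%020d.snapshot", segmentEndOffset));
        if (!Files.exists(snapshot)) {
            rebuildProducerState(snapshot);
        }
        return snapshot;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("log");
        Path snapshot = ensureProducerSnapshot(dir, 42L);
        System.out.println(Files.exists(snapshot)); // true
    }
}
```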
[jira] [Commented] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
[ https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749090#comment-17749090 ] Satish Duggana commented on KAFKA-14972: [~erikvanoosten] I am removing this KIP from 3.6.0 release plan for now as it is not accepted. Please correct me if I am missing something here. > Make KafkaConsumer usable in async runtimes > --- > > Key: KAFKA-14972 > URL: https://issues.apache.org/jira/browse/KAFKA-14972 > Project: Kafka > Issue Type: Wish > Components: consumer >Reporter: Erik van Oosten >Assignee: Erik van Oosten >Priority: Major > Labels: needs-kip > > KafkaConsumer contains a check that rejects nested invocations from different > threads (method {{{}acquire{}}}). For users that use an async runtime, this > is an almost impossible requirement. Examples of async runtimes that are > affected are Kotlin co-routines (see KAFKA-7143) and Zio. > It should be possible for a thread to pass on its capability to access the > consumer to another thread. See > [KIP-944|https://cwiki.apache.org/confluence/x/chw0Dw] for a proposal and > [https://github.com/apache/kafka/pull/13914] for an implementation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
[ https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749092#comment-17749092 ] Satish Duggana commented on KAFKA-14972: [~erikvanoosten] I am removing this from 3.6.0 release plan as the respective KIP is not yet accepted. > Make KafkaConsumer usable in async runtimes > --- > > Key: KAFKA-14972 > URL: https://issues.apache.org/jira/browse/KAFKA-14972 > Project: Kafka > Issue Type: Wish > Components: consumer >Reporter: Erik van Oosten >Assignee: Erik van Oosten >Priority: Major > Labels: needs-kip > > KafkaConsumer contains a check that rejects nested invocations from different > threads (method {{{}acquire{}}}). For users that use an async runtime, this > is an almost impossible requirement. Examples of async runtimes that are > affected are Kotlin co-routines (see KAFKA-7143) and Zio. > It should be possible for a thread to pass on its capability to access the > consumer to another thread. See > [KIP-944|https://cwiki.apache.org/confluence/x/chw0Dw] for a proposal and > [https://github.com/apache/kafka/pull/13914] for an implementation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
[ https://issues.apache.org/jira/browse/KAFKA-14972 ] Satish Duggana deleted comment on KAFKA-14972: was (Author: satish.duggana): [~erikvanoosten] I am removing this from 3.6.0 release plan as the respective KIP is not yet accepted. > Make KafkaConsumer usable in async runtimes > --- > > Key: KAFKA-14972 > URL: https://issues.apache.org/jira/browse/KAFKA-14972 > Project: Kafka > Issue Type: Wish > Components: consumer >Reporter: Erik van Oosten >Assignee: Erik van Oosten >Priority: Major > Labels: needs-kip > > KafkaConsumer contains a check that rejects nested invocations from different > threads (method {{{}acquire{}}}). For users that use an async runtime, this > is an almost impossible requirement. Examples of async runtimes that are > affected are Kotlin co-routines (see KAFKA-7143) and Zio. > It should be possible for a thread to pass on its capability to access the > consumer to another thread. See > [KIP-944|https://cwiki.apache.org/confluence/x/chw0Dw] for a proposal and > [https://github.com/apache/kafka/pull/13914] for an implementation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15248) Add BooleanConverter to Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-15248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749094#comment-17749094 ] Satish Duggana commented on KAFKA-15248: [~hgeraldino] Removing from the 3.6.0 release plan as the KIP is not yet approved. Please let me know if I am missing something here. > Add BooleanConverter to Kafka Connect > - > > Key: KAFKA-15248 > URL: https://issues.apache.org/jira/browse/KAFKA-15248 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Hector Geraldino >Assignee: Hector Geraldino >Priority: Minor > > KIP-959: Add BooleanConverter to Kafka Connect -> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-959%3A+Add+BooleanConverter+to+Kafka+Connect -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] showuon opened a new pull request, #14133: KAFKA-15189: only init remote topic metrics when enabled
showuon opened a new pull request, #14133: URL: https://github.com/apache/kafka/pull/14133 Only initialize remote topic metrics when system-wide remote storage is enabled, to avoid impacting performance for existing brokers. Also add tests. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a diff in pull request #14133: KAFKA-15189: only init remote topic metrics when enabled
showuon commented on code in PR #14133: URL: https://github.com/apache/kafka/pull/14133#discussion_r1279033437 ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -127,7 +127,7 @@ public class RemoteLogManagerTest { RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class); RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class); RemoteLogManagerConfig remoteLogManagerConfig = null; -BrokerTopicStats brokerTopicStats = new BrokerTopicStats(); +BrokerTopicStats brokerTopicStats = new BrokerTopicStats(true); Review Comment: force to true since we're testing `RemoteLogManager` ## core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java: ## @@ -44,7 +44,7 @@ public class RemoteLogReaderTest { public static final String TOPIC = "test"; RemoteLogManager mockRLM = mock(RemoteLogManager.class); -BrokerTopicStats brokerTopicStats = new BrokerTopicStats(); +BrokerTopicStats brokerTopicStats = new BrokerTopicStats(true); Review Comment: force to true since we're testing `RemoteLogReader` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a diff in pull request #14133: KAFKA-15189: only init remote topic metrics when enabled
showuon commented on code in PR #14133: URL: https://github.com/apache/kafka/pull/14133#discussion_r1279034999 ## core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java: ## @@ -179,6 +179,7 @@ public ReplicaManager build() { if (metadataCache == null) throw new RuntimeException("You must set metadataCache"); if (logDirFailureChannel == null) throw new RuntimeException("You must set logDirFailureChannel"); if (alterPartitionManager == null) throw new RuntimeException("You must set alterIsrManager"); +if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().enableRemoteStorageSystem()); Review Comment: Initializing brokerTopicStats if null, with the initialized config in L176 above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #14133: KAFKA-15189: only init remote topic metrics when enabled
showuon commented on PR #14133: URL: https://github.com/apache/kafka/pull/14133#issuecomment-1658003702 @kamalcph @divijvaidya , call for review. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] AndrewJSchofield commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580
AndrewJSchofield commented on code in PR #14111: URL: https://github.com/apache/kafka/pull/14111#discussion_r1279051318 ## clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java: ## @@ -203,18 +224,43 @@ public class CommonClientConfigs { * @return The new values which have been set as described in postProcessParsedConfig. */ public static Map postProcessReconnectBackoffConfigs(AbstractConfig config, -Map parsedValues) { + Map parsedValues) { HashMap rval = new HashMap<>(); Map originalConfig = config.originals(); if ((!originalConfig.containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) && originalConfig.containsKey(RECONNECT_BACKOFF_MS_CONFIG)) { -log.debug("Disabling exponential reconnect backoff because {} is set, but {} is not.", +log.warn("Disabling exponential reconnect backoff because {} is set, but {} is not.", RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MAX_MS_CONFIG); rval.put(RECONNECT_BACKOFF_MAX_MS_CONFIG, parsedValues.get(RECONNECT_BACKOFF_MS_CONFIG)); } return rval; } +/** + * Log warning if the exponential backoff is disabled due to initial backoff value is greater than max backoff value. + * + * @param configThe config object. + */ +public static void warnDisablingExponentialBackoff(AbstractConfig config) { +long retryBackoffMs = config.getLong(RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(RETRY_BACKOFF_MAX_MS_CONFIG); +if (retryBackoffMs > retryBackoffMaxMs) { +log.warn("Configuration '{}' with value '{}' is greater than configuration '{}' with value '{}'. " + +"A static backoff with value '{}' will be applied.", +RETRY_BACKOFF_MS_CONFIG, retryBackoffMs, +RETRY_BACKOFF_MAX_MS_CONFIG, retryBackoffMaxMs, retryBackoffMs); Review Comment: Digging into this, I discovered that KIP-601 was not quite implemented correctly and the backoff value could exceed the maximum either because of jitter, or because of a lower value for the max config. I'd just followed the same path for KIP-580, which is not correct. 
I have fixed the exponential backoff code for both of these cases so that the maximum is strictly enforced. Personally, I think that it's preferable if it is possible for the maximum to be exceeded because of jitter, but that is not what the KIP says. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
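A strictly capped exponential backoff with jitter, in the spirit of the fix described above, can be sketched as follows. Parameter names and the static-backoff fallback are illustrative, not the actual Kafka client internals; the key point is that the cap is applied after jitter, so the configured maximum is never exceeded:

```java
import java.util.concurrent.ThreadLocalRandom;

public class BackoffSketch {
    // Compute the backoff for a given retry attempt. The maximum is enforced
    // strictly: jitter is applied first, then the result is clamped to maxMs.
    static long backoffMs(long initialMs, long maxMs, long attempts, double jitterFrac) {
        if (initialMs >= maxMs) {
            // Misconfigured (initial >= max): degrade to a static backoff at the
            // initial value, matching the warning logged in the diff above.
            return initialMs;
        }
        double exp = initialMs * Math.pow(2.0, attempts);
        double jitter = 1.0 + ThreadLocalRandom.current().nextDouble(-jitterFrac, jitterFrac);
        return Math.min(maxMs, (long) (exp * jitter)); // cap AFTER jitter
    }

    public static void main(String[] args) {
        // Backoff grows exponentially (with +/-20% jitter) and plateaus at 1000 ms.
        for (long attempt = 0; attempt < 8; attempt++) {
            System.out.println(backoffMs(100L, 1000L, attempt, 0.2));
        }
    }
}
```

Clamping before jitter instead would allow the jittered value to exceed the maximum, which is exactly the KIP-601 discrepancy the comment describes.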
[jira] [Updated] (KAFKA-15264) Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter
[ https://issues.apache.org/jira/browse/KAFKA-15264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jianbin.chen updated KAFKA-15264: - Attachment: image-2023-07-31-18-04-54-112.png > Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter > --- > > Key: KAFKA-15264 > URL: https://issues.apache.org/jira/browse/KAFKA-15264 > Project: Kafka > Issue Type: Bug >Reporter: jianbin.chen >Priority: Major > Attachments: image-2023-07-28-09-51-01-662.png, > image-2023-07-28-09-52-38-941.png, image-2023-07-31-18-04-54-112.png, > image-2023-07-31-18-05-21-772.png > > > I was preparing to upgrade from 1.1.0 to 3.5.1 kraft mode (new cluster > deployment), and when I recently compared and tested, I found that when using > the following stress test command, the throughput gap is obvious > > {code:java} > ./kafka-producer-perf-test.sh --topic test321 --num-records 3000 > --record-size 1024 --throughput -1 --producer-props > bootstrap.servers=xxx: acks=1 > 419813 records sent, 83962.6 records/sec (81.99 MB/sec), 241.1 ms avg > latency, 588.0 ms max latency. > 555300 records sent, 111015.6 records/sec (108.41 MB/sec), 275.1 ms avg > latency, 460.0 ms max latency. > 552795 records sent, 110536.9 records/sec (107.95 MB/sec), 265.9 ms avg > latency, 1120.0 ms max latency. > 552600 records sent, 110520.0 records/sec (107.93 MB/sec), 284.5 ms avg > latency, 1097.0 ms max latency. > 538500 records sent, 107656.9 records/sec (105.13 MB/sec), 277.5 ms avg > latency, 610.0 ms max latency. > 511545 records sent, 102309.0 records/sec (99.91 MB/sec), 304.1 ms avg > latency, 1892.0 ms max latency. > 511890 records sent, 102337.1 records/sec (99.94 MB/sec), 288.4 ms avg > latency, 3000.0 ms max latency. > 519165 records sent, 103812.2 records/sec (101.38 MB/sec), 262.1 ms avg > latency, 1781.0 ms max latency. > 513555 records sent, 102669.9 records/sec (100.26 MB/sec), 338.2 ms avg > latency, 2590.0 ms max latency. 
> 463329 records sent, 92665.8 records/sec (90.49 MB/sec), 276.8 ms avg > latency, 1463.0 ms max latency. > 494248 records sent, 98849.6 records/sec (96.53 MB/sec), 327.2 ms avg > latency, 2362.0 ms max latency. > 506272 records sent, 101254.4 records/sec (98.88 MB/sec), 322.1 ms avg > latency, 2986.0 ms max latency. > 393758 records sent, 78735.9 records/sec (76.89 MB/sec), 387.0 ms avg > latency, 2958.0 ms max latency. > 426435 records sent, 85252.9 records/sec (83.25 MB/sec), 363.3 ms avg > latency, 1959.0 ms max latency. > 412560 records sent, 82298.0 records/sec (80.37 MB/sec), 374.1 ms avg > latency, 1995.0 ms max latency. > 370137 records sent, 73997.8 records/sec (72.26 MB/sec), 396.8 ms avg > latency, 1496.0 ms max latency. > 391781 records sent, 78340.5 records/sec (76.50 MB/sec), 410.7 ms avg > latency, 2446.0 ms max latency. > 355901 records sent, 71166.0 records/sec (69.50 MB/sec), 397.5 ms avg > latency, 2715.0 ms max latency. > 385410 records sent, 77082.0 records/sec (75.28 MB/sec), 417.5 ms avg > latency, 2702.0 ms max latency. > 381160 records sent, 76232.0 records/sec (74.45 MB/sec), 407.7 ms avg > latency, 1846.0 ms max latency. > 67 records sent, 0.1 records/sec (65.10 MB/sec), 456.2 ms avg > latency, 1414.0 ms max latency. > 376251 records sent, 75175.0 records/sec (73.41 MB/sec), 401.9 ms avg > latency, 1897.0 ms max latency. > 354434 records sent, 70886.8 records/sec (69.23 MB/sec), 425.8 ms avg > latency, 1601.0 ms max latency. > 353795 records sent, 70744.9 records/sec (69.09 MB/sec), 411.7 ms avg > latency, 1563.0 ms max latency. > 321993 records sent, 64360.0 records/sec (62.85 MB/sec), 447.3 ms avg > latency, 1975.0 ms max latency. > 404075 records sent, 80750.4 records/sec (78.86 MB/sec), 408.4 ms avg > latency, 1753.0 ms max latency. > 384526 records sent, 76905.2 records/sec (75.10 MB/sec), 406.0 ms avg > latency, 1833.0 ms max latency. 
> 387652 records sent, 77483.9 records/sec (75.67 MB/sec), 397.3 ms avg > latency, 1927.0 ms max latency. > 343286 records sent, 68629.7 records/sec (67.02 MB/sec), 455.6 ms avg > latency, 1685.0 ms max latency. > 00 records sent, 66646.7 records/sec (65.08 MB/sec), 456.6 ms avg > latency, 2146.0 ms max latency. > 361191 records sent, 72238.2 records/sec (70.55 MB/sec), 409.4 ms avg > latency, 2125.0 ms max latency. > 357525 records sent, 71490.7 records/sec (69.82 MB/sec), 436.0 ms avg > latency, 1502.0 ms max latency. > 340238 records sent, 68047.6 records/sec (66.45 MB/sec), 427.9 ms avg > latency, 1932.0 ms max latency. > 390016 records sent, 77956.4 records/sec (76.13 MB/sec), 418.5 ms avg > latency, 1807.0 ms max latency. > 352830 records sent, 70523.7 records/sec (68.87 MB/sec), 439.4 ms avg > latency, 1892.0 ms max latency. > 354526 records sent, 70905.2 records/
[jira] [Updated] (KAFKA-15264) Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter
[ https://issues.apache.org/jira/browse/KAFKA-15264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jianbin.chen updated KAFKA-15264: - Attachment: image-2023-07-31-18-05-21-772.png > Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter > --- > > Key: KAFKA-15264 > URL: https://issues.apache.org/jira/browse/KAFKA-15264 > Project: Kafka > Issue Type: Bug >Reporter: jianbin.chen >Priority: Major > Attachments: image-2023-07-28-09-51-01-662.png, > image-2023-07-28-09-52-38-941.png, image-2023-07-31-18-04-54-112.png, > image-2023-07-31-18-05-21-772.png > > > I was preparing to upgrade from 1.1.0 to 3.5.1 kraft mode (new cluster > deployment), and when I recently compared and tested, I found that when using > the following stress test command, the throughput gap is obvious > > {code:java} > ./kafka-producer-perf-test.sh --topic test321 --num-records 3000 > --record-size 1024 --throughput -1 --producer-props > bootstrap.servers=xxx: acks=1 > 419813 records sent, 83962.6 records/sec (81.99 MB/sec), 241.1 ms avg > latency, 588.0 ms max latency. > 555300 records sent, 111015.6 records/sec (108.41 MB/sec), 275.1 ms avg > latency, 460.0 ms max latency. > 552795 records sent, 110536.9 records/sec (107.95 MB/sec), 265.9 ms avg > latency, 1120.0 ms max latency. > 552600 records sent, 110520.0 records/sec (107.93 MB/sec), 284.5 ms avg > latency, 1097.0 ms max latency. > 538500 records sent, 107656.9 records/sec (105.13 MB/sec), 277.5 ms avg > latency, 610.0 ms max latency. > 511545 records sent, 102309.0 records/sec (99.91 MB/sec), 304.1 ms avg > latency, 1892.0 ms max latency. > 511890 records sent, 102337.1 records/sec (99.94 MB/sec), 288.4 ms avg > latency, 3000.0 ms max latency. > 519165 records sent, 103812.2 records/sec (101.38 MB/sec), 262.1 ms avg > latency, 1781.0 ms max latency. > 513555 records sent, 102669.9 records/sec (100.26 MB/sec), 338.2 ms avg > latency, 2590.0 ms max latency. 
> 463329 records sent, 92665.8 records/sec (90.49 MB/sec), 276.8 ms avg > latency, 1463.0 ms max latency. > 494248 records sent, 98849.6 records/sec (96.53 MB/sec), 327.2 ms avg > latency, 2362.0 ms max latency. > 506272 records sent, 101254.4 records/sec (98.88 MB/sec), 322.1 ms avg > latency, 2986.0 ms max latency. > 393758 records sent, 78735.9 records/sec (76.89 MB/sec), 387.0 ms avg > latency, 2958.0 ms max latency. > 426435 records sent, 85252.9 records/sec (83.25 MB/sec), 363.3 ms avg > latency, 1959.0 ms max latency. > 412560 records sent, 82298.0 records/sec (80.37 MB/sec), 374.1 ms avg > latency, 1995.0 ms max latency. > 370137 records sent, 73997.8 records/sec (72.26 MB/sec), 396.8 ms avg > latency, 1496.0 ms max latency. > 391781 records sent, 78340.5 records/sec (76.50 MB/sec), 410.7 ms avg > latency, 2446.0 ms max latency. > 355901 records sent, 71166.0 records/sec (69.50 MB/sec), 397.5 ms avg > latency, 2715.0 ms max latency. > 385410 records sent, 77082.0 records/sec (75.28 MB/sec), 417.5 ms avg > latency, 2702.0 ms max latency. > 381160 records sent, 76232.0 records/sec (74.45 MB/sec), 407.7 ms avg > latency, 1846.0 ms max latency. > 67 records sent, 0.1 records/sec (65.10 MB/sec), 456.2 ms avg > latency, 1414.0 ms max latency. > 376251 records sent, 75175.0 records/sec (73.41 MB/sec), 401.9 ms avg > latency, 1897.0 ms max latency. > 354434 records sent, 70886.8 records/sec (69.23 MB/sec), 425.8 ms avg > latency, 1601.0 ms max latency. > 353795 records sent, 70744.9 records/sec (69.09 MB/sec), 411.7 ms avg > latency, 1563.0 ms max latency. > 321993 records sent, 64360.0 records/sec (62.85 MB/sec), 447.3 ms avg > latency, 1975.0 ms max latency. > 404075 records sent, 80750.4 records/sec (78.86 MB/sec), 408.4 ms avg > latency, 1753.0 ms max latency. > 384526 records sent, 76905.2 records/sec (75.10 MB/sec), 406.0 ms avg > latency, 1833.0 ms max latency. 
> 387652 records sent, 77483.9 records/sec (75.67 MB/sec), 397.3 ms avg > latency, 1927.0 ms max latency. > 343286 records sent, 68629.7 records/sec (67.02 MB/sec), 455.6 ms avg > latency, 1685.0 ms max latency. > 00 records sent, 66646.7 records/sec (65.08 MB/sec), 456.6 ms avg > latency, 2146.0 ms max latency. > 361191 records sent, 72238.2 records/sec (70.55 MB/sec), 409.4 ms avg > latency, 2125.0 ms max latency. > 357525 records sent, 71490.7 records/sec (69.82 MB/sec), 436.0 ms avg > latency, 1502.0 ms max latency. > 340238 records sent, 68047.6 records/sec (66.45 MB/sec), 427.9 ms avg > latency, 1932.0 ms max latency. > 390016 records sent, 77956.4 records/sec (76.13 MB/sec), 418.5 ms avg > latency, 1807.0 ms max latency. > 352830 records sent, 70523.7 records/sec (68.87 MB/sec), 439.4 ms avg > latency, 1892.0 ms max latency. > 354526 records sent, 70905.2 records/
[jira] [Commented] (KAFKA-15264) Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter
[ https://issues.apache.org/jira/browse/KAFKA-15264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749124#comment-17749124 ] jianbin.chen commented on KAFKA-15264: -- The latest progress, I upgraded the 1.1.0+zk cluster to 3.5.1+zk !image-2023-07-31-18-05-21-772.png|width=745,height=288! {code:java} 362387 records sent, 72477.4 records/sec (70.78 MB/sec), 292.9 ms avg latency, 922.0 ms max latency. 545850 records sent, 109148.2 records/sec (106.59 MB/sec), 282.8 ms avg latency, 739.0 ms max latency. 533250 records sent, 106650.0 records/sec (104.15 MB/sec), 286.2 ms avg latency, 1478.0 ms max latency. 513000 records sent, 102600.0 records/sec (100.20 MB/sec), 267.9 ms avg latency, 2386.0 ms max latency. 538290 records sent, 107636.5 records/sec (105.11 MB/sec), 302.5 ms avg latency, 5450.0 ms max latency. 536850 records sent, 107370.0 records/sec (104.85 MB/sec), 268.7 ms avg latency, 1367.0 ms max latency. 552300 records sent, 110460.0 records/sec (107.87 MB/sec), 294.1 ms avg latency, 1474.0 ms max latency. 483464 records sent, 96673.5 records/sec (94.41 MB/sec), 254.0 ms avg latency, 1207.0 ms max latency. 498666 records sent, 99713.3 records/sec (97.38 MB/sec), 368.1 ms avg latency, 2662.0 ms max latency. 542400 records sent, 108436.6 records/sec (105.90 MB/sec), 280.9 ms avg latency, 1290.0 ms max latency. 462288 records sent, 92439.1 records/sec (90.27 MB/sec), 315.3 ms avg latency, 2558.0 ms max latency. 541980 records sent, 108396.0 records/sec (105.86 MB/sec), 295.8 ms avg latency, 2641.0 ms max latency. 554700 records sent, 110895.6 records/sec (108.30 MB/sec), 268.8 ms avg latency, 530.0 ms max latency. 520350 records sent, 104070.0 records/sec (101.63 MB/sec), 293.7 ms avg latency, 2977.0 ms max latency. 483600 records sent, 96700.7 records/sec (94.43 MB/sec), 261.1 ms avg latency, 2704.0 ms max latency. 559500 records sent, 111877.6 records/sec (109.26 MB/sec), 319.4 ms avg latency, 2713.0 ms max latency. 
549300 records sent, 109838.0 records/sec (107.26 MB/sec), 278.3 ms avg latency, 1683.0 ms max latency. 551700 records sent, 110340.0 records/sec (107.75 MB/sec), 277.5 ms avg latency, 1731.0 ms max latency. 548550 records sent, 109710.0 records/sec (107.14 MB/sec), 248.7 ms avg latency, 1821.0 ms max latency. 552900 records sent, 110557.9 records/sec (107.97 MB/sec), 259.4 ms avg latency, 1821.0 ms max latency. 551400 records sent, 110280.0 records/sec (107.70 MB/sec), 318.7 ms avg latency, 3584.0 ms max latency. 487350 records sent, 97470.0 records/sec (95.19 MB/sec), 286.6 ms avg latency, 2205.0 ms max latency. 542850 records sent, 108548.3 records/sec (106.00 MB/sec), 285.9 ms avg latency, 2273.0 ms max latency. 545850 records sent, 109148.2 records/sec (106.59 MB/sec), 278.7 ms avg latency, 1790.0 ms max latency. 557250 records sent, 111450.0 records/sec (108.84 MB/sec), 293.4 ms avg latency, 1572.0 ms max latency. 466231 records sent, 93246.2 records/sec (91.06 MB/sec), 244.0 ms avg latency, 2566.0 ms max latency. 538157 records sent, 107631.4 records/sec (105.11 MB/sec), 351.2 ms avg latency, 2565.0 ms max latency. 556650 records sent, 111330.0 records/sec (108.72 MB/sec), 274.4 ms avg latency, 392.0 ms max latency. 542144 records sent, 108428.8 records/sec (105.89 MB/sec), 270.6 ms avg latency, 2148.0 ms max latency. 416323 records sent, 83264.6 records/sec (81.31 MB/sec), 270.0 ms avg latency, 2681.0 ms max latency. 472528 records sent, 94505.6 records/sec (92.29 MB/sec), 389.7 ms avg latency, 3709.0 ms max latency. 503602 records sent, 100720.4 records/sec (98.36 MB/sec), 320.8 ms avg latency, 4058.0 ms max latency. 484719 records sent, 96905.0 records/sec (94.63 MB/sec), 293.5 ms avg latency, 2579.0 ms max latency. 375346 records sent, 75069.2 records/sec (73.31 MB/sec), 449.0 ms avg latency, 2982.0 ms max latency. 534300 records sent, 106838.6 records/sec (104.33 MB/sec), 282.5 ms avg latency, 1581.0 ms max latency. 
533550 records sent, 106710.0 records/sec (104.21 MB/sec), 288.5 ms avg latency, 2381.0 ms max latency. 319650 records sent, 63917.2 records/sec (62.42 MB/sec), 487.8 ms avg latency, 2517.0 ms max latency. 538950 records sent, 107790.0 records/sec (105.26 MB/sec), 280.8 ms avg latency, 1608.0 ms max latency. 547050 records sent, 109410.0 records/sec (106.85 MB/sec), 272.3 ms avg latency, 2777.0 ms max latency. 372958 records sent, 74591.6 records/sec (72.84 MB/sec), 381.7 ms avg latency, 2447.0 ms max latency. 550438 records sent, 110087.6 records/sec (107.51 MB/sec), 297.7 ms avg latency, 2545.0 ms max latency. 529650 records sent, 105930.0 records/sec (103.45 MB/sec), 279.5 ms avg latency, 2695.0 ms max latency. 428030 records sent, 85520.5 records/sec (83.52 MB/sec), 289.7 ms avg latency, 5425.0 ms max latency. 415080 records sent, 83016.0 records/sec (81.07 MB/sec), 329.4 ms avg latency, 5011.0 ms max latency. 426597 records sent, 85
[GitHub] [kafka] kamalcph commented on a diff in pull request #14128: KAFKA-15272: Fix the logic which finds candidate log segments to upload it to tiered storage
kamalcph commented on code in PR #14128:
URL: https://github.com/apache/kafka/pull/14128#discussion_r1279093020

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##

@@ -525,6 +524,26 @@ private void maybeUpdateReadOffset(UnifiedLog log) throws RemoteStorageException
         }
     }

+    List<EnrichedLogSegment> enrichedLogSegments(UnifiedLog log, Long fromOffset, Long lastStableOffset) {
+        List<EnrichedLogSegment> enrichedLogSegments = new ArrayList<>();
+        List<LogSegment> segments = JavaConverters.seqAsJavaList(log.nonActiveLogSegmentsFrom(fromOffset).toSeq());
+        if (!segments.isEmpty()) {
+            int idx = 1;
+            for (; idx < segments.size(); idx++) {
+                LogSegment previous = segments.get(idx - 1);
+                LogSegment current = segments.get(idx);
+                enrichedLogSegments.add(new EnrichedLogSegment(previous, current.baseOffset()));
+            }
+            // LogSegment#readNextOffset() is an expensive call, so we only call it when necessary.
+            int lastIdx = idx - 1;
+            if (segments.get(lastIdx).baseOffset() < lastStableOffset) {
+                LogSegment last = segments.get(lastIdx);
+                enrichedLogSegments.add(new EnrichedLogSegment(last, last.readNextOffset()));
+            }

Review Comment: Updated

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
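[Editorial sketch] The pairing trick in the diff above — taking each candidate segment's end offset from the next segment's base offset, so the expensive readNextOffset() lookup is only paid once, for the final segment — can be shown in isolation. `Segment` and `Candidate` below are simplified stand-ins, not Kafka's actual `LogSegment`/`EnrichedLogSegment` types:

```java
import java.util.ArrayList;
import java.util.List;

class SegmentPairing {
    // Hypothetical stand-in for a log segment: nextOffset models what readNextOffset() would return.
    record Segment(long baseOffset, long nextOffset) {}
    // A segment paired with the end offset of the range it covers.
    record Candidate(long baseOffset, long endOffset) {}

    static List<Candidate> pair(List<Segment> segments, long lastStableOffset) {
        List<Candidate> out = new ArrayList<>();
        // Every segment except the last gets its end offset for free, from its successor's base offset.
        for (int i = 1; i < segments.size(); i++) {
            out.add(new Candidate(segments.get(i - 1).baseOffset(), segments.get(i).baseOffset()));
        }
        if (!segments.isEmpty()) {
            Segment last = segments.get(segments.size() - 1);
            if (last.baseOffset() < lastStableOffset) {
                // Only the last segment needs the (expensive) next-offset lookup.
                out.add(new Candidate(last.baseOffset(), last.nextOffset()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Candidate> c = pair(List.of(new Segment(0, 100), new Segment(100, 200), new Segment(200, 250)), 240);
        System.out.println(c); // three candidates: the last one required the next-offset lookup
    }
}
```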
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14133: KAFKA-15189: only init remote topic metrics when enabled
divijvaidya commented on code in PR #14133:
URL: https://github.com/apache/kafka/pull/14133#discussion_r1279101884

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##

@@ -227,7 +227,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
     }
 }

-class BrokerTopicMetrics(name: Option[String]) {
+class BrokerTopicMetrics(name: Option[String], systemRemoteStorageEnabled: Boolean) {

Review Comment: In its current form, this is not very extensible for future cases, because if we want to optionally add metrics for a new feature we will have to add new booleans as arguments to determine the usage of that feature. Perhaps we can pass the entire config object and let this `BrokerTopicMetrics` class decide, based on the config, which metrics it wants to enable.

## core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala: ##

@@ -77,4 +79,28 @@ class KafkaRequestHandlerTest {
     assertEquals(Some(startTime + 200), request.callbackRequestDequeueTimeNanos)
     assertEquals(Some(startTime + 300), request.callbackRequestCompleteTimeNanos)
   }
+
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testTopicStats(systemRemoteStorageEnabled: Boolean): Unit = {

Review Comment: I would feel more confident if we had a test that spins up the servers without Remote Storage enabled and verifies that no Remote Storage related metrics are registered. To do this, we will have to collect all Remote Storage metrics in a class called "RemoteStorageMetrics", similar to QuorumControllerMetrics, and use that class as the source of truth for all metric names related to RemoteStorage. In the test, we can validate that none of the metrics related to RemoteStorage are registered in the default registry.
## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##

@@ -439,12 +441,14 @@ class BrokerTopicStats extends Logging {
     topicMetrics.closeMetric(BrokerTopicStats.ProduceMessageConversionsPerSec)
     topicMetrics.closeMetric(BrokerTopicStats.ReplicationBytesOutPerSec)
     topicMetrics.closeMetric(BrokerTopicStats.ReassignmentBytesOutPerSec)
-    topicMetrics.closeMetric(BrokerTopicStats.RemoteBytesOutPerSec)
-    topicMetrics.closeMetric(BrokerTopicStats.RemoteBytesInPerSec)
-    topicMetrics.closeMetric(BrokerTopicStats.RemoteReadRequestsPerSec)
-    topicMetrics.closeMetric(BrokerTopicStats.RemoteWriteRequestsPerSec)
-    topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteReadRequestsPerSec)
-    topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteWriteRequestsPerSec)
+    if (systemRemoteStorageEnabled) {

Review Comment: closeMetric is a no-op if the metric is missing, hence we don't need to do a check here.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
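[Editorial sketch] The reviewer's suggestion — pass a config object and let the metrics class decide what to register, rather than threading one boolean per feature through the constructor — might look like the following. This is a hypothetical illustration, not Kafka's actual API: `MetricsConfig` and `registeredMetrics` are invented names; only the metric-name strings come from the diff above.

```java
import java.util.HashSet;
import java.util.Set;

class ConfigDrivenMetrics {
    // Metric names taken from the BrokerTopicStats diff above.
    static final Set<String> REMOTE_STORAGE_METRICS = Set.of(
            "RemoteBytesInPerSec", "RemoteBytesOutPerSec",
            "RemoteReadRequestsPerSec", "RemoteWriteRequestsPerSec");

    // Hypothetical config carrier; in the real suggestion this would be the broker config object.
    record MetricsConfig(boolean remoteStorageEnabled) {}

    // The metrics class decides from the config which meters to register,
    // so adding a new feature does not mean adding a new boolean parameter.
    static Set<String> registeredMetrics(MetricsConfig config) {
        Set<String> metrics = new HashSet<>(Set.of("BytesInPerSec", "BytesOutPerSec"));
        if (config.remoteStorageEnabled()) {
            metrics.addAll(REMOTE_STORAGE_METRICS);
        }
        return metrics;
    }

    public static void main(String[] args) {
        System.out.println(registeredMetrics(new MetricsConfig(false)));
        System.out.println(registeredMetrics(new MetricsConfig(true)));
    }
}
```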
[jira] [Commented] (KAFKA-15038) Use topic id/name mapping from the Metadata cache in the RemoteLogManager
[ https://issues.apache.org/jira/browse/KAFKA-15038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749158#comment-17749158 ] Divij Vaidya commented on KAFKA-15038: -- Hi [~owen-leung] Thank you for looking into this. Yes, we want to replace the *ConcurrentMap topicPartitionIds* cache in RemoteLogManager. However, we instead want to use the cache that is available on every broker, called the Metadata cache [1], which will be the single source of authority on a broker about the topicId <-> topicName mapping. This cache is updated asynchronously on every broker. [1] [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/MetadataCache.scala] > Use topic id/name mapping from the Metadata cache in the RemoteLogManager > - > > Key: KAFKA-15038 > URL: https://issues.apache.org/jira/browse/KAFKA-15038 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Alexandre Dupriez >Assignee: Owen C.H. Leung >Priority: Minor > > Currently, the {{RemoteLogManager}} maintains its own cache of topic name to > topic id > [[1]|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138] > using the information provided during leadership changes, and removing the > mapping upon receiving the notification of partition stopped. > It should be possible to re-use the mapping in a broker's metadata cache, > removing the need for the RLM to build and update a local cache thereby > duplicating the information in the metadata cache. It also allows to preserve > a single source of authority regarding the association between topic names > and ids. > [1] > https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] sambhav-jain-16 opened a new pull request, #14134: test changes
sambhav-jain-16 opened a new pull request, #14134: URL: https://github.com/apache/kafka/pull/14134 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on pull request #13260: KAFKA-14661: Upgrade Zookeeper to 3.8.1
clolov commented on PR #13260: URL: https://github.com/apache/kafka/pull/13260#issuecomment-1658242751 It appears that in Zookeeper 3.8.0 the Zookeeper community moved away from log4j to logback (source: https://issues.apache.org/jira/browse/ZOOKEEPER-4427) In the failing test cases we attach an appender to the root logger (Kafka uses a log4j logger) and assert that we see certain log messages. Because of the mismatch in expectation (log4j) and reality (logback) these tests have started failing as the test-appender has no messages. The difference in Zookeeper dependencies between current trunk (31 July 2023) and this pull request is: ``` trunk org.apache.zookeeper:zookeeper:3.6.4 > io.netty:netty-handler:4.1.94.Final > io.netty:netty-transport-native-epoll:4.1.94.Final > org.apache.yetus:audience-annotations:0.13.0 > org.apache.zookeeper:zookeeper-jute:3.6.4 > org.slf4j:slf4j-api:1.7.36 pull request org.apache.zookeeper:zookeeper:3.8.2 > ch.qos.logback:logback-classic:1.2.10 <- NEW > ch.qos.logback:logback-core:1.2.10<- NEW > commons-io:commons-io:2.11.0 <- NEW > io.netty:netty-handler:4.1.94.Final > io.netty:netty-transport-native-epoll:4.1.94.Final > org.apache.yetus:audience-annotations:0.12.0 > org.apache.zookeeper:zookeeper-jute:3.8.2 > org.slf4j:slf4j-api:1.7.36 ``` I have removed the `ch.qos.logback:*` dependencies in the latest commit and tests have started passing locally. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15274) support moving files to be deleted to other directories
jianbin.chen created KAFKA-15274: Summary: support moving files to be deleted to other directories Key: KAFKA-15274 URL: https://issues.apache.org/jira/browse/KAFKA-15274 Project: Kafka Issue Type: Improvement Reporter: jianbin.chen Hello everyone, I am a Kafka user from China. Our company operates in public clouds overseas, such as AWS, Ali Cloud, and Huawei Cloud. We face a large amount of data exchange and business message delivery every day. Daily messages consume a significant amount of disk space. Purchasing the corresponding storage capacity on these cloud providers incurs substantial costs, especially for SSDs with ultra-high IOPS. High IOPS is very effective for disaster recovery, especially in the event of a sudden broker failure where storage space becomes full or memory space is exhausted leading to OOM kills. This high IOPS storage greatly improves data recovery efficiency, forcing us to adopt smaller storage specifications with high IO to save costs. Particularly, cloud providers only allow capacity expansion but not reduction. Currently, we have come up with a solution and would like to contribute it to the community for discussion. When we need to delete logs, I can purchase S3 or Minio storage from services like AWS and mount it to my brokers. When a log needs to be deleted, we can decide how it leaves the broker. The default is to delete it directly, while the move option moves it to S3. Since most of the deleted data is cold data that won't be used in the short term, this approach improves the retention period of historical data while maintaining good cost control. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15274) support moving files to be deleted to other directories
[ https://issues.apache.org/jira/browse/KAFKA-15274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jianbin.chen reassigned KAFKA-15274: Assignee: jianbin.chen > support moving files to be deleted to other directories > --- > > Key: KAFKA-15274 > URL: https://issues.apache.org/jira/browse/KAFKA-15274 > Project: Kafka > Issue Type: Improvement >Reporter: jianbin.chen >Assignee: jianbin.chen >Priority: Major > > Hello everyone, I am a Kafka user from China. Our company operates in public > clouds overseas, such as AWS, Ali Cloud, and Huawei Cloud. We face a large > amount of data exchange and business message delivery every day. Daily > messages consume a significant amount of disk space. Purchasing the > corresponding storage capacity on these cloud providers incurs substantial > costs, especially for SSDs with ultra-high IOPS. High IOPS is very effective > for disaster recovery, especially in the event of a sudden broker failure > where storage space becomes full or memory space is exhausted leading to OOM > kills. This high IOPS storage greatly improves data recovery efficiency, > forcing us to adopt smaller storage specifications with high IO to save > costs. Particularly, cloud providers only allow capacity expansion but not > reduction. > Currently, we have come up with a solution and would like to contribute it to > the community for discussion. When we need to delete logs, I can purchase S3 > or Minio storage from services like AWS and mount it to my brokers. When a > log needs to be deleted, we can decide how it leaves the broker. The default > is to delete it directly, while the move option moves it to S3. Since most of > the deleted data is cold data that won't be used in the short term, this > approach improves the retention period of historical data while maintaining > good cost control. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] a364176773 commented on pull request #14129: feature: support moving files to be deleted to other directories
a364176773 commented on PR #14129: URL: https://github.com/apache/kafka/pull/14129#issuecomment-1658258913 https://issues.apache.org/jira/browse/KAFKA-15274 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15252) Task is not stopped until the poll interval passes in case of task restarting.
[ https://issues.apache.org/jira/browse/KAFKA-15252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749178#comment-17749178 ] Nikita commented on KAFKA-15252: Hi [~ChrisEgerton]! I think the tickets [KAFKA-15090|https://issues.apache.org/jira/browse/KAFKA-15090] and [KAFKA-14725|https://issues.apache.org/jira/browse/KAFKA-14725] cover our case. Let's close current one as duplicate. > Task is not stopped until the poll interval passes in case of task restarting. > -- > > Key: KAFKA-15252 > URL: https://issues.apache.org/jira/browse/KAFKA-15252 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Nikita >Priority: Major > > We face a problem with restarting of the tasks, sometimes it leads to > resource leak. > We used the jdbc source connector and noticed an increasing of count of > opened sessions on Vertica side. But this problem is applicable for all > databases and possibly for all source connectors. > Our case is the next: > 1) Run jdbc source connector (io.confluent.connect.jdbc.JdbcSourceConnector) > and set poll.interval.ms (8640) > task.shutdown.graceful.timeout.ms (it's > the property on Kafka-connect side, we set 1) > 2) Send POST /connectors//tasks//restart > ER: count of session is the same as before restart > AR: count of session increases > The main problem is when > org.apache.kafka.connect.runtime.Worker#stopAndAwaitTasks(java.util.Collection) > method is called it doesn't stop a source task itself. > The source task stops only if polling process stops on source task side. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15252) Task is not stopped until the poll interval passes in case of task restarting.
[ https://issues.apache.org/jira/browse/KAFKA-15252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nikita resolved KAFKA-15252. Resolution: Duplicate > Task is not stopped until the poll interval passes in case of task restarting. > -- > > Key: KAFKA-15252 > URL: https://issues.apache.org/jira/browse/KAFKA-15252 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Nikita >Priority: Major > > We face a problem with restarting of the tasks, sometimes it leads to > resource leak. > We used the jdbc source connector and noticed an increasing of count of > opened sessions on Vertica side. But this problem is applicable for all > databases and possibly for all source connectors. > Our case is the next: > 1) Run jdbc source connector (io.confluent.connect.jdbc.JdbcSourceConnector) > and set poll.interval.ms (8640) > task.shutdown.graceful.timeout.ms (it's > the property on Kafka-connect side, we set 1) > 2) Send POST /connectors//tasks//restart > ER: count of session is the same as before restart > AR: count of session increases > The main problem is when > org.apache.kafka.connect.runtime.Worker#stopAndAwaitTasks(java.util.Collection) > method is called it doesn't stop a source task itself. > The source task stops only if polling process stops on source task side. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] showuon commented on a diff in pull request #14128: KAFKA-15272: Fix the logic which finds candidate log segments to upload it to tiered storage
showuon commented on code in PR #14128:
URL: https://github.com/apache/kafka/pull/14128#discussion_r1279212920

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##

@@ -538,35 +551,33 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
         if (lso < 0) {
             logger.warn("lastStableOffset for partition {} is {}, which should not be negative.", topicIdPartition, lso);
         } else if (lso > 0 && copiedOffset < lso) {
-            // Copy segments only till the last-stable-offset as remote storage should contain only committed/acked
-            // messages
-            long toOffset = lso;
-            logger.debug("Checking for segments to copy, copiedOffset: {} and toOffset: {}", copiedOffset, toOffset);
-            long activeSegBaseOffset = log.activeSegment().baseOffset();
             // log-start-offset can be ahead of the read-offset, when:
             // 1) log-start-offset gets incremented via delete-records API (or)
             // 2) enabling the remote log for the first time
             long fromOffset = Math.max(copiedOffset + 1, log.logStartOffset());
-            ArrayList<LogSegment> sortedSegments = new ArrayList<>(JavaConverters.asJavaCollection(log.logSegments(fromOffset, toOffset)));
-            sortedSegments.sort(Comparator.comparingLong(LogSegment::baseOffset));
-            List<Long> sortedBaseOffsets = sortedSegments.stream().map(LogSegment::baseOffset).collect(Collectors.toList());
-            int activeSegIndex = Collections.binarySearch(sortedBaseOffsets, activeSegBaseOffset);
-
-            // sortedSegments becomes empty list when fromOffset and toOffset are same, and activeSegIndex becomes -1
-            if (activeSegIndex < 0) {
+            long activeSegmentBaseOffset = log.activeSegment().baseOffset();
+
+            // Segments which match the following criteria are eligible for copying to remote storage:
+            // 1) Segment is not the active segment and
+            // 2) Segment end-offset is less than the last-stable-offset as remote storage should contain only
+            //    committed/acked messages
+            List<EnrichedLogSegment> candidateSegments = enrichedLogSegments(log, fromOffset)
+                .stream()
+                .filter(enrichedSegment -> enrichedSegment.logSegment.baseOffset() != activeSegmentBaseOffset && enrichedSegment.nextSegmentOffset <= lso)

Review Comment: If a segment contains 0 records and the `log.roll.ms` timeout has passed, then what will be the `baseOffset` of the next active segment? Do you have any idea? I asked because I'm not sure whether `enrichedSegment.logSegment.baseOffset() != activeSegmentBaseOffset` is necessary. If we already filtered out the active segment in the `enrichedLogSegments` method, do we still need this check?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15274) support moving files to be deleted to other directories
[ https://issues.apache.org/jira/browse/KAFKA-15274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jianbin.chen updated KAFKA-15274: - Issue Type: Task (was: Improvement) > support moving files to be deleted to other directories > --- > > Key: KAFKA-15274 > URL: https://issues.apache.org/jira/browse/KAFKA-15274 > Project: Kafka > Issue Type: Task >Reporter: jianbin.chen >Assignee: jianbin.chen >Priority: Major > > Hello everyone, I am a Kafka user from China. Our company operates in public > clouds overseas, such as AWS, Ali Cloud, and Huawei Cloud. We face a large > amount of data exchange and business message delivery every day. Daily > messages consume a significant amount of disk space. Purchasing the > corresponding storage capacity on these cloud providers incurs substantial > costs, especially for SSDs with ultra-high IOPS. High IOPS is very effective > for disaster recovery, especially in the event of a sudden broker failure > where storage space becomes full or memory space is exhausted leading to OOM > kills. This high IOPS storage greatly improves data recovery efficiency, > forcing us to adopt smaller storage specifications with high IO to save > costs. Particularly, cloud providers only allow capacity expansion but not > reduction. > Currently, we have come up with a solution and would like to contribute it to > the community for discussion. When we need to delete logs, I can purchase S3 > or Minio storage from services like AWS and mount it to my brokers. When a > log needs to be deleted, we can decide how it leaves the broker. The default > is to delete it directly, while the move option moves it to S3. Since most of > the deleted data is cold data that won't be used in the short term, this > approach improves the retention period of historical data while maintaining > good cost control. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] kamalcph commented on a diff in pull request #14128: KAFKA-15272: Fix the logic which finds candidate log segments to upload it to tiered storage
kamalcph commented on code in PR #14128:
URL: https://github.com/apache/kafka/pull/14128#discussion_r1279280694

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##

@@ -538,35 +551,33 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
         if (lso < 0) {
             logger.warn("lastStableOffset for partition {} is {}, which should not be negative.", topicIdPartition, lso);
         } else if (lso > 0 && copiedOffset < lso) {
-            // Copy segments only till the last-stable-offset as remote storage should contain only committed/acked
-            // messages
-            long toOffset = lso;
-            logger.debug("Checking for segments to copy, copiedOffset: {} and toOffset: {}", copiedOffset, toOffset);
-            long activeSegBaseOffset = log.activeSegment().baseOffset();
             // log-start-offset can be ahead of the read-offset, when:
             // 1) log-start-offset gets incremented via delete-records API (or)
             // 2) enabling the remote log for the first time
             long fromOffset = Math.max(copiedOffset + 1, log.logStartOffset());
-            ArrayList<LogSegment> sortedSegments = new ArrayList<>(JavaConverters.asJavaCollection(log.logSegments(fromOffset, toOffset)));
-            sortedSegments.sort(Comparator.comparingLong(LogSegment::baseOffset));
-            List<Long> sortedBaseOffsets = sortedSegments.stream().map(LogSegment::baseOffset).collect(Collectors.toList());
-            int activeSegIndex = Collections.binarySearch(sortedBaseOffsets, activeSegBaseOffset);
-
-            // sortedSegments becomes empty list when fromOffset and toOffset are same, and activeSegIndex becomes -1
-            if (activeSegIndex < 0) {
+            long activeSegmentBaseOffset = log.activeSegment().baseOffset();
+
+            // Segments which match the following criteria are eligible for copying to remote storage:
+            // 1) Segment is not the active segment and
+            // 2) Segment end-offset is less than the last-stable-offset as remote storage should contain only
+            //    committed/acked messages
+            List<EnrichedLogSegment> candidateSegments = enrichedLogSegments(log, fromOffset)
+                .stream()
+                .filter(enrichedSegment -> enrichedSegment.logSegment.baseOffset() != activeSegmentBaseOffset && enrichedSegment.nextSegmentOffset <= lso)

Review Comment:
> If a segment contains 0 records and log.roll.ms timeout passed, then what will be the baseOffset of next active segment?

If the active segment is empty, then it [won't be rotated](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L71). The in-memory check `enrichedSegment.logSegment.baseOffset() != activeSegmentBaseOffset` is somewhat redundant; we can remove it later if required.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah merged pull request #14126: MINOR Fix a Scala 2.12 compile issue
mumrah merged PR #14126: URL: https://github.com/apache/kafka/pull/14126 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #14128: KAFKA-15272: Fix the logic which finds candidate log segments to upload it to tiered storage
clolov commented on code in PR #14128:
URL: https://github.com/apache/kafka/pull/14128#discussion_r127954

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##

@@ -538,35 +551,33 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
         if (lso < 0) {
             logger.warn("lastStableOffset for partition {} is {}, which should not be negative.", topicIdPartition, lso);
         } else if (lso > 0 && copiedOffset < lso) {
-            // Copy segments only till the last-stable-offset as remote storage should contain only committed/acked
-            // messages
-            long toOffset = lso;
-            logger.debug("Checking for segments to copy, copiedOffset: {} and toOffset: {}", copiedOffset, toOffset);
-            long activeSegBaseOffset = log.activeSegment().baseOffset();
             // log-start-offset can be ahead of the read-offset, when:
             // 1) log-start-offset gets incremented via delete-records API (or)
             // 2) enabling the remote log for the first time
             long fromOffset = Math.max(copiedOffset + 1, log.logStartOffset());
-            ArrayList<LogSegment> sortedSegments = new ArrayList<>(JavaConverters.asJavaCollection(log.logSegments(fromOffset, toOffset)));
-            sortedSegments.sort(Comparator.comparingLong(LogSegment::baseOffset));
-            List<Long> sortedBaseOffsets = sortedSegments.stream().map(LogSegment::baseOffset).collect(Collectors.toList());
-            int activeSegIndex = Collections.binarySearch(sortedBaseOffsets, activeSegBaseOffset);
-
-            // sortedSegments becomes empty list when fromOffset and toOffset are same, and activeSegIndex becomes -1
-            if (activeSegIndex < 0) {
+            long activeSegmentBaseOffset = log.activeSegment().baseOffset();
+
+            // Segments which match the following criteria are eligible for copying to remote storage:
+            // 1) Segment is not the active segment and
+            // 2) Segment end-offset is less than the last-stable-offset as remote storage should contain only
+            //    committed/acked messages
+            List<EnrichedLogSegment> candidateSegments = enrichedLogSegments(log, fromOffset)
+                .stream()
+                .filter(enrichedSegment -> enrichedSegment.logSegment.baseOffset() != activeSegmentBaseOffset && enrichedSegment.nextSegmentOffset <= lso)

Review Comment: Nit: Instead of doing this filtering after we have created the enriched segments list and iterating again, can we not push the condition into the creation of the enriched segments list?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #14000: IncrementalCooperativeAssignor#handleLostAssignments invokes logic for lost Assignments even when there are no Lost assignment
vamossagar12 commented on code in PR #14000: URL: https://github.com/apache/kafka/pull/14000#discussion_r1279411709 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ## @@ -151,7 +151,7 @@ public void testAssignmentsWhenWorkersJoinAfterRevocations() { // in this round addNewEmptyWorkers("worker3"); performStandardRebalance(); -assertTrue(assignor.delay > 0); +assertEquals(40, assignor.delay); // First successive revoking rebalance. Review Comment: Added an elaborate check over here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #14000: IncrementalCooperativeAssignor#handleLostAssignments invokes logic for lost Assignments even when there are no Lost assignment
vamossagar12 commented on code in PR #14000: URL: https://github.com/apache/kafka/pull/14000#discussion_r1279414273 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ## @@ -161,30 +161,38 @@ public void testAssignmentsWhenWorkersJoinAfterRevocations() { time.sleep(assignor.delay); addNewEmptyWorkers("worker4"); performStandardRebalance(); +assertEquals(0, assignor.delay); // No revocations in this round. assertWorkers("worker1", "worker2", "worker3", "worker4"); assertConnectorAllocations(0, 0, 1, 1); assertTaskAllocations(0, 3, 3, 3); // Fifth assignment and a fifth worker joining after a revoking rebalance. -// We shouldn't revoke and set a delay > initial interval +// We shouldn't revoke and set a delay equal to initial interval addNewEmptyWorkers("worker5"); performStandardRebalance(); -assertTrue(assignor.delay > 40); +assertEquals(40, assignor.delay); // First successive revoking rebalance. Review Comment: IMO this happens because we weren't unsetting the counter when delay goes equal to 0. This is the first successive revoking rebalance and the delay should be 40. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #14000: IncrementalCooperativeAssignor#handleLostAssignments invokes logic for lost Assignments even when there are no Lost assignment
vamossagar12 commented on code in PR #14000: URL: https://github.com/apache/kafka/pull/14000#discussion_r1279415811 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ## @@ -161,30 +161,38 @@ public void testAssignmentsWhenWorkersJoinAfterRevocations() { time.sleep(assignor.delay); addNewEmptyWorkers("worker4"); performStandardRebalance(); +assertEquals(0, assignor.delay); // No revocations in this round. assertWorkers("worker1", "worker2", "worker3", "worker4"); assertConnectorAllocations(0, 0, 1, 1); assertTaskAllocations(0, 3, 3, 3); // Fifth assignment and a fifth worker joining after a revoking rebalance. -// We shouldn't revoke and set a delay > initial interval +// We shouldn't revoke and set a delay equal to initial interval addNewEmptyWorkers("worker5"); performStandardRebalance(); -assertTrue(assignor.delay > 40); +assertEquals(40, assignor.delay); // First successive revoking rebalance. assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5"); assertConnectorAllocations(0, 0, 1, 1, 1); assertTaskAllocations(1, 2, 3, 3, 3); -// Sixth assignment with sixth worker joining after the expiry. -// Should revoke -time.sleep(assignor.delay); Review Comment: I removed this to prove that the changes still work correctly and the delay gets extended further. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
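[Editorial sketch] The behaviour these test comments describe — a rebalance delay that starts at an initial interval (40 in the test) on the first successive revoking rebalance and is extended on subsequent ones, capped at a maximum — can be modelled as a simple doubling backoff. This is an illustrative model under that assumption, not Connect's exact `IncrementalCooperativeAssignor` implementation:

```java
class RebalanceDelayModel {
    // Assumed model: no delay without revocations; otherwise the initial delay,
    // doubled once per additional successive revoking rebalance, capped at maxDelayMs.
    static long delay(int successiveRevocations, long initialDelayMs, long maxDelayMs) {
        if (successiveRevocations <= 0) {
            return 0; // resetting the counter when the delay expires means no stale delay
        }
        long d = initialDelayMs << (successiveRevocations - 1);
        return Math.min(d, maxDelayMs);
    }

    public static void main(String[] args) {
        System.out.println(delay(1, 40, 1000)); // 40  — first successive revoking rebalance
        System.out.println(delay(3, 40, 1000)); // 160 — delay extended on later ones
    }
}
```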
[GitHub] [kafka] vamossagar12 commented on pull request #14000: IncrementalCooperativeAssignor#handleLostAssignments invokes logic for lost Assignments even when there are no Lost assignments
vamossagar12 commented on PR #14000: URL: https://github.com/apache/kafka/pull/14000#issuecomment-1658526465 @gharris1727 I updated the PR to cater to the changes I was talking about [here](https://github.com/apache/kafka/pull/14000#issuecomment-1642481356). Note that the original comment correction is not needed anymore.
[GitHub] [kafka] clolov commented on pull request #13260: KAFKA-14661: Upgrade Zookeeper to 3.8.1
clolov commented on PR #13260: URL: https://github.com/apache/kafka/pull/13260#issuecomment-1658538656 The failures in JDK 8/Scala 2.12 have to do with formatting on code which hasn't been touched as part of this pull request, so I believe it is safe to go ahead. There are 2 failures in JDK 20/Scala 2.13 - one concerned with tiered storage (RemoteIndexCacheTest - cleaner did not shutdown) and one with simple production (PlaintextProducerSendTest - not enough brokers for specified RF). I believe these are transient failures as they do not manifest when running locally, but if people are unconvinced we can rerun the tests. Otherwise, I think we are ready to merge this!
[jira] [Commented] (KAFKA-15267) Cluster-wide disablement of Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-15267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749225#comment-17749225 ] Christo Lolov commented on KAFKA-15267: --- Heya [~satishd], [~showuon], [~divijvaidya]! I had a quick sync with Satish about this ticket and we believe that, since this is an already defined configuration, the proposed changes do not require a KIP. What are your thoughts on the problem and the recommended approach? > Cluster-wide disablement of Tiered Storage > -- > > Key: KAFKA-15267 > URL: https://issues.apache.org/jira/browse/KAFKA-15267 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > Labels: tiered-storage > > h2. Summary > KIP-405 defines the configuration {{remote.log.storage.system.enable}} which > controls whether all resources needed for Tiered Storage to function are > instantiated properly in Kafka. However, the interaction between remote data > and Kafka is undefined if that configuration is set to false while there are > still topics with {{remote.storage.enable}} set. {color:#ff8b00}*We would > like to give customers the ability to switch off Tiered Storage on a cluster > level and as such would need to define the behaviour.*{color} > {{remote.log.storage.system.enable}} is a read-only configuration. This means > that it can only be changed by *modifying the server.properties* and > restarting brokers. As such, the {*}validity of values contained in it is > only checked at broker startup{*}. > This JIRA proposes a few behaviours and a recommendation on a way forward. > h2. Option 1: Change nothing > Pros: > * No operation. > Cons: > * We do not solve the problem of moving back to older (or newer) Kafka > versions not supporting TS. > h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster > level and do not allow it to be disabled > Always instantiate all resources for tiered storage.
If no special ones are > selected, use the default ones which come with Kafka. > Pros: > * We solve the problem of moving between versions not allowing TS to be > disabled. > Cons: > * We do not solve the problem of moving back to older (or newer) Kafka > versions not supporting TS. > * We haven’t quantified how many compute resources (CPU, memory) idle TS > components occupy. > * TS is a feature not required for running Kafka. As such, while it is still > under development we shouldn’t put it on the critical path of starting a > broker. In this way, a stray memory leak won’t impact anything on the > critical path of a broker. > * We are potentially swapping one problem for another. How does TS behave if > one decides to swap the TS plugin classes when data has already been written? > h2. Option 3: Hide topics with tiering enabled > Customers cannot interact with topics which have tiering enabled. They cannot > create new topics with the same names. Retention (and compaction?) do not > take effect on files already in local storage. > Pros: > * We do not force data deletion. > Cons: > * This will be quite involved - the controller will need to know when a > broker’s server.properties have been altered; the broker will need to not > proceed to delete logs it is not the leader or follower for. > h2. {color:#00875a}Option 4: Do not start the broker if there are topics with > tiering enabled{color} - Recommended > This option has 2 different sub-options. The first one is that TS cannot be > disabled on a cluster level if there are *any* tiered topics - in other words, > all tiered topics need to be deleted. The second one is that TS cannot be > disabled on a cluster level if there are *any* topics with *tiering enabled* > - they can have tiering disabled, but with a retention policy set to delete > or retain (as per > [KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]).
> A topic can have tiering disabled and remain on the cluster as long as there > is no *remote* data when TS is disabled cluster-wide. > Pros: > * We force the customer to be very explicit in disabling tiering of topics > prior to disabling TS on the whole cluster. > Cons: > * You have to make certain that all data in remote storage is deleted (just a > disablement of a tiered topic is not enough). How do you determine whether all > remote data has expired if the policy is retain? If the retain policy in KIP-950 knows > that there is data in remote storage, then this check should also be able to figure it out. > The common denominator is that there needs to be no *remote* data at the > point of disabling TS. As such, the most straightforward option is to refuse > to start brokers if there are topics with {{remote.storage.enable}} > present. This in essence re
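For the recommended Option 4, the startup-time decision rule could look roughly like the sketch below: when the cluster-wide flag is off, any topic whose per-topic tiering config is still enabled blocks broker startup. This is only an illustration of the rule discussed in the ticket — the class and method names are invented, not actual Kafka broker code.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TieredStorageStartupCheck {

    // Returns the topics that should block startup when the cluster-wide
    // remote.log.storage.system.enable flag is false: every topic whose
    // remote.storage.enable config is still true.
    static List<String> blockingTopics(boolean systemEnabled, Map<String, Boolean> topicTieringConfig) {
        if (systemEnabled)
            return List.of(); // TS stays on; nothing blocks startup
        return topicTieringConfig.entrySet().stream()
                .filter(Map.Entry::getValue)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Boolean> topics = Map.of("orders", true, "logs", false);
        List<String> blocking = blockingTopics(false, topics);
        // A real broker would refuse to start here while 'blocking' is non-empty.
        if (!blocking.equals(List.of("orders")))
            throw new AssertionError("expected [orders], got " + blocking);
        if (!blockingTopics(true, topics).isEmpty())
            throw new AssertionError("nothing should block when TS is enabled");
        System.out.println("ok");
    }
}
```

Note the rule deliberately ignores whether remote data still exists; as the ticket points out, verifying that all remote data is gone is the harder part and is left out of this sketch.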
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #14000: IncrementalCooperativeAssignor#handleLostAssignments invokes logic for lost Assignments even when there are no Lost assignments
vamossagar12 commented on code in PR #14000: URL: https://github.com/apache/kafka/pull/14000#discussion_r1279414273 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ## @@ -161,30 +161,38 @@ public void testAssignmentsWhenWorkersJoinAfterRevocations() { time.sleep(assignor.delay); addNewEmptyWorkers("worker4"); performStandardRebalance(); +assertEquals(0, assignor.delay); // No revocations in this round. assertWorkers("worker1", "worker2", "worker3", "worker4"); assertConnectorAllocations(0, 0, 1, 1); assertTaskAllocations(0, 3, 3, 3); // Fifth assignment and a fifth worker joining after a revoking rebalance. -// We shouldn't revoke and set a delay > initial interval +// We shouldn't revoke and set a delay equal to initial interval addNewEmptyWorkers("worker5"); performStandardRebalance(); -assertTrue(assignor.delay > 40); +assertEquals(40, assignor.delay); // First successive revoking rebalance. Review Comment: IMO this happens because we weren't unsetting the counter when delay goes equal to 0. This is the first successive revoking rebalance and the delay should be 40. I say this because this case is similar to worker2 -> worker3 joining above where the delay was 40. It should be similar here as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kamalcph commented on pull request #14128: KAFKA-15272: Fix the logic which finds candidate log segments to upload it to tiered storage
kamalcph commented on PR #14128: URL: https://github.com/apache/kafka/pull/14128#issuecomment-1658615037 @showuon @clolov @satishd Addressed your review comments. PTAL.
[jira] [Commented] (KAFKA-15264) Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter
[ https://issues.apache.org/jira/browse/KAFKA-15264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749248#comment-17749248 ] José Armando García Sancio commented on KAFKA-15264: Thanks [~funkye] for the tests. Can you attach the broker logs during the above tests? Can you also run the same tests but use a replication factor 1? There is a theory that this could be related to an issue that was fixed here: [https://github.com/apache/kafka/pull/13765] or you can run the KRaft tests against `trunk`. > Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter > --- > > Key: KAFKA-15264 > URL: https://issues.apache.org/jira/browse/KAFKA-15264 > Project: Kafka > Issue Type: Bug >Reporter: jianbin.chen >Priority: Major > Attachments: image-2023-07-28-09-51-01-662.png, > image-2023-07-28-09-52-38-941.png, image-2023-07-31-18-04-54-112.png, > image-2023-07-31-18-05-21-772.png > > > I was preparing to upgrade from 1.1.0 to 3.5.1 kraft mode (new cluster > deployment), and when I recently compared and tested, I found that when using > the following stress test command, the throughput gap is obvious > > {code:java} > ./kafka-producer-perf-test.sh --topic test321 --num-records 3000 > --record-size 1024 --throughput -1 --producer-props > bootstrap.servers=xxx: acks=1 > 419813 records sent, 83962.6 records/sec (81.99 MB/sec), 241.1 ms avg > latency, 588.0 ms max latency. > 555300 records sent, 111015.6 records/sec (108.41 MB/sec), 275.1 ms avg > latency, 460.0 ms max latency. > 552795 records sent, 110536.9 records/sec (107.95 MB/sec), 265.9 ms avg > latency, 1120.0 ms max latency. > 552600 records sent, 110520.0 records/sec (107.93 MB/sec), 284.5 ms avg > latency, 1097.0 ms max latency. > 538500 records sent, 107656.9 records/sec (105.13 MB/sec), 277.5 ms avg > latency, 610.0 ms max latency. > 511545 records sent, 102309.0 records/sec (99.91 MB/sec), 304.1 ms avg > latency, 1892.0 ms max latency. 
> 511890 records sent, 102337.1 records/sec (99.94 MB/sec), 288.4 ms avg > latency, 3000.0 ms max latency. > 519165 records sent, 103812.2 records/sec (101.38 MB/sec), 262.1 ms avg > latency, 1781.0 ms max latency. > 513555 records sent, 102669.9 records/sec (100.26 MB/sec), 338.2 ms avg > latency, 2590.0 ms max latency. > 463329 records sent, 92665.8 records/sec (90.49 MB/sec), 276.8 ms avg > latency, 1463.0 ms max latency. > 494248 records sent, 98849.6 records/sec (96.53 MB/sec), 327.2 ms avg > latency, 2362.0 ms max latency. > 506272 records sent, 101254.4 records/sec (98.88 MB/sec), 322.1 ms avg > latency, 2986.0 ms max latency. > 393758 records sent, 78735.9 records/sec (76.89 MB/sec), 387.0 ms avg > latency, 2958.0 ms max latency. > 426435 records sent, 85252.9 records/sec (83.25 MB/sec), 363.3 ms avg > latency, 1959.0 ms max latency. > 412560 records sent, 82298.0 records/sec (80.37 MB/sec), 374.1 ms avg > latency, 1995.0 ms max latency. > 370137 records sent, 73997.8 records/sec (72.26 MB/sec), 396.8 ms avg > latency, 1496.0 ms max latency. > 391781 records sent, 78340.5 records/sec (76.50 MB/sec), 410.7 ms avg > latency, 2446.0 ms max latency. > 355901 records sent, 71166.0 records/sec (69.50 MB/sec), 397.5 ms avg > latency, 2715.0 ms max latency. > 385410 records sent, 77082.0 records/sec (75.28 MB/sec), 417.5 ms avg > latency, 2702.0 ms max latency. > 381160 records sent, 76232.0 records/sec (74.45 MB/sec), 407.7 ms avg > latency, 1846.0 ms max latency. > 67 records sent, 0.1 records/sec (65.10 MB/sec), 456.2 ms avg > latency, 1414.0 ms max latency. > 376251 records sent, 75175.0 records/sec (73.41 MB/sec), 401.9 ms avg > latency, 1897.0 ms max latency. > 354434 records sent, 70886.8 records/sec (69.23 MB/sec), 425.8 ms avg > latency, 1601.0 ms max latency. > 353795 records sent, 70744.9 records/sec (69.09 MB/sec), 411.7 ms avg > latency, 1563.0 ms max latency. 
> 321993 records sent, 64360.0 records/sec (62.85 MB/sec), 447.3 ms avg > latency, 1975.0 ms max latency. > 404075 records sent, 80750.4 records/sec (78.86 MB/sec), 408.4 ms avg > latency, 1753.0 ms max latency. > 384526 records sent, 76905.2 records/sec (75.10 MB/sec), 406.0 ms avg > latency, 1833.0 ms max latency. > 387652 records sent, 77483.9 records/sec (75.67 MB/sec), 397.3 ms avg > latency, 1927.0 ms max latency. > 343286 records sent, 68629.7 records/sec (67.02 MB/sec), 455.6 ms avg > latency, 1685.0 ms max latency. > 00 records sent, 66646.7 records/sec (65.08 MB/sec), 456.6 ms avg > latency, 2146.0 ms max latency. > 361191 records sent, 72238.2 records/sec (70.55 MB/sec), 409.4 ms avg > latency, 2125.0 ms max latency. > 357525 records sent, 71490.7 records/sec (69.82 MB/sec), 436.0 ms avg > latency, 1502.0 ms max latenc
[GitHub] [kafka] nizhikov commented on pull request #13247: KAFKA-14595 [WIP] Move ReassignPartitionsCommand to java
nizhikov commented on PR #13247: URL: https://github.com/apache/kafka/pull/13247#issuecomment-1658638517 Hello @mimaison > If you can think of ways to split it in at least 2 PRs that would be preferable My plan is the following: 1. PR to rewrite AdminUtils - https://github.com/apache/kafka/pull/14096 . It is independent from the others and can be reviewed right now. I checked all test failures and it seems like all of them are just flaky tests from trunk. Can you, please, join the review? 2. PR to create a copy of POJOs and custom exceptions of `ReassignPartitionsCommand`. 3. PR to rewrite the command itself and its tests. To finish all three I need to rewrite `ReassignPartitionsIntegrationTest`, which I am doing right now. I will split the PR in three after that. What do you think? Can you start the review of #14096 ?
[GitHub] [kafka] kirktrue commented on a diff in pull request #14118: KAFKA-14875: Implement wakeup
kirktrue commented on code in PR #14118: URL: https://github.com/apache/kafka/pull/14118#discussion_r1279620345 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java: ## @@ -19,25 +19,33 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import java.time.Duration; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; public class OffsetFetchApplicationEvent extends ApplicationEvent { -final public CompletableFuture> future; -public final Set partitions; +private final CompletableFuture> future; +private final Set partitions; public OffsetFetchApplicationEvent(final Set partitions) { super(Type.FETCH_COMMITTED_OFFSET); this.partitions = partitions; this.future = new CompletableFuture<>(); Review Comment: Shouldn't this just extend from `CompletableApplicationEvent` instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #13247: KAFKA-14595 [WIP] Move ReassignPartitionsCommand to java
mimaison commented on PR #13247: URL: https://github.com/apache/kafka/pull/13247#issuecomment-1658806935 I'll try to take a look at #14096 this week. Thanks
[GitHub] [kafka] guozhangwang commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
guozhangwang commented on PR #13920: URL: https://github.com/apache/kafka/pull/13920#issuecomment-1658843159 > I think the root cause is performReassignments in GeneralAssignmentBuilder is a O(partSize * consumerNum) functions, it is very inefficient. Ack, and with your changes, it's not constant and not linear to either num.partitions or num.consumers right?
[GitHub] [kafka] lianetm commented on a diff in pull request #14118: KAFKA-14875: Implement wakeup
lianetm commented on code in PR #14118: URL: https://github.com/apache/kafka/pull/14118#discussion_r1279692395 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java: ## @@ -19,25 +19,33 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import java.time.Duration; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; public class OffsetFetchApplicationEvent extends ApplicationEvent { -final public CompletableFuture> future; -public final Set partitions; +private final CompletableFuture> future; +private final Set partitions; public OffsetFetchApplicationEvent(final Set partitions) { super(Type.FETCH_COMMITTED_OFFSET); this.partitions = partitions; this.future = new CompletableFuture<>(); Review Comment: Agree, and it does already. It is just that the changes are not here yet (only in integration branch for now) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14118: KAFKA-14875: Implement wakeup
philipnee commented on code in PR #14118: URL: https://github.com/apache/kafka/pull/14118#discussion_r1279701769 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java: ## @@ -19,25 +19,33 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import java.time.Duration; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; public class OffsetFetchApplicationEvent extends ApplicationEvent { -final public CompletableFuture> future; -public final Set partitions; +private final CompletableFuture> future; +private final Set partitions; public OffsetFetchApplicationEvent(final Set partitions) { super(Type.FETCH_COMMITTED_OFFSET); this.partitions = partitions; this.future = new CompletableFuture<>(); Review Comment: CompletableApplicationEvent isn't available at the current trunk yet. I think it happens at a different commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
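For readers following along without access to the integration branch the reviewers mention, the shape under discussion is roughly the following: a common base event owns the `CompletableFuture`, so subclasses such as `OffsetFetchApplicationEvent` no longer declare their own `future` field. The names below mirror the thread, but the bodies are a sketch of the idea, not the actual branch code.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Base event type (sketch).
abstract class ApplicationEvent {
    enum Type { FETCH_COMMITTED_OFFSET }
    final Type type;
    ApplicationEvent(Type type) { this.type = type; }
}

// The base class owns the future, so every completable event shares one
// implementation instead of declaring its own field.
abstract class CompletableApplicationEvent<T> extends ApplicationEvent {
    private final CompletableFuture<T> future = new CompletableFuture<>();
    CompletableApplicationEvent(Type type) { super(type); }
    CompletableFuture<T> future() { return future; }
}

// The offset-fetch event then only carries its own payload.
class OffsetFetchEventSketch extends CompletableApplicationEvent<String> {
    final Set<String> partitions;
    OffsetFetchEventSketch(Set<String> partitions) {
        super(Type.FETCH_COMMITTED_OFFSET);
        this.partitions = partitions;
    }
}

public class CompletableEventDemo {
    public static void main(String[] args) {
        OffsetFetchEventSketch event = new OffsetFetchEventSketch(Set.of("topic-0"));
        event.future().complete("offsets"); // the background thread would do this
        if (!"offsets".equals(event.future().join()))
            throw new AssertionError();
        System.out.println("ok");
    }
}
```

This also explains the test change discussed later in the thread: mocking can stub the inherited `future()` accessor once rather than intercepting each subclass's own field.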
[jira] [Commented] (KAFKA-15274) support moving files to be deleted to other directories
[ https://issues.apache.org/jira/browse/KAFKA-15274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749312#comment-17749312 ] Greg Harris commented on KAFKA-15274: - Hi [~funkye] and thank you for the suggested feature! Are you familiar with this KIP: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage] ? I believe it also intends to solve the problem you're experiencing. > support moving files to be deleted to other directories > --- > > Key: KAFKA-15274 > URL: https://issues.apache.org/jira/browse/KAFKA-15274 > Project: Kafka > Issue Type: Task >Reporter: jianbin.chen >Assignee: jianbin.chen >Priority: Major > > Hello everyone, I am a Kafka user from China. Our company operates in public > clouds overseas, such as AWS, Ali Cloud, and Huawei Cloud. We face a large > amount of data exchange and business message delivery every day. Daily > messages consume a significant amount of disk space. Purchasing the > corresponding storage capacity on these cloud providers incurs substantial > costs, especially for SSDs with ultra-high IOPS. High IOPS is very effective > for disaster recovery, especially in the event of a sudden broker failure > where storage space becomes full or memory space is exhausted leading to OOM > kills. This high IOPS storage greatly improves data recovery efficiency, > forcing us to adopt smaller storage specifications with high IO to save > costs. Particularly, cloud providers only allow capacity expansion but not > reduction. > Currently, we have come up with a solution and would like to contribute it to > the community for discussion. When we need to delete logs, I can purchase S3 > or Minio storage from services like AWS and mount it to my brokers. When a > log needs to be deleted, we can decide how it leaves the broker. The default > is to delete it directly, while the move option moves it to S3. 
Since most of > the deleted data is cold data that won't be used in the short term, this > approach improves the retention period of historical data while maintaining > good cost control.
[jira] [Commented] (KAFKA-14748) Relax non-null FK left-join requirement
[ https://issues.apache.org/jira/browse/KAFKA-14748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749325#comment-17749325 ] Florin Akermann commented on KAFKA-14748: - Thank you [~guozhang]. Here is the KIP: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams] > Relax non-null FK left-join requirement > --- > > Key: KAFKA-14748 > URL: https://issues.apache.org/jira/browse/KAFKA-14748 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Florin Akermann >Priority: Major > > Kafka Streams enforces a strict non-null-key policy in the DSL across all > key-dependent operations (like aggregations and joins). > This also applies to FK-joins, in particular to the ForeignKeyExtractor. If > it returns `null`, it's treated as invalid. For left-joins, it might make > sense to still accept a `null`, and add the left-hand record with an empty > right-hand-side to the result. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-12317) Relax non-null key requirement for left/outer KStream joins
[ https://issues.apache.org/jira/browse/KAFKA-12317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17749322#comment-17749322 ] Florin Akermann commented on KAFKA-12317: - [~guozhang] [~mjsax] KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams > Relax non-null key requirement for left/outer KStream joins > --- > > Key: KAFKA-12317 > URL: https://issues.apache.org/jira/browse/KAFKA-12317 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Florin Akermann >Priority: Major > > Currently, for stream-stream and stream-table/globalTable joins, > KafkaStreams drops all stream records with a `null`-key (`null`-join-key for > stream-globalTable), because for a `null`-(join)key the join is undefined: > i.e., we don't have an attribute to do the table lookup (we consider the > stream record malformed). Note that we define the semantics of > _left/outer_ join as: keep the stream record if no matching join record was > found. > We could relax the definition of _left_ stream-table/globalTable and > _left/outer_ stream-stream joins though, and not drop `null`-(join)key stream > records, but call the ValueJoiner with a `null` "other-side" value instead: > if the stream record key (or join-key) is `null`, we could treat it as a > "failed lookup" instead of treating the stream record as corrupted. > If we make this change, users that want to keep the current behavior can add > a `filter()` before the join to drop `null`-(join)key records from the stream > explicitly. > Note that this change also requires changing the behavior if we insert a > repartition topic before the join: currently, we drop `null`-key records > before writing into the repartition topic (as we know they would be dropped > later anyway). We need to relax this behavior for a left stream-table and > left/outer stream-stream join.
Users need to be aware (i.e., we might need to > put this into the docs and JavaDocs) that records with a `null` key would be > partitioned randomly.
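The relaxation described in these two tickets can be modelled with plain collections: under the current behaviour a `null`-key stream record is dropped, while under the proposal it is treated as a failed lookup and the joiner is called with a `null` right-hand side. This is an illustrative model of the semantics only — it is not the Kafka Streams API, and the helper names are invented.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class LeftJoinSketch {

    // relaxed=false models today's behaviour (null-key records dropped);
    // relaxed=true models the proposal (joiner called with a null other side).
    static <K, V, T, R> List<R> leftJoin(List<Map.Entry<K, V>> stream,
                                         Map<K, T> table,
                                         BiFunction<V, T, R> joiner,
                                         boolean relaxed) {
        List<R> out = new ArrayList<>();
        for (Map.Entry<K, V> rec : stream) {
            if (rec.getKey() == null && !relaxed)
                continue; // current behaviour: treat as malformed and drop
            T other = rec.getKey() == null ? null : table.get(rec.getKey());
            out.add(joiner.apply(rec.getValue(), other)); // relaxed: failed lookup
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> stream = new ArrayList<>();
        stream.add(new AbstractMap.SimpleEntry<>(null, "v1")); // null join key
        stream.add(new AbstractMap.SimpleEntry<>("k", "v2"));
        Map<String, String> table = Map.of("k", "t2");
        BiFunction<String, String, String> joiner = (l, r) -> l + "+" + r;

        if (leftJoin(stream, table, joiner, false).size() != 1)
            throw new AssertionError("current semantics drop the null-key record");
        List<String> relaxed = leftJoin(stream, table, joiner, true);
        if (!relaxed.equals(List.of("v1+null", "v2+t2")))
            throw new AssertionError("relaxed semantics keep it with a null other side");
        System.out.println("ok");
    }
}
```

As the ticket notes, users who prefer the old behaviour can get it back by filtering `null`-key records out before the join — in this model, that corresponds to keeping `relaxed=false`.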
[GitHub] [kafka] junrao commented on pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580
junrao commented on PR #14111: URL: https://github.com/apache/kafka/pull/14111#issuecomment-1659045347 @AndrewJSchofield : Looking at the code a bit more closely, I am not sure that the exponential backoff logic is added properly for the common failure cases in this PR. On the producer side, this is the loop when there is a leader change.

```
Sender
  call RecordAccumulator.partitionReady() to check if the batch needs to back off
  call completeBatch()
    --> on retriable error, reenqueueBatch()
    --> on metadata error, trigger updateMetadata()
```

If the metadata propagation is delayed, the Sender will stay in the above loop for multiple iterations. This PR only uses retryBackoffMax when the metadata update fails. However, in the common case, the metadata request won't fail and the issue is that the metadata is stale. So, it seems that we need to change RecordAccumulator.partitionReady() to check retryBackoffMax and implement the exponential backoff logic when the batch is re-enqueued.

On the consumer side, there is a similar loop when there is a leader epoch bump.

```
KafkaConsumer.poll()
  call sendFetches()
  call pollForFetches()
    --> call AbstractFetch.handleInitializeCompletedFetchErrors
    --> requestMetadataUpdate()
```

Again, if the metadata propagation is delayed, the consumer poll() call will stay in the above loop for multiple iterations. This PR only uses retryBackoffMax when the metadata update fails. However, in the common case, the metadata request won't fail and the issue is that the metadata is stale. So, it seems that we need to add the exponential backoff logic to the above loop.
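For reference, the KIP-580 backoff being discussed grows the retry delay exponentially from `retry.backoff.ms` up to the `retry.backoff.max.ms` cap (the real client also applies jitter). A deterministic sketch, with jitter omitted and names that merely mirror the configs — this is not the actual client `ExponentialBackoff` class:

```java
public class BackoffSketch {
    final long retryBackoffMs;     // mirrors retry.backoff.ms
    final long retryBackoffMaxMs;  // mirrors retry.backoff.max.ms
    static final double MULTIPLIER = 2.0;

    BackoffSketch(long retryBackoffMs, long retryBackoffMaxMs) {
        this.retryBackoffMs = retryBackoffMs;
        this.retryBackoffMaxMs = retryBackoffMaxMs;
    }

    // attempts = number of failed tries so far; 0 yields the base backoff.
    long backoff(int attempts) {
        double exp = retryBackoffMs * Math.pow(MULTIPLIER, attempts);
        return (long) Math.min(exp, retryBackoffMaxMs);
    }

    public static void main(String[] args) {
        BackoffSketch b = new BackoffSketch(100, 1000);
        if (b.backoff(0) != 100) throw new AssertionError();
        if (b.backoff(1) != 200) throw new AssertionError();
        if (b.backoff(5) != 1000) throw new AssertionError(); // 3200 capped to 1000
        System.out.println("ok");
    }
}
```

The review's point is about where this computation is consulted: the stale-metadata loops above re-enter without ever failing a request, so the attempt counter (and hence this growing delay) has to be driven by the re-enqueue/poll path, not only by metadata-request failures.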
[GitHub] [kafka] philipnee commented on pull request #14123: MINOR: Fix committed API in the PrototypeAsyncConsumer timeout
philipnee commented on PR #14123: URL: https://github.com/apache/kafka/pull/14123#issuecomment-1659084728 Current failures:

```
New failing - 12
Build / JDK 11 and Scala 2.13 / [1] tlsProtocol=TLSv1.2, useInlinePem=false – org.apache.kafka.common.network.SslTransportLayerTest <1s
Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest 1m 45s
Build / JDK 20 and Scala 2.13 / testNonDefaultConnectionCountLimitAndRateLimit() – kafka.network.ConnectionQuotasTest 10s
Build / JDK 20 and Scala 2.13 / testBalancePartitionLeaders() – org.apache.kafka.controller.QuorumControllerTest 12s
Build / JDK 17 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest 1m 51s
Build / JDK 17 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest 1m 43s
Build / JDK 8 and Scala 2.12 / testOffsetTranslationBehindReplicationFlow() – org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest 2m 6s
Build / JDK 8 and Scala 2.12 / testOffsetTranslationBehindReplicationFlow() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest 2m 7s
Build / JDK 8 and Scala 2.12 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest 2m 28s
Build / JDK 8 and Scala 2.12 / testRackAwareRangeAssignor() – integration.kafka.server.FetchFromFollowerIntegrationTest 41s
Build / JDK 8 and Scala 2.12 / testClose() – kafka.log.remote.RemoteIndexCacheTest <1s
Build / JDK 8 and Scala 2.12 / testBalancePartitionLeaders() – org.apache.kafka.controller.QuorumControllerTest 12s

Existing failures - 4
Build / JDK 11 and Scala 2.13 / testOffsetTranslationBehindReplicationFlow() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest 1m 48s
Build / JDK 20 and Scala 2.13 / testOffsetTranslationBehindReplicationFlow() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest 1m 48s
Build / JDK 20 and Scala 2.13 / testOffsetTranslationBehindReplicationFlow() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest 2m 44s
Build / JDK 8 and Scala 2.12 / testOffsetTranslationBehindReplicationFlow() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest 2m 18s
```

Not related to the code change. No build failure.
[GitHub] [kafka] lianetm commented on a diff in pull request #14123: MINOR: Fix committed API in the PrototypeAsyncConsumer timeout
lianetm commented on code in PR #14123: URL: https://github.com/apache/kafka/pull/14123#discussion_r1279887573
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java:
```
@@ -153,12 +154,32 @@ public void testCommitAsync_UserSuppliedCallback() {
     @SuppressWarnings("unchecked")
     public void testCommitted() {
         Set<TopicPartition> mockTopicPartitions = mockTopicPartitionOffset().keySet();
-        mockConstruction(OffsetFetchApplicationEvent.class, (mock, ctx) -> {
-            when(mock.complete(any())).thenReturn(new HashMap<>());
-        });
-        consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
-        assertDoesNotThrow(() -> consumer.committed(mockTopicPartitions, Duration.ofMillis(1)));
-        verify(eventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
+        try (MockedConstruction<OffsetFetchApplicationEvent> mockConstruction =
+                mockConstruction(OffsetFetchApplicationEvent.class, (mock, ctx) -> {
+                    when(mock.future()).thenReturn(committedFuture);
+                })) {
+            committedFuture.complete(mockTopicPartitionOffset());
+            consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
+            assertDoesNotThrow(() -> consumer.committed(mockTopicPartitions, Duration.ofMillis(1000)));
+            verify(eventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
```
Review Comment: Does not seem needed I would say -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on pull request #13990: KAFKA-14937: Refactoring for client code to reduce boilerplate
kirktrue commented on PR #13990: URL: https://github.com/apache/kafka/pull/13990#issuecomment-1659177876 @junrao: > @kirktrue : It seems that JDK 20 tests didn't complete because of failures in ':storage:unitTest'. It would be useful to verify if this is an existing issue. Looking at the logs for that test run, there are no individual test failures shown in the logs or in the Jenkins UI for the storage module's unit tests. According to the Gradle Enterprise tool I recently found out about, there were [~200 cases in the last 28 days where test runs failed with that error](https://ge.apache.org/scans/failures?failures.expandedFailures=WzEsMl0&search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=America/Los_Angeles): ``` Execution failed for task ' '. > Process 'Gradle Test Executor ' finished with non-zero exit value 1 This problem might be caused by incorrect test process configuration. For more on test execution, please refer to https://docs.gradle.org//userguide/java_testing.html#sec:test_execution in the Gradle documentation. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14123: MINOR: Fix committed API in the PrototypeAsyncConsumer timeout
philipnee commented on code in PR #14123: URL: https://github.com/apache/kafka/pull/14123#discussion_r1279890789
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java:
```
@@ -153,12 +154,32 @@ public void testCommitAsync_UserSuppliedCallback() {
     @SuppressWarnings("unchecked")
     public void testCommitted() {
         Set<TopicPartition> mockTopicPartitions = mockTopicPartitionOffset().keySet();
-        mockConstruction(OffsetFetchApplicationEvent.class, (mock, ctx) -> {
-            when(mock.complete(any())).thenReturn(new HashMap<>());
-        });
-        consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
-        assertDoesNotThrow(() -> consumer.committed(mockTopicPartitions, Duration.ofMillis(1)));
-        verify(eventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
+        try (MockedConstruction<OffsetFetchApplicationEvent> mockConstruction =
+                mockConstruction(OffsetFetchApplicationEvent.class, (mock, ctx) -> {
+                    when(mock.future()).thenReturn(committedFuture);
+                })) {
+            committedFuture.complete(mockTopicPartitionOffset());
+            consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
+            assertDoesNotThrow(() -> consumer.committed(mockTopicPartitions, Duration.ofMillis(1000)));
+            verify(eventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
```
Review Comment: you are right! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
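The pattern under discussion (stubbing the event's future and completing it before the consumer blocks on it) comes down to how a timed `get()` on a `CompletableFuture` behaves. A stand-alone sketch with plain `java.util.concurrent`; the class and method names here are illustrative, not the actual Kafka test code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureTimeoutSketch {

    // Timed wait on a future: returns the value, or null if the timeout
    // elapses first (the behaviour a timed committed(...) call builds on).
    static <T> T getOrNull(CompletableFuture<T> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Completed up front, as the revised test does: the timed get
        // returns immediately, no matter how small the timeout is.
        CompletableFuture<String> committedFuture = new CompletableFuture<>();
        committedFuture.complete("offsets");
        System.out.println("completed: " + getOrNull(committedFuture, 1));

        // Never completed: the same call times out, which is the failure
        // mode a very small timeout risks when the future is not stubbed.
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println("pending: " + getOrNull(pending, 1));
    }
}
```

The design point is that the timeout only matters while the future is incomplete; once it is completed, even a 1 ms deadline is never hit.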
[GitHub] [kafka] junrao commented on pull request #13990: KAFKA-14937: Refactoring for client code to reduce boilerplate
junrao commented on PR #13990: URL: https://github.com/apache/kafka/pull/13990#issuecomment-1659202647 Thanks @kirktrue. @satishd : Are you aware of any transient failures in :storage:unitTest? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on pull request #13990: KAFKA-14937: Refactoring for client code to reduce boilerplate
kirktrue commented on PR #13990: URL: https://github.com/apache/kafka/pull/13990#issuecomment-1659256150 > @satishd : Are you aware of any transient failures in :storage:unitTest? Thanks. According to [Gradle Enterprise](https://ge.apache.org/scans/failures?failures.failureMessage=Execution%20failed%20for%20task%20%27:storage:unitTest%27.%0A%3E%20Process%20%27Gradle%20Test%20Executor%20*%20finished%20with%20non-zero%20exit%20value%201%0A%20%20This%20problem%20might%20be%20caused%20by%20incorrect%20test%20process%20configuration.%0A%20%20For%20more%20on%20test%20execution%2C%20please%20refer%20to%20https://docs.gradle.org/8.2.1/userguide/java_testing.html%23sec:test_execution%20in%20the%20Gradle%20documentation.&search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=America/Los_Angeles), it's failed 71 times starting on July 18, 2023. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14724) Port tests in FetcherTest to FetchRequestManagerTest
[ https://issues.apache.org/jira/browse/KAFKA-14724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14724: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > Port tests in FetcherTest to FetchRequestManagerTest > > > Key: KAFKA-14724 > URL: https://issues.apache.org/jira/browse/KAFKA-14724 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-945 > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{{}Fetcher{}}}. > This task involves copying the relevant tests from {{FetcherTest}} and > modifying them to fit a new unit test named {{{}FetchRequestManagerTest{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14758) Extract inner classes from Fetcher for reuse in refactoring
[ https://issues.apache.org/jira/browse/KAFKA-14758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14758: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > Extract inner classes from Fetcher for reuse in refactoring > --- > > Key: KAFKA-14758 > URL: https://issues.apache.org/jira/browse/KAFKA-14758 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-945 > Fix For: 3.5.0 > > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{{}Fetcher{}}}. > This task includes refactoring {{Fetcher}} by extracting out the inner > classes into top-level (though still in {{{}internal{}}}) so that those > classes can be referenced by forthcoming refactored fetch logic. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14274) Introduce FetchRequestManager to integrate fetch into new consumer threading refactor
[ https://issues.apache.org/jira/browse/KAFKA-14274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14274: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > Introduce FetchRequestManager to integrate fetch into new consumer threading > refactor > - > > Key: KAFKA-14274 > URL: https://issues.apache.org/jira/browse/KAFKA-14274 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-945 > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{{}Fetcher{}}}. > This task is to introduce a new class named {{FetchRequestManager}} that will > be responsible for: > # Formatting fetch requests to the background thread > # Configuring the callback on fetch responses for the background thread > The response handler will collect the fetch responses from the broker and > create {{{}CompletedFetch{}}}, instances, much as is done in {{{}Fetcher{}}}. > The newly introduced {{FetchUtils}} will be used for both > {{FetchRequestManager}} and {{Fetcher}} to keep the logic as reusable as > possible. > The foreground logic will decompress the data into a {{{}Record{}}}, which > will then be deserialized into a {{ConsumerRecord}} for returning to the user. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15174) Ensure the correct thread is executing the callbacks
[ https://issues.apache.org/jira/browse/KAFKA-15174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15174: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > Ensure the correct thread is executing the callbacks > > > Key: KAFKA-15174 > URL: https://issues.apache.org/jira/browse/KAFKA-15174 > Project: Kafka > Issue Type: Task > Components: consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-945 > > We need to add assertion tests to ensure the correct thread is executing the > offset commit callbacks and rebalance callback -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15184) New consumer internals refactoring and clean up
[ https://issues.apache.org/jira/browse/KAFKA-15184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15184: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > New consumer internals refactoring and clean up > --- > > Key: KAFKA-15184 > URL: https://issues.apache.org/jira/browse/KAFKA-15184 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-945 > > Minor refactoring of the new consumer internals including introduction of the > {{RequestManagers}} class to hold references to the {{RequestManager}} > instances. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14965) Introduce ListOffsetsRequestManager to integrate ListOffsetsRequests into new consumer threading refactor
[ https://issues.apache.org/jira/browse/KAFKA-14965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14965: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > Introduce ListOffsetsRequestManager to integrate ListOffsetsRequests into new > consumer threading refactor > - > > Key: KAFKA-14965 > URL: https://issues.apache.org/jira/browse/KAFKA-14965 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, kip-945 > > This task introduces new functionality for handling ListOffsetsRequests for > the new consumer implementation, as part of the ongoing work on the > consumer threading model refactor. > This task introduces a new class named {{ListOffsetsRequestManager}}, > responsible for handling ListOffsets requests performed by the consumer to > expose functionality like beginningOffsets, endOffsets and offsetsForTimes. > The Offset{{{}Fetcher{}}} class is used internally by the {{KafkaConsumer}} > to list offsets, so this task will be based on a refactored > Offset{{{}Fetcher{}}}, reusing the fetching logic as much as possible. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14252) Create background thread skeleton for new Consumer threading model
[ https://issues.apache.org/jira/browse/KAFKA-14252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14252: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > Create background thread skeleton for new Consumer threading model > -- > > Key: KAFKA-14252 > URL: https://issues.apache.org/jira/browse/KAFKA-14252 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-945 > > The event handler internally instantiates a background thread to consume > ApplicationEvents and produce BackgroundEvents. In this ticket, we will > create a skeleton of the background thread. We will incrementally add > implementation in the future. -- This message was sent by Atlassian Jira (v8.20.10#820010)
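As a rough illustration of the skeleton this ticket describes, a background thread that consumes ApplicationEvents from one queue and produces BackgroundEvents to another can be sketched with plain `java.util.concurrent`. All names here are placeholders, not the actual Kafka classes:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BackgroundThreadSketch {
    // Application thread -> background thread.
    final BlockingQueue<String> applicationEvents = new LinkedBlockingQueue<>();
    // Background thread -> application thread.
    final BlockingQueue<String> backgroundEvents = new LinkedBlockingQueue<>();

    // Skeleton of the background loop: drain application events, do the
    // work (network I/O in the real consumer), emit background events.
    Thread start() {
        Thread t = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String event = applicationEvents.poll(100, TimeUnit.MILLISECONDS);
                    if (event != null) {
                        backgroundEvents.put("processed:" + event);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer-background-thread");
        t.start();
        return t;
    }

    // Application-thread side: submit an event and block for the reply.
    String roundTrip(String event) throws InterruptedException {
        applicationEvents.put(event);
        return backgroundEvents.take();
    }

    public static void main(String[] args) throws InterruptedException {
        BackgroundThreadSketch sketch = new BackgroundThreadSketch();
        Thread background = sketch.start();
        System.out.println(sketch.roundTrip("COMMIT"));
        background.interrupt();
        background.join();
    }
}
```

The real implementation dispatches events to request managers rather than echoing strings, but the two-queue handoff is the core of the threading model.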
[jira] [Updated] (KAFKA-14365) Extract common logic from Fetcher
[ https://issues.apache.org/jira/browse/KAFKA-14365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14365: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > Extract common logic from Fetcher > - > > Key: KAFKA-14365 > URL: https://issues.apache.org/jira/browse/KAFKA-14365 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-945 > Fix For: 3.5.0 > > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{{}Fetcher{}}}. > This task includes refactoring {{Fetcher}} by extracting out some common > logic to allow forthcoming implementations to leverage it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14246) Update threading model for Consumer (KIP-945)
[ https://issues.apache.org/jira/browse/KAFKA-14246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14246: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > Update threading model for Consumer (KIP-945) > - > > Key: KAFKA-14246 > URL: https://issues.apache.org/jira/browse/KAFKA-14246 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-945 > > Hi community, > > We are refactoring the current KafkaConsumer and making it more asynchronous. > This is the master Jira to track the project's progress; subtasks will be > linked to this ticket. Please review the design document and feel free to > use this thread for discussion. > > The design document is here: > [https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor] > > The original email thread is here: > [https://lists.apache.org/thread/13jvwzkzmb8c6t7drs4oj2kgkjzcn52l] > > I will continue to update the 1pager as reviews and comments come. > > Thanks, > P -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14875) Implement Wakeup()
[ https://issues.apache.org/jira/browse/KAFKA-14875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14875: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > Implement Wakeup() > -- > > Key: KAFKA-14875 > URL: https://issues.apache.org/jira/browse/KAFKA-14875 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-945 > > Implement wakeup() and WakeupException. This would be different to the > current implementation because I think we just need to interrupt the blocking > futures. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15163) Implement validatePositions functionality for new KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-15163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15163: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > Implement validatePositions functionality for new KafkaConsumer > --- > > Key: KAFKA-15163 > URL: https://issues.apache.org/jira/browse/KAFKA-15163 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, kip-945 > > Introduce support for validating positions in the new OffsetsRequestManager. > This task will include a new event for the validatePositions calls performed > from the new consumer, and the logic for handling such events in the > OffsetRequestManager. > The validate positions implementation will keep the same behaviour as the one > in the old consumer, but adapted to the new threading model. So it is based > on a VALIDATE_POSITIONS event that is submitted to the background thread, > and then processed by the ApplicationEventProcessor. The processing itself is > done by the OffsetRequestManager given that this will require an > OFFSET_FOR_LEADER_EPOCH request. This task will introduce support for such > requests in the OffsetRequestManager, responsible for offset-related requests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14848) KafkaConsumer incorrectly passes locally-scoped deserializers to FetchConfig
[ https://issues.apache.org/jira/browse/KAFKA-14848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14848: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > KafkaConsumer incorrectly passes locally-scoped deserializers to FetchConfig > > > Key: KAFKA-14848 > URL: https://issues.apache.org/jira/browse/KAFKA-14848 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.5.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-945 > Original Estimate: 24h > Remaining Estimate: 24h > > [~rayokota] found some {{{}NullPointerException{}}}s that originate because > of a recently introduced error in the {{KafkaConsumer}} constructor. The code > was changed to pass the deserializer variables into the {{FetchConfig}} > constructor. However, this code change incorrectly used the locally-scoped > variables, not the instance-scoped variables. Since the locally-scoped > variables could be {{{}null{}}}, this results in the {{FetchConfig}} storing > {{null}} references, leading to downstream breakage. > Suggested change: > {noformat} > - FetchConfig fetchConfig = new FetchConfig<>(config, keyDeserializer, > valueDeserializer, isolationLevel); > + FetchConfig fetchConfig = new FetchConfig<>(config, > this.keyDeserializer, this.valueDeserializer, isolationLevel); > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
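The bug class described in this ticket is easy to reproduce: a constructor parameter that shadows an instance field silently captures the possibly-null parameter when the `this.` qualifier is omitted. A condensed, hypothetical illustration (not the actual KafkaConsumer code):

```java
import java.util.Objects;

public class ShadowingSketch {
    static class Deserializer {
        final String name;
        Deserializer(String name) { this.name = name; }
    }

    static class Config {
        final Deserializer keyDeserializer;
        final Deserializer captured;

        // The 'keyDeserializer' parameter may be null; the instance field
        // is assigned a non-null default in that case.
        Config(Deserializer keyDeserializer) {
            this.keyDeserializer = (keyDeserializer != null)
                    ? keyDeserializer
                    : new Deserializer("default");
            // Bug pattern from the ticket: reading the locally-scoped
            // parameter instead of this.keyDeserializer captures null.
            this.captured = keyDeserializer; // should be this.keyDeserializer
        }
    }

    public static void main(String[] args) {
        Config config = new Config(null);
        System.out.println("field: " + config.keyDeserializer.name);
        // The shadowed capture is null and would surface later as an NPE.
        System.out.println("captured is null: " + Objects.isNull(config.captured));
    }
}
```

The one-character fix in the ticket (qualifying with `this.`) is exactly the difference between the two assignments above.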
[jira] [Updated] (KAFKA-14950) Implement assign() and assignment()
[ https://issues.apache.org/jira/browse/KAFKA-14950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14950: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > Implement assign() and assignment() > --- > > Key: KAFKA-14950 > URL: https://issues.apache.org/jira/browse/KAFKA-14950 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-945 > Fix For: 3.6.0 > > > Implement assign() and assignment() -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15180) Generalize integration tests to change use of KafkaConsumer to Consumer
[ https://issues.apache.org/jira/browse/KAFKA-15180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15180: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > Generalize integration tests to change use of KafkaConsumer to Consumer > --- > > Key: KAFKA-15180 > URL: https://issues.apache.org/jira/browse/KAFKA-15180 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-945 > Fix For: 3.6.0 > > > For the consumer threading refactor project, we're introducing a new > implementation of the {{Consumer}} interface. However, most of the instances > in the integration tests specifically use the concrete implementation > {{{}KafkaConsumer{}}}. This task is to generalize those uses where possible > to use the {{Consumer}} interface. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14675) Extract metadata-related tasks from Fetcher into MetadataFetcher
[ https://issues.apache.org/jira/browse/KAFKA-14675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14675: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > Extract metadata-related tasks from Fetcher into MetadataFetcher > > > Key: KAFKA-14675 > URL: https://issues.apache.org/jira/browse/KAFKA-14675 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Affects Versions: 3.5.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-945 > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{{}Fetcher{}}}. > This task covers the work to extract from {{Fetcher}} the APIs that are > related to metadata operations into a new class named > {{{}MetadataFetcher{}}}. This will allow the refactoring of {{Fetcher}} and > {{MetadataFetcher}} for the new consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14960) Metadata Request Manager and listTopics/partitionsFor API
[ https://issues.apache.org/jira/browse/KAFKA-14960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14960: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > Metadata Request Manager and listTopics/partitionsFor API > - > > Key: KAFKA-14960 > URL: https://issues.apache.org/jira/browse/KAFKA-14960 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-945 > > Implement listTopics and partitionsFor -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14937) Refactoring for client code to reduce boilerplate
[ https://issues.apache.org/jira/browse/KAFKA-14937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14937: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > Refactoring for client code to reduce boilerplate > - > > Key: KAFKA-14937 > URL: https://issues.apache.org/jira/browse/KAFKA-14937 > Project: Kafka > Issue Type: Improvement > Components: admin, clients, consumer, producer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-945 > Fix For: 3.6.0 > > > There are a number of places in the client code where the same basic calls > are made by more than one client implementation. Minor refactoring will > reduce the amount of boilerplate code necessary for the client to construct > its internal state. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15188) Implement more of the remaining PrototypeAsyncConsumer APIs
[ https://issues.apache.org/jira/browse/KAFKA-15188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15188: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > Implement more of the remaining PrototypeAsyncConsumer APIs > --- > > Key: KAFKA-15188 > URL: https://issues.apache.org/jira/browse/KAFKA-15188 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-945 > > There are several {{Consumer}} APIs that only touch the {{ConsumerMetadata}} > and/or {{SubscriptionState}} classes; they do not perform network I/O or > otherwise block. These can be implemented without needing {{RequestManager}} > updates and include the following APIs: > - {{committed}} > - {{currentLag}} > - {{listTopics}} > - {{metrics}} > - {{partitionsFor}} > - {{pause}} > - {{paused}} > - {{position}} > - {{resume}} > - {{seek}} > - {{seekToBeginning}} > - {{seekToEnd}} > - {{subscribe}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14468) Refactor Commit Logic
[ https://issues.apache.org/jira/browse/KAFKA-14468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14468: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > Refactor Commit Logic > - > > Key: KAFKA-14468 > URL: https://issues.apache.org/jira/browse/KAFKA-14468 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-945 > Fix For: 3.5.0 > > > Refactor commit logic using the new multi-threaded coordinator construct. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15081) Implement new consumer offsetsForTimes
[ https://issues.apache.org/jira/browse/KAFKA-15081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15081: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > Implement new consumer offsetsForTimes > -- > > Key: KAFKA-15081 > URL: https://issues.apache.org/jira/browse/KAFKA-15081 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, kip-945 > > Implement offsetForTimes for the kafka consumer based on the new threading > model, using the ListOffsetsRequestManager. No changes at the consumer API > level. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15115) Implement resetPositions functionality in ListOffsetRequestManager
[ https://issues.apache.org/jira/browse/KAFKA-15115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15115: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > Implement resetPositions functionality in ListOffsetRequestManager > -- > > Key: KAFKA-15115 > URL: https://issues.apache.org/jira/browse/KAFKA-15115 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, kip-945 > > Introduce support for resetting positions in the new > ListOffsetsRequestManager. This task will include a new event for the > resetPositions calls performed from the new consumer, and the logic for > handling such events in the ListOffsetRequestManager. > The reset positions implementation will keep the same behaviour as the one in > the old consumer, but adapted to the new threading model. So it is based on a > RESET_POSITIONS event that is submitted to the background thread, and then > processed by the ApplicationEventProcessor. The processing itself is done by > the ListOffsetRequestManager given that this will require a LIST_OFFSETS > request for the partitions awaiting reset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15173) ApplicationEventQueue and BackgroundEventQueue should be bounded
[ https://issues.apache.org/jira/browse/KAFKA-15173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15173: -- Labels: consumer-threading-refactor kip-945 (was: kip-945) > ApplicationEventQueue and BackgroundEventQueue should be bounded > > > Key: KAFKA-15173 > URL: https://issues.apache.org/jira/browse/KAFKA-15173 > Project: Kafka > Issue Type: Task > Components: consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-945 > > The async consumer uses ApplicationEventQueue and BackgroundEventQueue to > facilitate message passing between the application thread and the background > thread. The current implementation is boundless, which can potentially cause > OOM and other performance-related issues. > I think the queues need a finite bound, and we need to decide how to handle > the situation when the bound is reached. In particular, I would like to > answer these questions: > > # What should the upper limit be for both queues: Can this be a > configurable, memory-based bound? Or just an arbitrary number of events as > the bound. > # What should happen when the application event queue is filled up? It > seems like we should introduce a new exception type and notify the user that > the consumer is full. > # What should happen when the background event queue is filled up? This > seems less likely to happen, but I imagine it could happen when the user > stops polling the consumer, causing the queue to be filled. > # Is it necessary to introduce a public configuration for the queue? I think > initially we would select an arbitrary constant number and see the community > feedback to make a forward plan accordingly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
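One possible shape for the bound discussed in questions 1 and 2 above is a fixed-capacity queue whose non-blocking `offer()` is translated into an error visible to the application thread. A sketch under assumed names (the exception type and capacity are placeholders, not a settled design):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedEventQueueSketch {
    // Hypothetical exception surfaced to the caller when the bound is hit
    // (one possible answer to question 2 in the ticket).
    static class ConsumerQueueFullException extends RuntimeException {
        ConsumerQueueFullException(String message) { super(message); }
    }

    private final BlockingQueue<String> applicationEventQueue;

    BoundedEventQueueSketch(int capacity) {
        // An arbitrary constant bound, as floated in question 4.
        this.applicationEventQueue = new ArrayBlockingQueue<>(capacity);
    }

    // Non-blocking add: offer() returns false once the queue is full,
    // which we translate into an error the application thread can see.
    void enqueue(String event) {
        if (!applicationEventQueue.offer(event)) {
            throw new ConsumerQueueFullException("application event queue is full");
        }
    }

    public static void main(String[] args) {
        BoundedEventQueueSketch queue = new BoundedEventQueueSketch(2);
        queue.enqueue("e1");
        queue.enqueue("e2");
        try {
            queue.enqueue("e3"); // exceeds the bound
        } catch (ConsumerQueueFullException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A blocking `put()` with a timeout would be the main alternative to failing fast; which behaviour the consumer should expose is exactly what the ticket leaves open.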
[jira] [Updated] (KAFKA-14761) Integration Tests for the New Consumer Implementation
[ https://issues.apache.org/jira/browse/KAFKA-14761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-14761:
------------------------------
    Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Integration Tests for the New Consumer Implementation
> -----------------------------------------------------
>
>                 Key: KAFKA-14761
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14761
>             Project: Kafka
>          Issue Type: Task
>          Components: clients, consumer
>            Reporter: Philip Nee
>            Assignee: Philip Nee
>            Priority: Major
>              Labels: consumer-threading-refactor, kip-945
>
> This Jira tracks the integration testing effort for the new consumer we are
> implementing.
[jira] [Updated] (KAFKA-14264) Refactor coordinator code
[ https://issues.apache.org/jira/browse/KAFKA-14264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-14264:
------------------------------
    Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Refactor coordinator code
> -------------------------
>
>                 Key: KAFKA-14264
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14264
>             Project: Kafka
>          Issue Type: Task
>          Components: clients, consumer
>            Reporter: Philip Nee
>            Assignee: Philip Nee
>            Priority: Major
>              Labels: consumer-threading-refactor, kip-945
>             Fix For: 3.4.0
>
> To refactor the consumer, we changed how the coordinator is called. However,
> there will be a period during which the old and new implementations need to
> coexist, so we will need to override some of the methods and create a new
> implementation of the coordinator. In particular:
> # ensureCoordinatorReady needs to be non-blocking, or we could just use the
> sendFindCoordinatorRequest.
> # joinGroupIfNeeded needs to be broken up into finer-grained stages for the
> new implementation to work.
> We also need to create the coordinator state machine.
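One way the non-blocking ensureCoordinatorReady and the coordinator state machine could fit together is sketched below; the states, method names, and return convention are all assumptions for illustration, not the actual Kafka design:

```java
// Hypothetical coordinator state machine with non-blocking discovery:
// ensureCoordinatorReady() no longer loops until the coordinator is known, it
// just kicks off discovery once and reports the current state to the caller.
public class CoordinatorStateSketch {
    enum State { UNKNOWN, DISCOVERING, READY }

    private State state = State.UNKNOWN;

    // Non-blocking: returns immediately; the caller re-checks on the next poll.
    boolean ensureCoordinatorReady() {
        switch (state) {
            case READY:
                return true;
            case UNKNOWN:
                state = State.DISCOVERING; // stand-in for sendFindCoordinatorRequest()
                return false;
            default:
                return false; // request already in flight, nothing to do
        }
    }

    // Invoked by the (hypothetical) FindCoordinator response handler.
    void onFindCoordinatorResponse() {
        state = State.READY;
    }

    public static void main(String[] args) {
        CoordinatorStateSketch c = new CoordinatorStateSketch();
        System.out.println(c.ensureCoordinatorReady()); // false: discovery just started
        c.onFindCoordinatorResponse();
        System.out.println(c.ensureCoordinatorReady()); // true: coordinator known
    }
}
```

The key property is that no call ever blocks: progress happens across repeated invocations from the background thread's loop, driven by response callbacks.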
[jira] [Updated] (KAFKA-14247) Implement EventHandler interface and DefaultEventHandler for Consumer
[ https://issues.apache.org/jira/browse/KAFKA-14247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-14247:
------------------------------
    Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Implement EventHandler interface and DefaultEventHandler for Consumer
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-14247
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14247
>             Project: Kafka
>          Issue Type: Task
>          Components: clients, consumer
>            Reporter: Philip Nee
>            Assignee: Philip Nee
>            Priority: Major
>              Labels: consumer-threading-refactor, kip-945
>
> The background thread runs inside of the DefaultEventHandler to consume
> events from the ApplicationEventQueue and produce events to the
> BackgroundEventQueue.
> The background thread runnable consists of a loop that polls events from the
> ApplicationEventQueue, processes the events if there are any, and polls the
> networkClient.
> In this implementation, the DefaultEventHandler spawns a thread that runs
> the BackgroundThreadRunnable. The runnable, as of the current PR, does the
> following things:
> # Initialize the networkClient
> # Poll ApplicationEvent from the queue if there is any
> # Process the event
> # Poll the networkClient
> PR: https://github.com/apache/kafka/pull/12672
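The loop described in steps 2-4 can be sketched as below. `NetworkClient` here is a minimal stand-in interface, not Kafka's actual class, and the runnable's shape is an assumption based on the description above:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the background runnable: drain application events,
// process each one, then poll the network client to drive I/O.
public class BackgroundLoopSketch implements Runnable {
    interface NetworkClient { void poll(); }

    final BlockingQueue<Runnable> applicationEventQueue = new LinkedBlockingQueue<>();
    final AtomicBoolean closed = new AtomicBoolean(false);
    private final NetworkClient networkClient;

    BackgroundLoopSketch(NetworkClient networkClient) {
        this.networkClient = networkClient;
    }

    @Override
    public void run() {
        while (!closed.get()) {
            runOnce();
        }
    }

    // One iteration: steps 2-4 of the description above.
    void runOnce() {
        Runnable event;
        while ((event = applicationEventQueue.poll()) != null) {
            event.run();          // process the event
        }
        networkClient.poll();     // drive pending network I/O
    }
}
```

Modeling events as `Runnable` is a simplification; the actual design dispatches typed events to request managers, but the loop structure is the same.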
[jira] [Updated] (KAFKA-15164) Extract reusable logic from OffsetsForLeaderEpochClient
[ https://issues.apache.org/jira/browse/KAFKA-15164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-15164:
------------------------------
    Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Extract reusable logic from OffsetsForLeaderEpochClient
> -------------------------------------------------------
>
>                 Key: KAFKA-15164
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15164
>             Project: Kafka
>          Issue Type: Task
>          Components: clients, consumer
>            Reporter: Lianet Magrans
>            Assignee: Lianet Magrans
>            Priority: Major
>              Labels: consumer-threading-refactor, kip-945
>
> The OffsetsForLeaderEpochClient class is used for making asynchronous
> requests to the OffsetsForLeaderEpoch API. It encapsulates the logic for:
> * preparing the requests
> * sending them over the network using the network client
> * handling the response
> The new KafkaConsumer implementation, based on a new threading model,
> requires the same logic for preparing the requests and handling the
> responses, with different behaviour for how the request is actually sent.
> This task includes refactoring OffsetsForLeaderEpochClient by extracting the
> logic for preparing the requests and handling the responses. No changes to
> the existing logic, just making the functionality available for reuse.
[jira] [Updated] (KAFKA-14966) Extract reusable logic from OffsetFetcher
[ https://issues.apache.org/jira/browse/KAFKA-14966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-14966:
------------------------------
    Labels: consumer-threading-refactor kip-945  (was: kip-945)

> Extract reusable logic from OffsetFetcher
> -----------------------------------------
>
>                 Key: KAFKA-14966
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14966
>             Project: Kafka
>          Issue Type: Task
>          Components: clients, consumer
>            Reporter: Lianet Magrans
>            Assignee: Lianet Magrans
>            Priority: Major
>              Labels: consumer-threading-refactor, kip-945
>             Fix For: 3.6.0
>
> The OffsetFetcher is internally used by the KafkaConsumer to fetch offsets
> and to validate and reset positions.
> For the new consumer based on the refactored threading model, similar
> functionality will be needed by the ListOffsetsRequestManager component.
> This task aims at identifying and extracting the OffsetFetcher functionality
> that can be reused by the new consumer implementation.
[jira] [Updated] (KAFKA-15184) New consumer internals refactoring and clean up
[ https://issues.apache.org/jira/browse/KAFKA-15184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-15184:
------------------------------
    Labels: consumer-threading-refactor  (was: consumer-threading-refactor kip-945)

> New consumer internals refactoring and clean up
> -----------------------------------------------
>
>                 Key: KAFKA-15184
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15184
>             Project: Kafka
>          Issue Type: Task
>          Components: clients, consumer
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Major
>              Labels: consumer-threading-refactor
>
> Minor refactoring of the new consumer internals, including introduction of
> the {{RequestManagers}} class to hold references to the {{RequestManager}}
> instances.
[jira] [Updated] (KAFKA-15163) Implement validatePositions functionality for new KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-15163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-15163:
------------------------------
    Labels: consumer-threading-refactor  (was: consumer-threading-refactor kip-945)

> Implement validatePositions functionality for new KafkaConsumer
> ---------------------------------------------------------------
>
>                 Key: KAFKA-15163
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15163
>             Project: Kafka
>          Issue Type: Task
>          Components: clients, consumer
>            Reporter: Lianet Magrans
>            Assignee: Lianet Magrans
>            Priority: Major
>              Labels: consumer-threading-refactor
>
> Introduce support for validating positions in the new OffsetsRequestManager.
> This task will include a new event for the validatePositions calls performed
> from the new consumer, and the logic for handling such events in the
> OffsetRequestManager.
> The validate positions implementation will keep the same behaviour as the
> one in the old consumer, but adapted to the new threading model: it is based
> on a VALIDATE_POSITIONS event that is submitted to the background thread and
> then processed by the ApplicationEventProcessor. The processing itself is
> done by the OffsetRequestManager, given that it requires an
> OFFSET_FOR_LEADER_EPOCH request. This task will introduce support for such
> requests in the OffsetRequestManager, which is responsible for
> offset-related requests.
[jira] [Updated] (KAFKA-14875) Implement Wakeup()
[ https://issues.apache.org/jira/browse/KAFKA-14875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-14875:
------------------------------
    Labels: consumer-threading-refactor  (was: consumer-threading-refactor kip-945)

> Implement Wakeup()
> ------------------
>
>                 Key: KAFKA-14875
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14875
>             Project: Kafka
>          Issue Type: Task
>          Components: clients, consumer
>            Reporter: Philip Nee
>            Assignee: Philip Nee
>            Priority: Major
>              Labels: consumer-threading-refactor
>
> Implement wakeup() and WakeupException. This would be different from the
> current implementation because I think we just need to interrupt the
> blocking futures.
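The "interrupt the blocking futures" idea could look roughly like the sketch below; the field and method names are hypothetical, and the real consumer tracks its pending futures differently:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical sketch: the application thread blocks on a future; wakeup(),
// callable from any thread, completes that future exceptionally so the
// blocked call unwinds with a WakeupException.
public class WakeupSketch {
    static class WakeupException extends RuntimeException { }

    private volatile CompletableFuture<?> pendingBlockingCall;

    // Application thread: park on the future, translating an exceptional
    // completion caused by wakeup() back into a WakeupException.
    Object block(CompletableFuture<Object> future) {
        pendingBlockingCall = future;
        try {
            return future.join();
        } catch (CompletionException e) {
            if (e.getCause() instanceof WakeupException) {
                throw (WakeupException) e.getCause();
            }
            throw e;
        } finally {
            pendingBlockingCall = null;
        }
    }

    // Any thread: unblock the application thread if it is waiting.
    void wakeup() {
        CompletableFuture<?> f = pendingBlockingCall;
        if (f != null) {
            f.completeExceptionally(new WakeupException());
        }
    }
}
```

This avoids `Thread.interrupt()` entirely: because the blocking points are all futures, completing them exceptionally is enough to wake the caller.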