Re: [PR] KAFKA-16455: Check partition exists before send reassignments to server in ReassignPartitionsCommand [kafka]
brandboat commented on code in PR #15659: URL: https://github.com/apache/kafka/pull/15659#discussion_r1554537353 ## tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java: ## @@ -300,6 +300,12 @@ public void testGetReplicaAssignments() throws Exception { assertEquals(assignments, getReplicaAssignmentForPartitions(adminClient, new HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("bar", 0))))); + +assignments.clear(); + +UnknownTopicOrPartitionException exception = assertThrows(UnknownTopicOrPartitionException.class, +() -> getReplicaAssignmentForPartitions(adminClient, new HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("foo", 10))))); +assertEquals("Unable to find partition: foo-10", exception.getMessage()); Review Comment: Oops, my bad... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16472: Fix integration tests in Java with parameter name [kafka]
showuon commented on PR #15663: URL: https://github.com/apache/kafka/pull/15663#issuecomment-2040993296 LGTM! Thanks @FrankYang0529 ! Really nice catch!
Re: [PR] KAFKA-16455: Check partition exists before send reassignments to server in ReassignPartitionsCommand [kafka]
showuon commented on code in PR #15659: URL: https://github.com/apache/kafka/pull/15659#discussion_r1554536274 ## tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java: ## @@ -300,6 +300,12 @@ public void testGetReplicaAssignments() throws Exception { assertEquals(assignments, getReplicaAssignmentForPartitions(adminClient, new HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("bar", 0))))); + +assignments.clear(); + +UnknownTopicOrPartitionException exception = assertThrows(UnknownTopicOrPartitionException.class, +() -> getReplicaAssignmentForPartitions(adminClient, new HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("foo", 10))))); +assertEquals("Unable to find partition: foo-10", exception.getMessage()); Review Comment: If we change it to ExecutionException, should we change this? Ex:
```
Exception exception = assertThrows(ExecutionException.class,
    () -> getReplicaAssignmentForPartitions(adminClient, new HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("foo", 10)))));
Throwable causeException = exception.getCause();
assertTrue(causeException instanceof UnknownTopicOrPartitionException);
assertEquals("Unable to find partition: foo-10", causeException.getMessage());
```
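The pattern discussed in this review (wrap the domain exception in an `ExecutionException`, then assert on its cause) can be sketched without JUnit or Kafka on the classpath. This is only an illustration under stated assumptions: `MissingPartitionException`, `lookup`, and `causeOfLookup` are hypothetical stand-ins, not names from the PR.

```java
import java.util.Set;
import java.util.concurrent.ExecutionException;

final class WrappedCauseSketch {
    // Hypothetical stand-in for UnknownTopicOrPartitionException.
    static final class MissingPartitionException extends RuntimeException {
        MissingPartitionException(String message) {
            super(message);
        }
    }

    // Hypothetical lookup: throws an ExecutionException wrapping the real cause
    // when the requested partition is not among the known ones.
    static void lookup(Set<String> known, String requested) throws ExecutionException {
        if (!known.contains(requested)) {
            throw new ExecutionException(
                new MissingPartitionException("Unable to find partition: " + requested));
        }
    }

    // Returns the cause carried by the ExecutionException, or null if lookup succeeded.
    static Throwable causeOfLookup(Set<String> known, String requested) {
        try {
            lookup(known, requested);
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        Throwable cause = causeOfLookup(Set.of("foo-0"), "foo-10");
        System.out.println(cause.getClass().getSimpleName() + ": " + cause.getMessage());
    }
}
```

A test written against such a method checks the wrapper type first and the cause type and message second, which is the shape of the snippet proposed in the review.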
[jira] [Resolved] (KAFKA-16234) Log directory failure re-creates partitions in another logdir automatically
[ https://issues.apache.org/jira/browse/KAFKA-16234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-16234. --- Resolution: Fixed > Log directory failure re-creates partitions in another logdir automatically > --- > > Key: KAFKA-16234 > URL: https://issues.apache.org/jira/browse/KAFKA-16234 > Project: Kafka > Issue Type: Sub-task > Components: jbod > Affects Versions: 3.7.0 > Reporter: Gaurav Narula > Assignee: Omnia Ibrahim > Priority: Critical > Fix For: 3.8.0, 3.7.1 > > > With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes > in the {{HostedPartition.Offline}} enum variant to embed a {{Partition}} object. > Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old > and new topicIds to decide if it needs to create a new log. > The getter for {{Partition::topicId}} relies on retrieving the topicId from > the {{log}} field or {{logManager.currentLogs}}. The former is set to > {{None}} when a partition is marked offline and the key for the partition is > removed from the latter by {{LogManager::handleLogDirFailure}}. > Therefore, topicId for a partition marked offline always returns {{None}} > and new logs for all partitions in a failed log directory are always created > on another disk. > The broker will fail to restart after the failed disk is repaired because > the same partitions will occur in two different directories. The error does > however inform the operator to remove the partitions from the disk that > failed, which should help with broker startup. > We can avoid this with KAFKA-16212 but in the short term, an immediate > solution can be to have the {{Partition}} object accept {{Option[TopicId]}} in > its constructor and have it fall back to {{log}} or {{logManager}} if it's > unset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
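The short-term fix described in this ticket (let the partition carry its own optional topic ID and fall back to the log only when it is unset) can be sketched as below. The real code is Scala and uses `Option[TopicId]`; this Java version with `Optional<String>` is a simplified assumption, and the `Log` and `Partition` classes here are hypothetical stand-ins, not Kafka's.

```java
import java.util.Optional;

final class PartitionTopicIdSketch {
    // Hypothetical stand-in for a partition's log, which may know the topic ID.
    static final class Log {
        final Optional<String> topicId;

        Log(Optional<String> topicId) {
            this.topicId = topicId;
        }
    }

    // Hypothetical simplified partition: a topic ID given at construction wins;
    // otherwise the lookup falls back to the log, which is gone once offline.
    static final class Partition {
        private final Optional<String> explicitTopicId;
        private Optional<Log> log;

        Partition(Optional<String> explicitTopicId, Optional<Log> log) {
            this.explicitTopicId = explicitTopicId;
            this.log = log;
        }

        // Mimics marking the partition offline: the log reference is dropped.
        void markOffline() {
            log = Optional.empty();
        }

        Optional<String> topicId() {
            if (explicitTopicId.isPresent()) {
                return explicitTopicId;
            }
            return log.flatMap(l -> l.topicId);
        }
    }

    public static void main(String[] args) {
        Partition withId = new Partition(Optional.of("id-1"), Optional.of(new Log(Optional.of("id-1"))));
        Partition withoutId = new Partition(Optional.empty(), Optional.of(new Log(Optional.of("id-1"))));
        withId.markOffline();
        withoutId.markOffline();
        System.out.println(withId.topicId().isPresent());
        System.out.println(withoutId.topicId().isPresent());
    }
}
```

In this sketch the partition built without an explicit ID loses its topic ID once it goes offline, which is the situation the ticket describes; the one built with an explicit ID keeps it.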
Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]
showuon merged PR #15335: URL: https://github.com/apache/kafka/pull/15335
Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]
showuon commented on PR #15335: URL: https://github.com/apache/kafka/pull/15335#issuecomment-2040991135 Failed tests are unrelated.
Re: [PR] MINOR: Cleanup in MetadataShell [kafka]
wernerdv commented on PR #15672: URL: https://github.com/apache/kafka/pull/15672#issuecomment-2040983038 @cmccabe @chia7712 Hello, PTAL.
[PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]
clolov opened a new pull request, #15673: URL: https://github.com/apache/kafka/pull/15673 ### Summary This is a follow-up to https://github.com/apache/kafka/pull/15213, where I missed updating the API version and the IBP version. I chose to mark the latest version of the ListOffsets API as unstable until the whole [KIP-1005](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1005%3A+Expose+EarliestLocalOffset+and+TieredOffset) is implemented.
[jira] [Created] (KAFKA-16480) ListOffsets change should have an associated API/IBP version update
Christo Lolov created KAFKA-16480: - Summary: ListOffsets change should have an associated API/IBP version update Key: KAFKA-16480 URL: https://issues.apache.org/jira/browse/KAFKA-16480 Project: Kafka Issue Type: Sub-task Reporter: Christo Lolov Assignee: Christo Lolov https://issues.apache.org/jira/browse/KAFKA-16154 introduced the changes to the ListOffsets API to accept latest-tiered-timestamp and return the corresponding offset. Those changes should have a) increased the version of the ListOffsets API, b) increased the inter-broker protocol version, and c) hidden the latest version of ListOffsets behind the latestVersionUnstable flag. The purpose of this task is to remedy that omission.
Re: [PR] MINOR: Improvements to release.py [kafka]
omkreddy commented on code in PR #15651: URL: https://github.com/apache/kafka/pull/15651#discussion_r1554531283 ## release.py: ## @@ -348,6 +348,9 @@ def command_release_announcement_email(): +An overview of the release and its notable changes can be found in the +release blog post: Review Comment: Thanks for the review. Updated the PR.
[PR] MINOR: Cleanup in MetadataShell [kafka]
wernerdv opened a new pull request, #15672: URL: https://github.com/apache/kafka/pull/15672 Removed unnecessary exceptions from the throw list. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]
brandboat commented on PR #15668: URL: https://github.com/apache/kafka/pull/15668#issuecomment-2040977883 > @brandboat nice finding! Should we add the thread prefix to https://github.com/apache/kafka/blob/a2ee0855ee5e73f3a74555d52294bb4acfd28945/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L2487 to avoid similar issue in the future? Oh wow! I didn't notice that we already have this check. Sure! I've added the thread name to this check in commit https://github.com/apache/kafka/pull/15668/commits/8daf268f2c99b7a88dde150967c22301c88cc1d6. Many thanks!
Re: [PR] KAFKA-16455: Check partition exists before send reassignments to server in ReassignPartitionsCommand [kafka]
brandboat commented on code in PR #15659: URL: https://github.com/apache/kafka/pull/15659#discussion_r1554519247 ## tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java: ## @@ -654,6 +654,13 @@ static Map<TopicPartition, List<Integer>> getReplicaAssignmentForPartitions(Admi res.put(tp, info.replicas().stream().map(Node::id).collect(Collectors.toList())); }) ); + +if (!res.keySet().equals(partitions)) { +Set<TopicPartition> missingPartitions = new HashSet<>(partitions); +missingPartitions.removeAll(res.keySet()); +throw new UnknownTopicOrPartitionException("Unable to find partition: " + + missingPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); Review Comment: I don't know why we do this, but following the existing pattern is a good choice. Thanks for the comment :smiley:
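The check in this hunk is a set difference: copy the requested partitions, remove everything the lookup returned, and report what is left. A minimal standalone sketch follows, assuming plain strings in place of `TopicPartition`; `missingMessage` is a hypothetical helper, and the `TreeSet` is a choice made here only to get a deterministic message order (the PR uses a `HashSet`).

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

final class MissingPartitionsSketch {
    // Returns the error message for partitions that were requested but not found,
    // or an empty Optional when every requested partition has an assignment.
    static Optional<String> missingMessage(Map<String, List<Integer>> found, Set<String> requested) {
        Set<String> missing = new TreeSet<>(requested); // sorted copy of the request
        missing.removeAll(found.keySet());              // drop everything that was found
        if (missing.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of("Unable to find partition: " + String.join(", ", missing));
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> found = Map.of("foo-0", List.of(1, 2, 3));
        System.out.println(
            missingMessage(found, Set.of("foo-0", "foo-10")).orElse("all partitions found"));
    }
}
```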
Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]
chia7712 commented on code in PR #15656: URL: https://github.com/apache/kafka/pull/15656#discussion_r1554518847 ## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SecurityConfig; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; + +public class KafkaConfig { Review Comment: Not sure whether we need another class to collect those SSL/SASL-related configs. If we do need it, could we rename it to `KafkaSecurityConfig` to avoid adding other security-unrelated configs in the future? Personally, I think a fat class like `KafkaConfig.scala` is an anti-pattern.
Re: [PR] KAFKA-16047: Leverage the fenceProducers timeout in the InitProducerId [kafka]
github-actions[bot] commented on PR #15078: URL: https://github.com/apache/kafka/pull/15078#issuecomment-2040935039 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] MINOR: Default test name added to tools [kafka]
chia7712 commented on PR #15666: URL: https://github.com/apache/kafka/pull/15666#issuecomment-2040912811 @nizhikov Could you please rebase the code so we can see the display name of the tools tests?
[jira] [Resolved] (KAFKA-16472) Integration tests in Java don't really run kraft case
[ https://issues.apache.org/jira/browse/KAFKA-16472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16472. Resolution: Fixed > Integration tests in Java don't really run kraft case > - > > Key: KAFKA-16472 > URL: https://issues.apache.org/jira/browse/KAFKA-16472 > Project: Kafka > Issue Type: Test > Reporter: PoAn Yang > Assignee: PoAn Yang > Priority: Major > Fix For: 3.8.0, 3.7.1 > > > The following test cases don't really run the KRaft case. The reason is that the test > info doesn't contain the parameter name, so TestInfoUtils#isKRaft always returns > false. > * TopicCommandIntegrationTest > * DeleteConsumerGroupsTest > * AuthorizerIntegrationTest > * DeleteOffsetsConsumerGroupCommandIntegrationTest > > We can add `options.compilerArgs += '-parameters'` after > [https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/build.gradle#L264-L273] > to fix it. > > Also, we have to add `String quorum` to the cases in > DeleteOffsetsConsumerGroupCommandIntegrationTest.
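Why a missing parameter name makes the KRaft check return false can be illustrated with a small sketch. It assumes, as a simplification, that the check is a substring match on `quorum=kraft` in the test's display name; the real `TestInfoUtils#isKRaft` may differ. The `displayName` helper is hypothetical. Without javac's `-parameters` flag, reflection reports synthesized names such as `arg0` instead of `quorum`, so the expected text never appears.

```java
final class QuorumDisplayNameSketch {
    // Hypothetical display-name builder: "<method>.<paramName>=<value>".
    static String displayName(String method, String paramName, String value) {
        return method + "." + paramName + "=" + value;
    }

    // Simplified assumption of the check: KRaft mode is detected only
    // from the literal text "quorum=kraft" in the display name.
    static boolean isKRaft(String displayName) {
        return displayName.contains("quorum=kraft");
    }

    public static void main(String[] args) {
        // Compiled with -parameters: the real parameter name is available.
        System.out.println(isKRaft(displayName("testDeleteGroup", "quorum", "kraft")));
        // Compiled without -parameters: the name degrades to a synthesized one.
        System.out.println(isKRaft(displayName("testDeleteGroup", "arg0", "kraft")));
    }
}
```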
[jira] [Updated] (KAFKA-16472) Integration tests in Java don't really run kraft case
[ https://issues.apache.org/jira/browse/KAFKA-16472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-16472: --- Fix Version/s: 3.8.0 3.7.1 > Integration tests in Java don't really run kraft case > - > > Key: KAFKA-16472 > URL: https://issues.apache.org/jira/browse/KAFKA-16472 > Project: Kafka > Issue Type: Test > Reporter: PoAn Yang > Assignee: PoAn Yang > Priority: Major > Fix For: 3.8.0, 3.7.1 > > > The following test cases don't really run the KRaft case. The reason is that the test > info doesn't contain the parameter name, so TestInfoUtils#isKRaft always returns > false. > * TopicCommandIntegrationTest > * DeleteConsumerGroupsTest > * AuthorizerIntegrationTest > * DeleteOffsetsConsumerGroupCommandIntegrationTest > > We can add `options.compilerArgs += '-parameters'` after > [https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/build.gradle#L264-L273] > to fix it. > > Also, we have to add `String quorum` to the cases in > DeleteOffsetsConsumerGroupCommandIntegrationTest.
Re: [PR] KAFKA-16472: Fix integration tests in Java with parameter name [kafka]
chia7712 merged PR #15663: URL: https://github.com/apache/kafka/pull/15663
Re: [PR] KAFKA-16472: Fix integration tests in Java with parameter name [kafka]
chia7712 commented on PR #15663: URL: https://github.com/apache/kafka/pull/15663#issuecomment-2040889410 I'm going to merge this PR so that other tool tests can run with KRaft. We can address all late reviews in other PRs.
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
Phuc-Hong-Tran commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1554474003 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, Optional
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
Phuc-Hong-Tran commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1554466415 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, Optional
Re: [PR] KAFKA-16455: Check partition exists before send reassignments to server in ReassignPartitionsCommand [kafka]
showuon commented on code in PR #15659: URL: https://github.com/apache/kafka/pull/15659#discussion_r1554446742 ## tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java: ## @@ -300,6 +300,12 @@ public void testGetReplicaAssignments() throws Exception { assertEquals(assignments, getReplicaAssignmentForPartitions(adminClient, new HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("bar", 0))))); + +assignments.clear(); + +UnknownTopicOrPartitionException exception = assertThrows(UnknownTopicOrPartitionException.class, +() -> getReplicaAssignmentForPartitions(adminClient, new HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("foo", 10))))); Review Comment: In `ReassignPartitionsCommand#describeTopics`, we wrap `UnknownTopicOrPartitionException` with `ExecutionException`. Maybe we should follow that pattern here for consistency? ## tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java: ## @@ -654,6 +654,13 @@ static Map<TopicPartition, List<Integer>> getReplicaAssignmentForPartitions(Admi res.put(tp, info.replicas().stream().map(Node::id).collect(Collectors.toList())); }) ); + +if (!res.keySet().equals(partitions)) { +Set<TopicPartition> missingPartitions = new HashSet<>(partitions); +missingPartitions.removeAll(res.keySet()); +throw new UnknownTopicOrPartitionException("Unable to find partition: " + + missingPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); Review Comment: In `ReassignPartitionsCommand#describeTopics`, we wrap `UnknownTopicOrPartitionException` with `ExecutionException`. Maybe we should follow that pattern here for consistency?
[jira] [Updated] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response
[ https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16474: -- Labels: kip-848-client-support (was: ) > AsyncKafkaConsumer might send out heartbeat request without waiting for its > response > > > Key: KAFKA-16474 > URL: https://issues.apache.org/jira/browse/KAFKA-16474 > Project: Kafka > Issue Type: Bug > Components: clients, consumer > Reporter: Philip Nee > Assignee: Philip Nee > Priority: Critical > Labels: kip-848-client-support > > KAFKA-16389 > We've discovered that in some uncommon cases, the consumer could send out > successive heartbeats without waiting for the response to come back. This > might cause the consumer to revoke its just-assigned partitions > in some cases. For example: > > The consumer first sends out a heartbeat with epoch=0 and memberId='' > The consumer then rapidly sends out another heartbeat with epoch=0 and > memberId='' because it has not gotten any response and thus has not updated its > local state > > The consumer receives assignments from the first heartbeat and reconciles its > assignment. > > Since the second heartbeat has epoch=0 and memberId='', the server will think > this is a new member joining and therefore send out an empty assignment. > > The consumer receives the response from the second heartbeat and revokes all of > its partitions. > > There are 2 issues associated with this bug: > # inflight logic > # rapid poll: In KAFKA-16389 we've observed the consumer polling interval to > be a few ms.
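The "inflight logic" issue named in this ticket can be illustrated with a small guard that refuses to send a second heartbeat until the first one's response has updated local state. This is a sketch under stated assumptions, not the AsyncKafkaConsumer implementation: the class and its methods (`maybeSend`, `onResponse`) are hypothetical.

```java
final class HeartbeatInflightSketch {
    private boolean inflight = false; // true while a heartbeat awaits its response
    private int epoch = 0;            // local member epoch, 0 before joining
    private String memberId = "";     // local member ID, empty before joining

    // Returns true if a heartbeat may be sent now; refuses while one is in flight,
    // so a stale (epoch=0, memberId='') request cannot follow the first one.
    boolean maybeSend() {
        if (inflight) {
            return false;
        }
        inflight = true;
        return true;
    }

    // Applies the response to local state and only then allows the next heartbeat.
    void onResponse(String assignedMemberId, int newEpoch) {
        memberId = assignedMemberId;
        epoch = newEpoch;
        inflight = false;
    }

    int epoch() {
        return epoch;
    }

    String memberId() {
        return memberId;
    }

    public static void main(String[] args) {
        HeartbeatInflightSketch state = new HeartbeatInflightSketch();
        System.out.println(state.maybeSend()); // first heartbeat goes out
        System.out.println(state.maybeSend()); // second is held back
        state.onResponse("member-1", 1);
        System.out.println(state.maybeSend()); // allowed again, now with updated state
    }
}
```

A real client would also need a timeout or failure path that clears the flag when no response arrives; that is left out here.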
[jira] [Updated] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response
[ https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16474: -- Component/s: clients > AsyncKafkaConsumer might send out heartbeat request without waiting for its > response > > > Key: KAFKA-16474 > URL: https://issues.apache.org/jira/browse/KAFKA-16474 > Project: Kafka > Issue Type: Bug > Components: clients, consumer > Reporter: Philip Nee > Assignee: Philip Nee > Priority: Critical > > KAFKA-16389 > We've discovered that in some uncommon cases, the consumer could send out > successive heartbeats without waiting for the response to come back. This > might cause the consumer to revoke its just-assigned partitions > in some cases. For example: > > The consumer first sends out a heartbeat with epoch=0 and memberId='' > The consumer then rapidly sends out another heartbeat with epoch=0 and > memberId='' because it has not gotten any response and thus has not updated its > local state > > The consumer receives assignments from the first heartbeat and reconciles its > assignment. > > Since the second heartbeat has epoch=0 and memberId='', the server will think > this is a new member joining and therefore send out an empty assignment. > > The consumer receives the response from the second heartbeat and revokes all of > its partitions. > > There are 2 issues associated with this bug: > # inflight logic > # rapid poll: In KAFKA-16389 we've observed the consumer polling interval to > be a few ms.
[jira] [Updated] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response
[ https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16474: -- Fix Version/s: 3.8.0 > AsyncKafkaConsumer might send out heartbeat request without waiting for its > response > > > Key: KAFKA-16474 > URL: https://issues.apache.org/jira/browse/KAFKA-16474 > Project: Kafka > Issue Type: Bug > Components: clients, consumer > Reporter: Philip Nee > Assignee: Philip Nee > Priority: Critical > Labels: kip-848-client-support > Fix For: 3.8.0 > > > KAFKA-16389 > We've discovered that in some uncommon cases, the consumer could send out > successive heartbeats without waiting for the response to come back. This > might cause the consumer to revoke its just-assigned partitions > in some cases. For example: > > The consumer first sends out a heartbeat with epoch=0 and memberId='' > The consumer then rapidly sends out another heartbeat with epoch=0 and > memberId='' because it has not gotten any response and thus has not updated its > local state > > The consumer receives assignments from the first heartbeat and reconciles its > assignment. > > Since the second heartbeat has epoch=0 and memberId='', the server will think > this is a new member joining and therefore send out an empty assignment. > > The consumer receives the response from the second heartbeat and revokes all of > its partitions. > > There are 2 issues associated with this bug: > # inflight logic > # rapid poll: In KAFKA-16389 we've observed the consumer polling interval to > be a few ms.
[jira] [Commented] (KAFKA-16478) Links for Kafka 3.5.2 release are broken
[ https://issues.apache.org/jira/browse/KAFKA-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834456#comment-17834456 ] Matthias J. Sax commented on KAFKA-16478: - \cc [~mimaison] – it seems you removed some older releases from the download path (3.4.1, 3.5.0, 3.5.1, and 3.6.0). Let's check that all links for these releases point to "archive". > Links for Kafka 3.5.2 release are broken > > > Key: KAFKA-16478 > URL: https://issues.apache.org/jira/browse/KAFKA-16478 > Project: Kafka > Issue Type: Bug > Components: website > Affects Versions: 3.5.2 > Reporter: Philipp Trulson > Priority: Major > > While trying to update our setup, I noticed that the download links for the > 3.5.2 release are broken. They all point to a different host and also contain > an additional `/kafka` in their URL. Compare: > not working: > [https://downloads.apache.org/kafka/kafka/3.5.2/RELEASE_NOTES.html] > working: > [https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html] > [https://downloads.apache.org/kafka/3.6.2/RELEASE_NOTES.html] > This goes for all links in the release - archives, checksums, signatures.
[jira] (KAFKA-16478) Links for Kafka 3.5.2 release are broken
[ https://issues.apache.org/jira/browse/KAFKA-16478 ] Matthias J. Sax deleted comment on KAFKA-16478: - was (Author: mjsax): \cc [~omkreddy] [~stanislavkozlovski] as last RMs – did either of you move artifacts and forget to update the links? > Links for Kafka 3.5.2 release are broken > > > Key: KAFKA-16478 > URL: https://issues.apache.org/jira/browse/KAFKA-16478 > Project: Kafka > Issue Type: Bug > Components: website > Affects Versions: 3.5.2 > Reporter: Philipp Trulson > Priority: Major > > While trying to update our setup, I noticed that the download links for the > 3.5.2 release are broken. They all point to a different host and also contain > an additional `/kafka` in their URL. Compare: > not working: > [https://downloads.apache.org/kafka/kafka/3.5.2/RELEASE_NOTES.html] > working: > [https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html] > [https://downloads.apache.org/kafka/3.6.2/RELEASE_NOTES.html] > This goes for all links in the release - archives, checksums, signatures.
[jira] [Commented] (KAFKA-16478) Links for Kafka 3.5.2 release are broken
[ https://issues.apache.org/jira/browse/KAFKA-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834452#comment-17834452 ] Matthias J. Sax commented on KAFKA-16478: - \cc [~omkreddy] [~stanislavkozlovski] as last RMs – did either of you move artifacts and forget to update the links? > Links for Kafka 3.5.2 release are broken > > > Key: KAFKA-16478 > URL: https://issues.apache.org/jira/browse/KAFKA-16478 > Project: Kafka > Issue Type: Bug > Components: website > Affects Versions: 3.5.2 > Reporter: Philipp Trulson > Priority: Major > > While trying to update our setup, I noticed that the download links for the > 3.5.2 release are broken. They all point to a different host and also contain > an additional `/kafka` in their URL. Compare: > not working: > [https://downloads.apache.org/kafka/kafka/3.5.2/RELEASE_NOTES.html] > working: > [https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html] > [https://downloads.apache.org/kafka/3.6.2/RELEASE_NOTES.html] > This goes for all links in the release - archives, checksums, signatures.
[jira] [Commented] (KAFKA-16478) Links for Kafka 3.5.2 release are broken
[ https://issues.apache.org/jira/browse/KAFKA-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834451#comment-17834451 ] Matthias J. Sax commented on KAFKA-16478: [~der_eismann] – Thanks for reporting this. Do you want to do a PR to fix it? Should be simple? Just need a small update to [https://github.com/apache/kafka-site/blob/asf-site/downloads.html] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: fix javadoc warnings [kafka]
gharris1727 commented on PR #15527: URL: https://github.com/apache/kafka/pull/15527#issuecomment-2040626161 > I don't think that's a good idea. Usually IDEs help in figuring out the usages of fields/classes anyway. Should we remove that tag? I agree, let's remove that. RemoteStorageMetrics is already referenced by MetricsTest, so this back-link isn't adding anything. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: fix javadoc warnings [kafka]
gaurav-narula commented on PR #15527: URL: https://github.com/apache/kafka/pull/15527#issuecomment-2040616724 @gharris1727 Thanks for pointing that out. The warning below ``` kafka/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java:31: warning: reference not found: kafka.api.MetricsTest * @see kafka.api.MetricsTest ``` occurs because we're trying to refer to a class in the test compile path from production code. I don't think that's a good idea. Usually IDEs help in figuring out the usages of fields/classes anyway. Should we remove that tag? WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
chia7712 commented on code in PR #15670: URL: https://github.com/apache/kafka/pull/15670#discussion_r1554282432 ## core/src/main/scala/kafka/server/DynamicBrokerConfig.scala: ## @@ -1136,18 +1137,18 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi class DynamicProducerStateManagerConfig(val producerStateManagerConfig: ProducerStateManagerConfig) extends BrokerReconfigurable with Logging { def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { if (producerStateManagerConfig.producerIdExpirationMs != newConfig.producerIdExpirationMs) { - info(s"Reconfigure ${KafkaConfig.ProducerIdExpirationMsProp} from ${producerStateManagerConfig.producerIdExpirationMs} to ${newConfig.producerIdExpirationMs}") + info(s"Reconfigure ${TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG} from ${producerStateManagerConfig.producerIdExpirationMs} to ${newConfig.producerIdExpirationMs}") producerStateManagerConfig.setProducerIdExpirationMs(newConfig.producerIdExpirationMs) } if (producerStateManagerConfig.transactionVerificationEnabled != newConfig.transactionPartitionVerificationEnable) { - info(s"Reconfigure ${KafkaConfig.TransactionPartitionVerificationEnableProp} from ${producerStateManagerConfig.transactionVerificationEnabled} to ${newConfig.transactionPartitionVerificationEnable}") + info(s"Reconfigure ${TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG} from ${producerStateManagerConfig.transactionVerificationEnabled} to ${newConfig.transactionPartitionVerificationEnable}") producerStateManagerConfig.setTransactionVerificationEnabled(newConfig.transactionPartitionVerificationEnable) } } def validateReconfiguration(newConfig: KafkaConfig): Unit = { if (newConfig.producerIdExpirationMs < 0) - throw new ConfigException(s"${KafkaConfig.ProducerIdExpirationMsProp} cannot be less than 0, current value is ${producerStateManagerConfig.producerIdExpirationMs}, and new value is ${newConfig.producerIdExpirationMs}") + 
throw new ConfigException(s"${TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG} cannot be less than 0, current value is ${producerStateManagerConfig.producerIdExpirationMs}, and new value is ${newConfig.producerIdExpirationMs}") Review Comment: maybe `ProducerStateManagerConfig.PRODUCER_ID_EXPIRATION_MS` is more suitable in this case because the code is used for logging `Producer` state. ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1062,21 +1027,21 @@ object KafkaConfig { .define(CompressionTypeProp, STRING, LogConfig.DEFAULT_COMPRESSION_TYPE, in(BrokerCompressionType.names.asScala.toSeq:_*), HIGH, CompressionTypeDoc) /** * Transaction management configuration ***/ - .define(TransactionalIdExpirationMsProp, INT, Defaults.TRANSACTIONAL_ID_EXPIRATION_MS, atLeast(1), HIGH, TransactionalIdExpirationMsDoc) - .define(TransactionsMaxTimeoutMsProp, INT, Defaults.TRANSACTIONS_MAX_TIMEOUT_MS, atLeast(1), HIGH, TransactionsMaxTimeoutMsDoc) - .define(TransactionsTopicMinISRProp, INT, Defaults.TRANSACTIONS_TOPIC_MIN_ISR, atLeast(1), HIGH, TransactionsTopicMinISRDoc) - .define(TransactionsLoadBufferSizeProp, INT, Defaults.TRANSACTIONS_LOAD_BUFFER_SIZE, atLeast(1), HIGH, TransactionsLoadBufferSizeDoc) - .define(TransactionsTopicReplicationFactorProp, SHORT, Defaults.TRANSACTIONS_TOPIC_REPLICATION_FACTOR, atLeast(1), HIGH, TransactionsTopicReplicationFactorDoc) - .define(TransactionsTopicPartitionsProp, INT, Defaults.TRANSACTIONS_TOPIC_PARTITIONS, atLeast(1), HIGH, TransactionsTopicPartitionsDoc) - .define(TransactionsTopicSegmentBytesProp, INT, Defaults.TRANSACTIONS_TOPIC_SEGMENT_BYTES, atLeast(1), HIGH, TransactionsTopicSegmentBytesDoc) - .define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT, Defaults.TRANSACTIONS_ABORT_TIMED_OUT_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsAbortTimedOutTransactionsIntervalMsDoc) - .define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, INT, 
Defaults.TRANSACTIONS_REMOVE_EXPIRED_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsRemoveExpiredTransactionsIntervalMsDoc) - - .define(TransactionPartitionVerificationEnableProp, BOOLEAN, Defaults.TRANSACTION_PARTITION_VERIFICATION_ENABLE, LOW, TransactionPartitionVerificationEnableDoc) - - .define(ProducerIdExpirationMsProp, INT, Defaults.PRODUCER_ID_EXPIRATION_MS, atLeast(1), LOW, ProducerIdExpirationMsDoc) + .define(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, INT, Defaults.TRANSACTIONAL_ID_EXPIRATION_MS, atLeast(1), HIGH, TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_DOC) Review Comment: Should we do similar refactor for `Defaults`? -- This is an automated message from the Apache Git Service. To respond to the messa
Re: [PR] MINOR: fix javadoc warnings [kafka]
gharris1727 commented on PR #15527: URL: https://github.com/apache/kafka/pull/15527#issuecomment-2040586001 Hi @gaurav-narula I see some other javadoc warnings, do you think we should address these?
```
> Task :storage:storage-api:javadoc
kafka/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java:31: warning: reference not found: kafka.api.MetricsTest
 * @see kafka.api.MetricsTest
 ^
1 warning
> Task :streams:javadoc
kafka/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java:52: warning: invalid input: '<'
 * A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore ReadOnlyKeyValueStore>}.
 ^
kafka/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java:74: warning: invalid input: '<'
 * A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore ReadOnlyWindowStore>}.
 ^
kafka/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java:52: warning: invalid input: '<'
 * A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore ReadOnlyKeyValueStore>}.
 ^
kafka/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java:74: warning: invalid input: '<'
 * A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore ReadOnlyWindowStore>}.
 ^
kafka/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java:52: warning: invalid input: '<'
 * A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore ReadOnlyKeyValueStore>}.
 ^
kafka/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java:74: warning: invalid input: '<'
 * A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore ReadOnlyWindowStore>}.
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
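For the `invalid input: '<'` warnings, one common fix is to escape the angle brackets in the `{@link}` label, or to use `{@code}` instead, because javadoc treats a raw `<` in the label as the start of HTML. A minimal sketch, assuming the original label spelled out generic type parameters; `ReadOnlyStore` and `StoreTypes` are hypothetical names for illustration and are not taken from the Kafka source:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a read-only store type; not a Kafka class. */
interface ReadOnlyStore<K, V> {
    V get(K key);
}

/**
 * Hypothetical factory whose javadoc mentions a generic type.
 *
 * <p>Escaped label: {@link ReadOnlyStore ReadOnlyStore&lt;K, V&gt;}.
 * Alternative without a link: {@code ReadOnlyStore<K, V>}.
 */
final class StoreTypes {
    private StoreTypes() { }

    /**
     * Wraps a snapshot of the given map as a
     * {@link ReadOnlyStore ReadOnlyStore&lt;K, V&gt;}.
     */
    static <K, V> ReadOnlyStore<K, V> readOnly(final Map<K, V> backing) {
        // Copy so that later changes to the caller's map are not visible.
        final Map<K, V> copy = new HashMap<>(backing);
        return copy::get;
    }
}
```

The `reference not found: kafka.api.MetricsTest` warning is a different problem (a production class pointing at a test class) and is not addressed by escaping.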
[PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio opened a new pull request, #15671: URL: https://github.com/apache/kafka/pull/15671 DRAFT ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on PR #15613: URL: https://github.com/apache/kafka/pull/15613#issuecomment-2040495611 Thanks for the changes @lucasbru, looks good to me overall. This is tidying up the whole async commit callbacks execution story. Left some comments, mostly minor, and to make sure we're on the same page with the reasoning behind the change. Should we update the PR description to refer not only to the `consumer.commitSync()`, but also `consumer.close()`, both being fixed here to ensure that previous async commit callbacks are always executed? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16068: Use TestPlugins in ConnectorValidationIntegrationTest to silence plugin scanning errors [kafka]
gharris1727 commented on code in PR #15642: URL: https://github.com/apache/kafka/pull/15642#discussion_r1554188423 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java: ## @@ -69,6 +69,9 @@ public static void setup() { Map workerProps = new HashMap<>(); workerProps.put(GROUP_ID_CONFIG, WORKER_GROUP_ID); +// Work around a circular-dependency in TestPlugins. +TestPlugins.pluginPath(); Review Comment: The circular dependency is still a problem that I haven't resolved, sorry. It's fixed in this un-merged PR: #1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1554162586 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed. Review Comment: I would extend with something like: ...before the consumer is closed, **even when no commit sync is performed as part of the close (due to auto-commit disabled, or simply because there are no consumed offsets).** That's the key as I see it, fixed in this PR, and being tested here. If the call to consumer.close performs an actual commit sync (needs auto-commit enabled and non-empty consumed offsets), then the async callbacks were always called, I expect. The contract was not being respected in case the commit sync did not happen for some of the reasons mentioned above. Agree? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
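The contract described in this comment can be illustrated without a broker. What follows is a toy model, not the consumer's actual implementation: `ToyConsumer`, its queue of pending commits, `completePending`, and `CountingCallback` are all hypothetical names. It only shows the intended ordering, namely that callbacks for earlier `commitAsync` calls run before `commitSync` returns and before `close` returns, even when there is nothing to commit synchronously.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

/** Toy model only; hypothetical names, no Kafka dependency. */
final class ToyConsumer implements AutoCloseable {
    // Async commits that have been requested but whose callbacks have not run yet.
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private final Map<String, Long> committed = new HashMap<>();

    void commitAsync(final Map<String, Long> offsets, final Consumer<Map<String, Long>> callback) {
        pending.add(() -> {
            committed.putAll(offsets);
            callback.accept(offsets);
        });
    }

    void commitSync(final Map<String, Long> offsets) {
        // Complete earlier async commits first, even if 'offsets' is empty.
        completePending();
        committed.putAll(offsets);
    }

    @Override
    public void close() {
        // No sync commit is required for the pending callbacks to fire.
        completePending();
    }

    Long committed(final String partition) {
        return committed.get(partition);
    }

    private void completePending() {
        Runnable next;
        while ((next = pending.poll()) != null) {
            next.run();
        }
    }
}

/** Counts successful callbacks, loosely mirroring the counting callback used in the test. */
final class CountingCallback implements Consumer<Map<String, Long>> {
    int successCount = 0;

    @Override
    public void accept(final Map<String, Long> offsets) {
        successCount++;
    }
}
```

In this model an empty `commitSync` and a plain `close` both drain the queue, which corresponds to the two cases the new tests exercise.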
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1554177925 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed. +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) +consumer.close() +assertEquals(2, cb.successCount); + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commits sent previously with the +// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of +// `commitSync` (given that it does not time out). 
+val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava) +assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset) +assertEquals(1, cb.successCount); + +// Enforce looking up the coordinator +consumer.committed(Set(tp, tp2).asJava) Review Comment: I would say we don't need this, because of the successful `assertEquals` with call to `committed` above, ln 694. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1554165141 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { Review Comment: nit: maybe better name testCommitAsyncCompleted**Before**ConsumerCloses (clearer and consistent with the similar one below) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Improvements to release.py [kafka]
chia7712 commented on code in PR #15651: URL: https://github.com/apache/kafka/pull/15651#discussion_r1554150631 ## release.py: ## @@ -348,6 +348,9 @@ def command_release_announcement_email(): +An overview of the release and its notable changes can be found in the +release blog post: Review Comment: Can we generate the link automatically if the link format is like `https://kafka.apache.org/blog#apache_kafka_362_release_announcement`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
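If the anchor really follows the pattern in the example URL, deriving it from the version string is straightforward. A minimal sketch, written in Java for illustration only (`release.py` itself is Python); the pattern is inferred from that single 3.6.2 URL, and `ReleaseBlogLink.blogLink` is a hypothetical helper, not an existing function:

```java
final class ReleaseBlogLink {
    private static final String BLOG_URL = "https://kafka.apache.org/blog";

    private ReleaseBlogLink() { }

    /** Builds the assumed blog anchor for a dotted release version such as "3.6.2". */
    static String blogLink(final String version) {
        if (!version.matches("\\d+(\\.\\d+)+")) {
            throw new IllegalArgumentException("Unexpected version: " + version);
        }
        // Assumption: the anchor is the version number with the dots removed.
        final String compact = version.replace(".", "");
        return BLOG_URL + "#apache_kafka_" + compact + "_release_announcement";
    }
}
```

Under this scheme a version with a two-digit component would produce an ambiguous anchor, so the real naming rule of the blog page would need to be confirmed before relying on it.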
Re: [PR] KAFKA-16472: Fix integration tests in Java with parameter name [kafka]
chia7712 commented on PR #15663: URL: https://github.com/apache/kafka/pull/15663#issuecomment-2040402625 @ijuma Could you please take a look at this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-13501: Avoid state restore via rebalance if standbys are enabled [kafka]
vamossagar12 closed pull request #11592: KAFKA-13501: Avoid state restore via rebalance if standbys are enabled URL: https://github.com/apache/kafka/pull/11592 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10526: leader fsync deferral on write [kafka]
vamossagar12 closed pull request #10278: KAFKA-10526: leader fsync deferral on write URL: https://github.com/apache/kafka/pull/10278 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka 12373:Improve KafkaRaftClient handling of graceful shutdown [kafka]
vamossagar12 closed pull request #10468: Kafka 12373:Improve KafkaRaftClient handling of graceful shutdown URL: https://github.com/apache/kafka/pull/10468 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15853: Move transactions configs out of core [kafka]
OmniaGM opened a new pull request, #15670: URL: https://github.com/apache/kafka/pull/15670 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-12848: kafka streams jmh benchmarks [kafka]
vamossagar12 closed pull request #10842: KAFKA-12848: kafka streams jmh benchmarks URL: https://github.com/apache/kafka/pull/10842 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]
gharris1727 commented on code in PR #15601: URL: https://github.com/apache/kafka/pull/15601#discussion_r1554016667 ## streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java: ## @@ -33,28 +34,36 @@ public class TimestampedKeyAndJoinSide { private final K key; private final long timestamp; -private final boolean leftSide; +private final JoinSide joinSide; -private TimestampedKeyAndJoinSide(final boolean leftSide, final K key, final long timestamp) { +private TimestampedKeyAndJoinSide(final JoinSide joinSide, final K key, final long timestamp) { this.key = Objects.requireNonNull(key, "key cannot be null"); -this.leftSide = leftSide; +this.joinSide = joinSide; this.timestamp = timestamp; } /** - * Create a new {@link TimestampedKeyAndJoinSide} instance if the provide {@code key} is not {@code null}. + * Create a new {@link TimestampedKeyAndJoinSide} instance if the provided {@code key} is not {@code null}. * - * @param leftSide True if the key is part of the left join side; False if it is from the right join side + * @param joinSide Whether the key is part of the {@link JoinSide#LEFT} side; or it is from the {@link JoinSide#RIGHT} side * @param key the key * @param the type of the key - * @return a new {@link TimestampedKeyAndJoinSide} instance if the provide {@code key} is not {@code null} + * @return a new {@link TimestampedKeyAndJoinSide} instance if the provided {@code key} is not {@code null} */ -public static TimestampedKeyAndJoinSide make(final boolean leftSide, final K key, final long timestamp) { -return new TimestampedKeyAndJoinSide<>(leftSide, key, timestamp); +public static TimestampedKeyAndJoinSide make(final JoinSide joinSide, final K key, final long timestamp) { Review Comment: Since this is only used in tests now, I think you can remove this and replace the test call-sites with the new functions. Make sure to copy the javadoc to the new signatures too. 
## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import java.util.Optional; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.ValueJoinerWithKey; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.apache.kafka.streams.state.internals.LeftOrRightValue; +import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class KStreamKStreamLeftJoin extends KStreamKStreamJoin { +private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class); + +KStreamKStreamLeftJoin(final String otherWindowName, +final JoinWindowsInternal windows, +final ValueJoinerWithKey joiner, +final boolean outer, +final 
Optional outerJoinWindowName, +final TimeTrackerSupplier sharedTimeTrackerSupplier) { +super(otherWindowName, windows, windows.beforeMs, windows.afterMs, joiner, outerJoinWindowName, +sharedTimeTrackerSupplier, outer); +} + +@Override +public Processor get() { +return new KStreamKStreamLeftJoinProcessor(); +} + +private class KStreamKStreamLeftJoinProcessor extends KStreamKStreamJoinProcessor { +@Override +public void process(final Record leftRecord) { +final long inputRecordTimestamp = leftRecord.timestamp(); +final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); +final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); +sh
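The core idea of the change under review in this comment, replacing a `boolean leftSide` flag with a `JoinSide` value, can be shown in isolation. This is a simplified sketch and not the PR's code: the shape of the `JoinSide` enum, the `makeLeft`/`makeRight` factory names, and the `TimestampedKey` class are assumptions made for illustration.

```java
import java.util.Objects;

/** Hypothetical two-valued type replacing a boolean "leftSide" flag. */
enum JoinSide { LEFT, RIGHT }

/** Simplified stand-in for a key tagged with its join side; not the Kafka class. */
final class TimestampedKey<K> {
    private final K key;
    private final long timestamp;
    private final JoinSide joinSide;

    private TimestampedKey(final JoinSide joinSide, final K key, final long timestamp) {
        this.key = Objects.requireNonNull(key, "key cannot be null");
        this.joinSide = Objects.requireNonNull(joinSide, "joinSide cannot be null");
        this.timestamp = timestamp;
    }

    // Named factories make the side explicit at the call-site, unlike
    // make(true, key, ts), where the meaning of "true" is easy to mix up.
    static <K> TimestampedKey<K> makeLeft(final K key, final long timestamp) {
        return new TimestampedKey<>(JoinSide.LEFT, key, timestamp);
    }

    static <K> TimestampedKey<K> makeRight(final K key, final long timestamp) {
        return new TimestampedKey<>(JoinSide.RIGHT, key, timestamp);
    }

    JoinSide joinSide() { return joinSide; }

    K key() { return key; }

    long timestamp() { return timestamp; }
}
```

With factories like these, a general-purpose `make(joinSide, key, timestamp)` entry point is only needed if some caller has to choose the side dynamically.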
Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]
philipnee commented on PR #15661: URL: https://github.com/apache/kafka/pull/15661#issuecomment-2040290159 @cadonna @lucasbru - Is it possible for me to ask for a review on this issue? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1554025678 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed. +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) +consumer.close() +assertEquals(2, cb.successCount); + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commits sent previously with the +// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of +// `commitSync` (given that it does not time out). 
+val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava) +assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset) +assertEquals(1, cb.successCount); + +// Enforce looking up the coordinator +consumer.committed(Set(tp, tp2).asJava) + +// Try with coordinator known +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava, cb) +consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(2L))).asJava) +assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset) +assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset) +assertEquals(2, cb.successCount); + +// Try with empty sync commit +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava, cb) +consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava) +assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset) +assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset) +assertEquals(3, cb.successCount); Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1554024796 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed. +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) +consumer.close() +assertEquals(2, cb.successCount); + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commits sent previously with the +// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of +// `commitSync` (given that it does not time out). 
+val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava) +assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset) +assertEquals(1, cb.successCount); + +// Enforce looking up the coordinator +consumer.committed(Set(tp, tp2).asJava) + +// Try with coordinator known +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava, cb) +consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(2L))).asJava) +assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset) +assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset) +assertEquals(2, cb.successCount); Review Comment: nit: semi-colon -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1554024079 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed. +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) +consumer.close() +assertEquals(2, cb.successCount); Review Comment: nit: unneeded semi-colon. Java to scala jump tricking us...been there :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1554020736 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ## @@ -229,7 +229,11 @@ private GroupRebalanceConfig buildRebalanceConfig(Optional groupInstance @AfterEach public void teardown() { this.metrics.close(); -this.coordinator.close(time.timer(0)); +try { +this.coordinator.close(time.timer(0)); Review Comment: I see, I would say it's fine to throw the error at the coordinator level (and live with code like this). And actually, the need for this catch is not introduced by this PR as I see it. The coordinator close before this PR could throw fenced exception for async commits that were waiting for coord and completed [here](https://github.com/apache/kafka/blob/fd9c7d2932dee055289b403e37a0bbb631c080a9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L983) getting fenced.
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1554011665 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { Review Comment: We have a new `PlainTextConsumerCommitTest` for all commit-related tests. These 2 should go there I would say.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621: URL: https://github.com/apache/kafka/pull/15621#issuecomment-2040270805 > Regarding the previous failed tests, one possibility is that the data on the server passed the retention time and is garbage collected. The default retention time is 7 days, which should be long enough. However, since we reuse mockTime, if the test runs long, the retention time might still be reached. Perhaps we could set [log.retention.ms](https://kafka.apache.org/documentation/#brokerconfigs_log.retention.ms) to -1? ``` org.opentest4j.AssertionFailedError: expected: <0> but was: <3> ``` You really hit the bullseye. I can reproduce the error by doing a little sleep before fetching data. Will set `retention.ms` to -1
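As a minimal sketch of the fix discussed above: the broker setting `log.retention.ms` accepts -1 to mean that no time limit is applied, so a long-running test that advances a shared mock clock cannot have its records deleted by time-based retention. The class and method names below are hypothetical, and how the properties reach the test broker is an assumption that depends on the test harness.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Kafka code base.
final class RetentionOverrideSketch {
    // Builds broker overrides for a test; -1 disables time-based retention.
    static Properties brokerOverrides() {
        Properties props = new Properties();
        props.setProperty("log.retention.ms", "-1");
        return props;
    }

    public static void main(String[] args) {
        // Print the overrides so they can be inspected when run standalone.
        System.out.println(brokerOverrides());
    }
}
```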
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1543492385 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -984,6 +984,8 @@ public void close(final Timer timer) { } } finally { super.close(timer); +// Super-class close may wait for more commit callbacks to complete. +invokeCompletedOffsetCommitCallbacks(); Review Comment: Agree, there could be async requests, with known coord, not getting a response within the above commit sync time, then getting it while the super.close waits, so we should trigger the callbacks at this point. But this makes me notice, aren't we breaking the `close(Duration)` contract here, calling that `super.close(timer)` in the finally clause? Let's say async requests are not getting a response within the timeout in the above while loop (so we block for that time on the while), then in the `finally`, the super class blocks for that time again [here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1139). Am I missing something? (I can file a separate Jira if I'm not missing something here)
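The concern above is that two sequential blocking phases might each wait for the full timeout. One common way to avoid that, sketched here independently of Kafka's `Timer` class, is to compute a single deadline up front and let each phase block only for the time that remains. All names are hypothetical, and the sketch makes no claim about how `ConsumerCoordinator.close` actually behaves.

```java
// Hypothetical illustration of a shared deadline; not Kafka code.
final class CloseBudgetSketch {
    /** Remaining milliseconds until the deadline, never negative. */
    static long remainingMs(long deadlineMs, long nowMs) {
        return Math.max(0L, deadlineMs - nowMs);
    }

    /**
     * Simulates two sequential blocking phases that share one deadline.
     * Each phase blocks for what it wants, capped by the time remaining.
     * Returns the total simulated time spent blocking.
     */
    static long totalBlockedMs(long timeoutMs, long phase1WantsMs, long phase2WantsMs) {
        long now = 0L;
        long deadline = now + timeoutMs;

        long phase1 = Math.min(phase1WantsMs, remainingMs(deadline, now));
        now += phase1;

        long phase2 = Math.min(phase2WantsMs, remainingMs(deadline, now));
        now += phase2;

        return now;
    }
}
```

With a shared deadline, the total never exceeds the caller's timeout even when both phases would like to wait the full duration.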
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on PR #15621: URL: https://github.com/apache/kafka/pull/15621#issuecomment-2040231361 @chia7712 : Thanks for the updated PR. Regarding the previous failed tests, one possibility is that the data on the server passed the retention time and is garbage collected. The default retention time is 7 days, which should be long enough. However, since we reuse mockTime, if the test runs long, the retention time might still be reached. Perhaps we could set [log.retention.ms](https://kafka.apache.org/documentation/#brokerconfigs_log.retention.ms) to -1?
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on PR #15470: URL: https://github.com/apache/kafka/pull/15470#issuecomment-2040217099 @mumrah Thanks for the review. Ticket filed. https://issues.apache.org/jira/browse/KAFKA-15579
[jira] [Created] (KAFKA-16479) Add pagination supported describeTopic interface
Calvin Liu created KAFKA-16479: -- Summary: Add pagination supported describeTopic interface Key: KAFKA-16479 URL: https://issues.apache.org/jira/browse/KAFKA-16479 Project: Kafka Issue Type: Sub-task Reporter: Calvin Liu During the DescribeTopicPartitions API implementation, we found it awkward to place the pagination logic within the current admin client describe topic interface. So, in order to change the interface, we may need to have a broader discussion, such as creating a KIP. Or, going a step further, we could discuss a general client-side pagination framework. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15583) High watermark can only advance if ISR size is larger than min ISR
[ https://issues.apache.org/jira/browse/KAFKA-15583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu resolved KAFKA-15583. Resolution: Fixed > High watermark can only advance if ISR size is larger than min ISR > -- > > Key: KAFKA-15583 > URL: https://issues.apache.org/jira/browse/KAFKA-15583 > Project: Kafka > Issue Type: Sub-task >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Major > > This is the new high watermark advancement requirement.
Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]
junrao commented on code in PR #15213: URL: https://github.com/apache/kafka/pull/15213#discussion_r1553959430 ## clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java: ## @@ -47,6 +47,8 @@ public class ListOffsetsRequest extends AbstractRequest { */ public static final long EARLIEST_LOCAL_TIMESTAMP = -4L; +public static final long LATEST_TIERED_TIMESTAMP = -5L; Review Comment: Thanks for following up, @clolov !
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1543454136 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -1164,7 +1176,8 @@ public void maybeAutoCommitOffsetsAsync(long now) { } private boolean invokePendingAsyncCommits(Timer timer) { -if (inFlightAsyncCommits.get() == 0) { +if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) { Review Comment: This makes sense to me, to fill a gap in the case of commit sync with empty offsets, that skips the path of sending an actual request, and that's why it loses the guarantee of executing the callbacks as I see it. This makes the logic consistent with what happens if the commit sync has non-empty offsets. In that case, it does execute the callbacks for previous async commits that were waiting for coord: the sync commit would be blocked on the same findCoord request (there's always just 1), and the moment the coord is found the async is marked as inflight [here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1036), so it would be considered for callbacks [here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1121). Am I getting the reasoning for the change right?
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1543454136 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -1164,7 +1176,8 @@ public void maybeAutoCommitOffsetsAsync(long now) { } private boolean invokePendingAsyncCommits(Timer timer) { -if (inFlightAsyncCommits.get() == 0) { +if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) { Review Comment: This makes sense to me, to fill a gap in the case of commit sync with empty offsets, that skips the path of sending an actual request, and that's why it loses the guarantee of executing the callbacks as I see it, right? This makes the logic consistent with what happens if the commit sync has non-empty offsets. In that case, it does execute the callbacks for previous async commits that were waiting for coord: the sync commit would be blocked on the same findCoord request (there's always just 1), and the moment the coord is found the async is marked as inflight [here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1036), so it would be considered for callbacks [here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1121).
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1543454136 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -1164,7 +1176,8 @@ public void maybeAutoCommitOffsetsAsync(long now) { } private boolean invokePendingAsyncCommits(Timer timer) { -if (inFlightAsyncCommits.get() == 0) { +if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) { Review Comment: This makes sense to me, to fill a gap in the case of commit sync with empty offsets, that skips the path of sending an actual request, and that's why it loses the guarantee of executing the callbacks as I see it. This makes the logic consistent with what happens if the commit sync has non-empty offsets. In that case, it does execute the callbacks for previous async commits that were waiting for coord: the sync commit would be blocked on the same findCoord request (there's always just 1), and the moment the coord is found the async is marked as inflight [here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1036), so it would be considered for callbacks [here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1121).
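The reasoning in the review comment above can be illustrated with a small model of the two counters named in the diff. This is only a sketch under assumptions: the counter names come from the diff, but the class, the method names, and the transitions between states are hypothetical and simplified compared to the real `ConsumerCoordinator`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the two counters; not the real ConsumerCoordinator.
final class AsyncCommitTrackerSketch {
    // Async commits still waiting for the coordinator to be discovered.
    private final AtomicInteger pendingAsyncCommits = new AtomicInteger();
    // Async commits whose request was sent but has no response yet.
    private final AtomicInteger inFlightAsyncCommits = new AtomicInteger();

    void onCommitAsyncWithoutCoordinator() {
        pendingAsyncCommits.incrementAndGet();
    }

    void onCoordinatorFound() {
        // Every pending commit is sent and therefore becomes in flight.
        int moved = pendingAsyncCommits.getAndSet(0);
        inFlightAsyncCommits.addAndGet(moved);
    }

    void onResponse() {
        inFlightAsyncCommits.decrementAndGet();
    }

    /** Old check: ignores commits that are still waiting for the coordinator. */
    boolean nothingToAwaitOld() {
        return inFlightAsyncCommits.get() == 0;
    }

    /** New check from the diff: both counters must be zero. */
    boolean nothingToAwaitNew() {
        return pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0;
    }
}
```

In this model, an async commit issued before the coordinator is known is invisible to the old check, which is the gap that a `commitSync` with empty offsets would fall into.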
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1553899936 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1360,6 +1362,9 @@ public void commitSync(Map offsets, Duration Timer requestTimer = time.timer(timeout.toMillis()); SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, requestTimer); CompletableFuture commitFuture = commit(syncCommitEvent); + +awaitPendingAsyncCommits(requestTimer, false); Review Comment: nit: maybe helpful to reflect in the name that this does execute the callbacks (or leave it as it is and then have the one line 1406 that does execute the callbacks here, right after)
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1553899936 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1360,6 +1362,9 @@ public void commitSync(Map offsets, Duration Timer requestTimer = time.timer(timeout.toMillis()); SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, requestTimer); CompletableFuture commitFuture = commit(syncCommitEvent); + +awaitPendingAsyncCommits(requestTimer, false); Review Comment: nit: maybe helpful to reflect in the name that this does execute the callbacks (or leave it as it is and then have the one line that does execute the callbacks here, right after)
Re: [PR] KAFKA-16004: auto-commit inflight improved logs, docs and tests [kafka]
lianetm commented on PR #15669: URL: https://github.com/apache/kafka/pull/15669#issuecomment-2039970916 Hey @lucasbru, could you take a look at this when you have a chance? Thanks!
[PR] KAFKA-16004: auto-commit inflight improved logs, docs and tests [kafka]
lianetm opened a new pull request, #15669: URL: https://github.com/apache/kafka/pull/15669 Minor changes to improve the logging and docs related to the auto-commit inflight logic, also adding tests to ensure the expected behaviour: - auto-commit on the interval does not send a request if another one is inflight, and it sends the next one as soon as a response is received (without waiting for the full interval again) - auto-commit before revocation always sends a request (even if another one from auto-commit on the interval is in-flight), to ensure the latest offsets are committed before revoking partitions. No changes in logic, just adding tests, docs and minor refactoring.
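The two behaviours listed in the PR description above can be modelled with a small state machine. This is a sketch under assumptions: the class, fields, and method names are hypothetical and time is passed in explicitly, so it only illustrates the described rules rather than the consumer's actual implementation.

```java
// Hypothetical model of the described auto-commit rules; not Kafka code.
final class AutoCommitSketch {
    private final long intervalMs;
    private long nextCommitAtMs;
    private boolean intervalCommitInFlight;
    private int requestsSent;

    AutoCommitSketch(long intervalMs) {
        this.intervalMs = intervalMs;
        this.nextCommitAtMs = intervalMs;
    }

    /** Sends only if the interval has expired and no interval commit is in flight. */
    boolean maybeCommitOnInterval(long nowMs) {
        if (nowMs < nextCommitAtMs || intervalCommitInFlight) {
            // The deadline is not pushed back here, so the next call after a
            // response arrives can send immediately.
            return false;
        }
        intervalCommitInFlight = true;
        requestsSent++;
        nextCommitAtMs = nowMs + intervalMs;
        return true;
    }

    void onIntervalCommitResponse() {
        intervalCommitInFlight = false;
    }

    /** Always sends, even if an interval commit is still in flight. */
    void commitBeforeRevocation() {
        requestsSent++;
    }

    int requestsSent() {
        return requestsSent;
    }
}
```

The key design point is that a skipped interval commit leaves the expired deadline untouched, which is what lets the next request go out as soon as the in-flight one completes.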
Re: [PR] MINOR: Increase parallelism for Jenkins [kafka]
divijvaidya closed pull request #15099: MINOR: Increase parallelism for Jenkins URL: https://github.com/apache/kafka/pull/15099
Re: [PR] MINOR: Add 3.6.2 to system tests [kafka]
omkreddy merged PR #15665: URL: https://github.com/apache/kafka/pull/15665
Re: [PR] MINOR: Add 3.6.2 to system tests [kafka]
omkreddy commented on PR #15665: URL: https://github.com/apache/kafka/pull/15665#issuecomment-2039898497 Thanks for the Review
Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]
viktorsomogyi commented on PR #15605: URL: https://github.com/apache/kafka/pull/15605#issuecomment-2039754554 Also, thank you @akatona84 for the contribution, @soarez and @urbandan for the reviews. Fixing flaky tests is always very welcome, keep it up! 🙂
Re: [PR] KAFKA-16472: Fix integration tests in Java with parameter name [kafka]
FrankYang0529 commented on PR #15663: URL: https://github.com/apache/kafka/pull/15663#issuecomment-2039753742 > > I think the class file size increasing is indeed a direct drawback after adding -parameter option because we'll include all the parameters into .class files. I'd like to know if there's any other way to fix this? Could we use ARGUMENTS instead of ARGUMENTS_WITH_NAMES? > > Or we can add the new arg to compileTestJava only to avoid impacting production binary Updated it as `compileTestJava.options.compilerArgs.add "-parameters"`. Thanks for the suggestion @chia7712 @showuon .
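To see what the javac `-parameters` flag changes, the following sketch reads a method's parameter names through reflection. When the class is compiled with `-parameters`, `Parameter.getName()` returns the source name; otherwise the JDK synthesizes names such as `arg0`. The class and the stand-in method are hypothetical; only the `java.lang.reflect` calls are real API.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;

// Hypothetical demo class; only the reflection API calls are real.
final class ParameterNamesSketch {
    // Stand-in for a parameterized test method.
    static void sampleTest(String quorum, String groupProtocol) {
    }

    private static Parameter firstParameter() throws NoSuchMethodException {
        Method m = ParameterNamesSketch.class
            .getDeclaredMethod("sampleTest", String.class, String.class);
        return m.getParameters()[0];
    }

    /** "quorum" when compiled with -parameters, otherwise a synthesized "arg0". */
    static String firstParameterName() throws NoSuchMethodException {
        return firstParameter().getName();
    }

    /** True only if the class file carries real parameter names. */
    static boolean namesPresent() throws NoSuchMethodException {
        return firstParameter().isNamePresent();
    }

    public static void main(String[] args) throws NoSuchMethodException {
        System.out.println(firstParameterName() + " (name present: " + namesPresent() + ")");
    }
}
```

Restricting the flag to `compileTestJava`, as done in the PR, keeps the extra name metadata out of the production class files.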
Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]
viktorsomogyi merged PR #15605: URL: https://github.com/apache/kafka/pull/15605
[jira] [Created] (KAFKA-16478) Links for Kafka 3.5.2 release are broken
Philipp Trulson created KAFKA-16478: --- Summary: Links for Kafka 3.5.2 release are broken Key: KAFKA-16478 URL: https://issues.apache.org/jira/browse/KAFKA-16478 Project: Kafka Issue Type: Bug Components: website Affects Versions: 3.5.2 Reporter: Philipp Trulson While trying to update our setup, I noticed that the download links for the 3.5.2 release are broken. They all point to a different host and also contain an additional `/kafka` in their URL. Compare: not working: [https://downloads.apache.org/kafka/kafka/3.5.2/RELEASE_NOTES.html] working: [https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html] [https://downloads.apache.org/kafka/3.6.2/RELEASE_NOTES.html] This goes for all links in the release - archives, checksums, signatures.
Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]
dajac commented on code in PR #15662: URL: https://github.com/apache/kafka/pull/15662#discussion_r1553514811 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -761,6 +777,31 @@ public ClassicGroup classicGroup( } } +public boolean validateOnlineUpgrade(ClassicGroup classicGroup) { +return ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) && +!classicGroup.isInState(DEAD) && + ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) && Review Comment: I think that we have `usesConsumerGroupProtocol()` in the `ClassicGroup` class. Could we use it? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -761,6 +777,31 @@ public ClassicGroup classicGroup( } } +public boolean validateOnlineUpgrade(ClassicGroup classicGroup) { Review Comment: Does it have to be public? Should we add some javadoc? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -761,6 +777,31 @@ public ClassicGroup classicGroup( } } +public boolean validateOnlineUpgrade(ClassicGroup classicGroup) { +return ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) && +!classicGroup.isInState(DEAD) && + ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) && +classicGroup.size() <= consumerGroupMaxSize; +} + +ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List records) { +classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS); +classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS); + +createGroupTombstoneRecords(classicGroup, records); +ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, classicGroup.groupId(), metrics); +classicGroup.convertToConsumerGroup(consumerGroup, records, metadataImage.topics()); Review Comment: I was wondering whether it would make more sense the other way around and have something 
like `ConsumerGroup.fromClassicGroup()`. I guess that it does not really matter in the end. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -761,6 +777,31 @@ public ClassicGroup classicGroup( } } +public boolean validateOnlineUpgrade(ClassicGroup classicGroup) { +return ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) && +!classicGroup.isInState(DEAD) && + ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) && +classicGroup.size() <= consumerGroupMaxSize; +} Review Comment: I wonder whether we should log something (with the reason) when the upgrade is disallowed. Have you considered it? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java: ## @@ -1300,6 +1341,68 @@ public Map groupAssignment() { )); } +/** + * Convert the current classic group to a consumer group. + * Add the records for the conversion. + * + * @param consumerGroup The converted consumer group. + * @param records The list to which the new records are added. + * + * @throws GroupIdNotFoundException if any of the group's member doesn't support the consumer protocol. 
+ */ +public void convertToConsumerGroup( +ConsumerGroup consumerGroup, +List records, +TopicsImage topicsImage +) throws GroupIdNotFoundException { +consumerGroup.setGroupEpoch(generationId); +consumerGroup.setTargetAssignmentEpoch(generationId); + +records.add(RecordHelpers.newGroupEpochRecord(groupId(), generationId)); +// SubscriptionMetadata will be computed in the following consumerGroupHeartbeat + records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), Collections.emptyMap())); +records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), generationId)); + +members.forEach((memberId, member) -> { +ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment())); +Map> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage); +ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata())); Review Comment: * `deserializeAssignment` and `deserializeSubscription` could throw a `SchemaException`, if I am not mistaken, when the bytes are malformed. We should handle those, I suppose. * We also discussed offline the need to k
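The review above asks whether `validateOnlineUpgrade` should log why an upgrade is disallowed. A minimal sketch of one way to surface the reason, assuming the same four conditions as the diff; the class, method, and parameter names are hypothetical and not part of the PR:

```java
import java.util.Optional;

// Hypothetical sketch, not code from the PR: evaluates the same four conditions as
// validateOnlineUpgrade, but reports which one failed so the caller can log it.
final class UpgradeCheckSketch {

    // Returns Optional.empty() when the upgrade is allowed, otherwise the reason it is not.
    static Optional<String> upgradeRejectionReason(
            boolean upgradeEnabled,       // stands in for the migration policy check
            boolean groupIsDead,          // stands in for classicGroup.isInState(DEAD)
            boolean usesConsumerProtocol, // stands in for the protocol type check
            int groupSize,
            int consumerGroupMaxSize) {
        if (!upgradeEnabled) {
            return Optional.of("online upgrade is disabled by the migration policy");
        }
        if (groupIsDead) {
            return Optional.of("the classic group is dead");
        }
        if (!usesConsumerProtocol) {
            return Optional.of("the group does not use the consumer embedded protocol");
        }
        if (groupSize > consumerGroupMaxSize) {
            return Optional.of("group size " + groupSize + " exceeds the maximum of " + consumerGroupMaxSize);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // The caller decides how to log; here the reason is simply printed.
        upgradeRejectionReason(true, false, true, 12, 10)
            .ifPresent(reason -> System.out.println("Cannot upgrade classic group: " + reason));
    }
}
```

Returning the reason instead of a bare boolean keeps the check free of side effects; whether the real method should log directly is a design choice for the PR author.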
Re: [PR] KAFKA-16383: fix flaky IdentityReplicationIntegrationTest .testReplicateFromLatest [kafka]
johnnychhsu commented on PR #15556: URL: https://github.com/apache/kafka/pull/15556#issuecomment-2039670307 @vamossagar12 thanks for the comment. sure! let's wait and monitor more builds
Re: [PR] MINOR: Add type check to classic group timeout operations [kafka]
dajac commented on code in PR #15587: URL: https://github.com/apache/kafka/pull/15587#discussion_r1553499653 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -2415,6 +2415,20 @@ private CoordinatorResult classicGroupJoinExistingMember( return EMPTY_RESULT; } +/** + * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as + * timeout operation. It additionally looks up the group by the id and checks the group type. + * completeClassicGroupJoin will only be called if the group is CLASSIC. + */ +private CoordinatorResult completeClassicGroupJoin(String groupId) { +if (containsClassicGroup(groupId)) { +return completeClassicGroupJoin(getOrMaybeCreateClassicGroup(groupId, false)); Review Comment: I am not a fan of this pattern because you effectively have to look up the group twice. One option would be to use a try..catch to catch the exception thrown by getOrMaybeCreateClassicGroup. Another option would be to 1) do the lookup, 2) verify non-null and group type and return if it fails. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -2415,6 +2415,20 @@ private CoordinatorResult classicGroupJoinExistingMember( return EMPTY_RESULT; } +/** + * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as + * timeout operation. It additionally looks up the group by the id and checks the group type. + * completeClassicGroupJoin will only be called if the group is CLASSIC. + */ +private CoordinatorResult completeClassicGroupJoin(String groupId) { +if (containsClassicGroup(groupId)) { +return completeClassicGroupJoin(getOrMaybeCreateClassicGroup(groupId, false)); +} else { +log.info("Group {} is null or not a classic group, skipping rebalance stage.", groupId); Review Comment: I wonder if we could use `debug` here. 
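The second option in the first comment above (look the group up once, then verify non-null and the group type) could be sketched as follows. This is an illustration under assumptions, not the coordinator's code: `Group`, `GroupType`, and the string results are hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: a single map lookup followed by null and type checks,
// instead of containsClassicGroup(...) followed by a second lookup.
final class SingleLookupSketch {

    enum GroupType { CLASSIC, CONSUMER }

    static final class Group {
        final String id;
        final GroupType type;

        Group(String id, GroupType type) {
            this.id = id;
            this.type = type;
        }
    }

    private final Map<String, Group> groups = new HashMap<>();

    void add(Group group) {
        groups.put(group.id, group);
    }

    // One lookup; empty if the group is missing or is not a classic group.
    Optional<Group> classicGroupOrEmpty(String groupId) {
        Group group = groups.get(groupId);
        if (group == null || group.type != GroupType.CLASSIC) {
            return Optional.empty();
        }
        return Optional.of(group);
    }

    // Stand-in for a timeout operation that is keyed by group id.
    String completeJoin(String groupId) {
        return classicGroupOrEmpty(groupId)
            .map(group -> "completed join for " + group.id)
            .orElse("skipped " + groupId);
    }

    public static void main(String[] args) {
        SingleLookupSketch manager = new SingleLookupSketch();
        manager.add(new Group("classic-1", GroupType.CLASSIC));
        System.out.println(manager.completeJoin("classic-1"));
        System.out.println(manager.completeJoin("missing"));
    }
}
```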
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -2805,31 +2826,36 @@ private CoordinatorResult maybeCompleteJoinElseSchedule( * Try to complete the join phase of the initial rebalance. * Otherwise, extend the rebalance. * - * @param group The group under initial rebalance. + * @param groupId The group under initial rebalance. * * @return The coordinator result that will be appended to the log. */ private CoordinatorResult tryCompleteInitialRebalanceElseSchedule( -ClassicGroup group, +String groupId, int delayMs, int remainingMs ) { -if (group.newMemberAdded() && remainingMs != 0) { -// A new member was added. Extend the delay. -group.setNewMemberAdded(false); -int newDelayMs = Math.min(classicGroupInitialRebalanceDelayMs, remainingMs); -int newRemainingMs = Math.max(remainingMs - delayMs, 0); - -timer.schedule( -classicGroupJoinKey(group.groupId()), -newDelayMs, -TimeUnit.MILLISECONDS, -false, -() -> tryCompleteInitialRebalanceElseSchedule(group, newDelayMs, newRemainingMs) -); +if (containsClassicGroup(groupId)) { Review Comment: ditto. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -2533,45 +2547,52 @@ private void schedulePendingSync(ClassicGroup group) { group.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, false, -() -> expirePendingSync(group, group.generationId())); +() -> expirePendingSync(group.groupId(), group.generationId())); } /** * Invoked when the heartbeat operation is expired from the timer. Possibly remove the member and * try complete the join phase. * - * @param group The group. + * @param groupId The group id. * @param memberId The member id. * * @return The coordinator result that will be appended to the log. 
*/ private CoordinatorResult expireClassicGroupMemberHeartbeat( -ClassicGroup group, +String groupId, String memberId ) { -if (group.isInState(DEAD)) { -log.info("Received notification of heartbeat expiration for member {} after group {} " + -"had already been unloaded or deleted.", -memberId, group.groupId()); -} else if (group.isPendingMember(memberId)) { -log.info("Pending member {} in group {} has been removed after session timeout expiration.", -memberId, group.groupId()); - -return removePendingMemberAndUpdateClassicGroup(group, memberId
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dajac commented on code in PR #15411: URL: https://github.com/apache/kafka/pull/15411#discussion_r1553481594 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ConsumerGroupMigrationPolicy.java: ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum ConsumerGroupMigrationPolicy { +/** Both upgrade and downgrade are enabled.*/ +BIDIRECTIONAL("bidirectional"), + +/** Only upgrade is enabled.*/ +UPGRADE("upgrade"), + +/** Only downgrade is enabled.*/ +DOWNGRADE("downgrade"), + +/** Neither upgrade nor downgrade is enabled.*/ +DISABLED("disabled"); + +private final String policy; + +ConsumerGroupMigrationPolicy(String config) { +this.policy = config; +} Review Comment: nit: We use different names `config` and `policy`. This is confusing. How about using `name`? 
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ConsumerGroupMigrationPolicy.java: ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum ConsumerGroupMigrationPolicy { +/** Both upgrade and downgrade are enabled.*/ +BIDIRECTIONAL("bidirectional"), + +/** Only upgrade is enabled.*/ +UPGRADE("upgrade"), + +/** Only downgrade is enabled.*/ +DOWNGRADE("downgrade"), + +/** Neither upgrade nor downgrade is enabled.*/ +DISABLED("disabled"); + +private final String policy; + +ConsumerGroupMigrationPolicy(String config) { +this.policy = config; +} + +@Override +public String toString() { +return policy; +} + +public static String validValuesDescription = +BIDIRECTIONAL + ": both upgrade from classic group to consumer group and downgrade from consumer group to classic group are enabled" + ", " + +UPGRADE + ": only upgrade is enabled" + ", " + +DOWNGRADE + ": only downgrade is enabled" + ", " + Review Comment: nit: Should we complement with the from...to... 
like you did for BIDIRECTIONAL? ## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ## @@ -1831,6 +1832,22 @@ class KafkaConfigTest { assertTrue(config.isNewGroupCoordinatorEnabled) } + @Test + def testGroupProtocolMigrationPolicy(): Unit = { +val props = new Properties() +props.putAll(kraftProps()) + +// Invalid GroupProtocolMigrationPolicy value. +props.put(KafkaConfig.ConsumerGroupMigrationPolicyProp, "foo") +assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) + +ConsumerGroupMigrationPolicy.values().foreach { policy => + props.put(KafkaConfig.ConsumerGroupMigrationPolicyProp, policy.toString) Review Comment: Is it case sensitive? ## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ## @@ -1831,6 +1832,22 @@ class KafkaConfigTest { assertTrue(config.isNewGroupCoordinatorEnabled) } + @Test + def testGroupProtocolMigrationPolicy(): Unit = { +val props = new Proper
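On the `name` and case-sensitivity questions raised in this review, here is a minimal sketch of how such an enum could use one consistent field name and parse values case-insensitively. It is an assumption about one possible shape, not the PR's implementation; `MigrationPolicySketch` and `parse` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch: the constructor parameter and the field share the name `name`,
// and parse() lower-cases its input so "UPGRADE" and "upgrade" are treated the same.
enum MigrationPolicySketch {
    BIDIRECTIONAL("bidirectional"),
    UPGRADE("upgrade"),
    DOWNGRADE("downgrade"),
    DISABLED("disabled");

    private final String name;

    MigrationPolicySketch(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return name;
    }

    // Lookup table from the lower-case config value to the enum constant.
    private static final Map<String, MigrationPolicySketch> NAME_TO_ENUM =
        Arrays.stream(values()).collect(Collectors.toMap(policy -> policy.name, Function.identity()));

    static MigrationPolicySketch parse(String value) {
        if (value == null) {
            throw new IllegalArgumentException("Policy must not be null");
        }
        MigrationPolicySketch policy = NAME_TO_ENUM.get(value.toLowerCase(Locale.ROOT));
        if (policy == null) {
            throw new IllegalArgumentException("Unknown policy: " + value);
        }
        return policy;
    }

    boolean isUpgradeEnabled() {
        return this == UPGRADE || this == BIDIRECTIONAL;
    }
}
```

With this shape, a test could feed both `policy.toString()` and its upper-case form through `parse` to pin down the intended case handling.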
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dajac commented on code in PR #15411: URL: https://github.com/apache/kafka/pull/15411#discussion_r1553480909 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -677,6 +679,7 @@ object KafkaConfig { val ConsumerGroupMaxHeartbeatIntervalMsDoc = "The maximum heartbeat interval for registered consumers." val ConsumerGroupMaxSizeDoc = "The maximum number of consumers that a single consumer group can accommodate." val ConsumerGroupAssignorsDoc = "The server side assignors as a list of full class names. The first one in the list is considered as the default assignor to be used in the case where the consumer does not specify an assignor." + val ConsumerGroupMigrationPolicyDoc = "The config that enables converting the classic group using the consumer embedded protocol to the consumer group using the consumer group protocol and vice versa. " + ConsumerGroupMigrationPolicy.validValuesDescription Review Comment: Thanks. I would prefer to keep all the documentation defined here. This is what we usually do.
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dajac commented on code in PR #15411: URL: https://github.com/apache/kafka/pull/15411#discussion_r1553477271 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -248,9 +249,10 @@ object KafkaConfig { val ConsumerGroupMaxSessionTimeoutMsProp = "group.consumer.max.session.timeout.ms" val ConsumerGroupHeartbeatIntervalMsProp = "group.consumer.heartbeat.interval.ms" val ConsumerGroupMinHeartbeatIntervalMsProp = "group.consumer.min.heartbeat.interval.ms" - val ConsumerGroupMaxHeartbeatIntervalMsProp ="group.consumer.max.heartbeat.interval.ms" + val ConsumerGroupMaxHeartbeatIntervalMsProp = "group.consumer.max.heartbeat.interval.ms" val ConsumerGroupMaxSizeProp = "group.consumer.max.size" val ConsumerGroupAssignorsProp = "group.consumer.assignors" + val ConsumerGroupMigrationPolicyProp = "consumer.group.migration.policy" Review Comment: Sorry, I missed this one: `consumer.group` -> `group.consumer`.
Re: [PR] KAFKA-8735: Check properties file existence first [kafka]
qinghui-xu commented on PR #7139: URL: https://github.com/apache/kafka/pull/7139#issuecomment-2039564348 In my view, this should be merged, if only for the sake of codebase sanity. I can try to rebase it and resolve the conflict to update the PR.
Re: [PR] MINOR: remove redundant check in appendLegacyRecord [kafka]
chia7712 merged PR #15638: URL: https://github.com/apache/kafka/pull/15638
[jira] [Commented] (KAFKA-16475) Create unit test for TopicImageNode
[ https://issues.apache.org/jira/browse/KAFKA-16475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834261#comment-17834261 ] Johnny Hsu commented on KAFKA-16475: hi [~cmccabe] I am willing to work on this ticket, thanks! > Create unit test for TopicImageNode > --- > > Key: KAFKA-16475 > URL: https://issues.apache.org/jira/browse/KAFKA-16475 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16475) Create unit test for TopicImageNode
[ https://issues.apache.org/jira/browse/KAFKA-16475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Johnny Hsu reassigned KAFKA-16475: -- Assignee: Johnny Hsu > Create unit test for TopicImageNode > --- > > Key: KAFKA-16475 > URL: https://issues.apache.org/jira/browse/KAFKA-16475 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Assignee: Johnny Hsu >Priority: Major >
Re: [PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]
chia7712 commented on PR #15668: URL: https://github.com/apache/kafka/pull/15668#issuecomment-2039540752 @brandboat nice finding! Should we add the thread prefix to https://github.com/apache/kafka/blob/a2ee0855ee5e73f3a74555d52294bb4acfd28945/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L2487 to avoid similar issues in the future?
Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]
OmniaGM commented on code in PR #15335: URL: https://github.com/apache/kafka/pull/15335#discussion_r1553431953 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -289,10 +300,11 @@ class Partition(val topicPartition: TopicPartition, delayedOperations: DelayedOperations, metadataCache: MetadataCache, logManager: LogManager, -alterIsrManager: AlterPartitionManager) extends Logging { +alterIsrManager: AlterPartitionManager, +@volatile private var _topicId: Option[Uuid] = None // TODO: merge topicPartition and _topicId into TopicIdPartition once TopicId persist in most of the code Review Comment: There is already a Jira that will address this: [KAFKA-16212](https://issues.apache.org/jira/browse/KAFKA-16212). I'll update the comment.
Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]
OmniaGM commented on code in PR #15335: URL: https://github.com/apache/kafka/pull/15335#discussion_r1553428780 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -104,9 +104,19 @@ class DelayedOperations(topicPartition: TopicPartition, object Partition { private val metricsGroup = new KafkaMetricsGroup(classOf[Partition]) - def apply(topicPartition: TopicPartition, + def apply(topicIdPartition: TopicIdPartition, time: Time, replicaManager: ReplicaManager): Partition = { +Partition( Review Comment: The plan is to use this apply in [KAFKA-16212](https://issues.apache.org/jira/browse/KAFKA-16212) which will be raised soon
Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]
chia7712 commented on code in PR #15335: URL: https://github.com/apache/kafka/pull/15335#discussion_r1552954260 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -104,9 +104,19 @@ class DelayedOperations(topicPartition: TopicPartition, object Partition { private val metricsGroup = new KafkaMetricsGroup(classOf[Partition]) - def apply(topicPartition: TopicPartition, + def apply(topicIdPartition: TopicIdPartition, time: Time, replicaManager: ReplicaManager): Partition = { +Partition( Review Comment: I am not sure whether we need this new `apply`. No callers have a `TopicIdPartition`, so they would have to create one just to use this `apply`. ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -289,10 +300,11 @@ class Partition(val topicPartition: TopicPartition, delayedOperations: DelayedOperations, metadataCache: MetadataCache, logManager: LogManager, -alterIsrManager: AlterPartitionManager) extends Logging { +alterIsrManager: AlterPartitionManager, +@volatile private var _topicId: Option[Uuid] = None // TODO: merge topicPartition and _topicId into TopicIdPartition once TopicId persist in most of the code Review Comment: Can we add the Jira link to the comment? Then readers can trace the updates easily.
[PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]
brandboat opened a new pull request, #15668: URL: https://github.com/apache/kafka/pull/15668 Related to KAFKA-16477. After profiling the Kafka tests, I found that many `client-metrics-reaper` threads are not cleaned up after BrokerServer shutdown. The thread `client-metrics-reaper` comes from [ClientMetricsManager#expirationTimer](https://github.com/apache/kafka/blob/a2ee0855ee5e73f3a74555d52294bb4acfd28945/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java#L115), and BrokerServer#shutdown doesn't close ClientMetricsManager, which leaves the thread running in the background. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
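A minimal sketch of how a test could detect a leaked thread by name prefix, using only JDK calls. It does not reproduce Kafka's `TestUtils` logic; `ThreadLeakSketch` and `liveThreadsWithPrefix` are hypothetical names, and the thread started in `main` merely imitates a reaper that was never closed.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

// Hypothetical sketch: lists live threads whose names start with a given prefix,
// which a test can assert to be empty after the component under test shuts down.
final class ThreadLeakSketch {

    static List<String> liveThreadsWithPrefix(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
            .filter(Thread::isAlive)
            .map(Thread::getName)
            .filter(name -> name.startsWith(prefix))
            .sorted()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch stop = new CountDownLatch(1);
        // Imitates a background reaper thread that nobody closes.
        Thread reaper = new Thread(() -> {
            try {
                stop.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "client-metrics-reaper-sketch");
        reaper.setDaemon(true);
        reaper.start();

        System.out.println("Before close: " + liveThreadsWithPrefix("client-metrics-reaper"));

        // Imitates closing the owning component, which lets the thread exit.
        stop.countDown();
        reaper.join();
        System.out.println("After close: " + liveThreadsWithPrefix("client-metrics-reaper"));
    }
}
```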
Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]
OmniaGM commented on PR #15335: URL: https://github.com/apache/kafka/pull/15335#issuecomment-2039474182
> @OmniaGM , there is compilation error in jdk8_scala2.12 job. Could you have a look?
>
> ```
> [2024-04-04T09:19:51.266Z] [Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15335/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:4952:125: recursive value leaderTopicsDelta needs type
> [2024-04-04T09:19:51.266Z] [Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15335/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:4953:17: value makeLeader is not a member of Any
> [2024-04-04T09:19:51.266Z] [Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15335/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:4957:7: overloaded method value assertTrue with alternatives:
> [2024-04-04T09:19:51.266Z] (x$1: java.util.function.BooleanSupplier)Unit
> [2024-04-04T09:19:51.266Z] (x$1: Boolean)Unit
> [2024-04-04T09:19:51.266Z] cannot be applied to (Any)
> ```
>
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15335/13/pipeline

Fixed it
[jira] [Created] (KAFKA-16477) Detect thread leaked client-metrics-reaper in tests
Kuan Po Tseng created KAFKA-16477: - Summary: Detect thread leaked client-metrics-reaper in tests Key: KAFKA-16477 URL: https://issues.apache.org/jira/browse/KAFKA-16477 Project: Kafka Issue Type: Improvement Reporter: Kuan Po Tseng Assignee: Kuan Po Tseng After profiling the Kafka tests, I found that many {{client-metrics-reaper}} threads are not cleaned up after BrokerServer shutdown. The thread {{client-metrics-reaper}} comes from [ClientMetricsManager#expirationTimer|https://github.com/apache/kafka/blob/a2ee0855ee5e73f3a74555d52294bb4acfd28945/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java#L115], and BrokerServer#shutdown doesn't close ClientMetricsManager, which leaves the timer thread running in the background.
Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]
clolov commented on code in PR #15213: URL: https://github.com/apache/kafka/pull/15213#discussion_r1553357069 ## clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java: ## @@ -47,6 +47,8 @@ public class ListOffsetsRequest extends AbstractRequest { */ public static final long EARLIEST_LOCAL_TIMESTAMP = -4L; +public static final long LATEST_TIERED_TIMESTAMP = -5L; Review Comment: Yup, thanks a lot for bringing this up on the mailing list and here. I will open a pull request to fix this omission!
Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]
clolov commented on PR #14716: URL: https://github.com/apache/kafka/pull/14716#issuecomment-2039443091 Heya @cadonna! I have rebased and hopefully addressed all of the first batch of comments. The verifications which are missing are reported as unnecessary/uncalled by Mockito, but if you think they are necessary I will circle back to check
Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]
clolov commented on code in PR #14716: URL: https://github.com/apache/kafka/pull/14716#discussion_r1553349919 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -1652,81 +1598,64 @@ public void shouldReInitializeTopologyWhenResuming() throws IOException { assertTrue(source1.initialized); assertTrue(source2.initialized); -EasyMock.verify(stateManager, recordCollector); - -EasyMock.reset(recordCollector); -EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()); -EasyMock.replay(recordCollector); assertThat("Map did not contain the partition", task.highWaterMark().containsKey(partition1)); + +verify(recordCollector).offsets(); } @Test public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() { final Long offset = 543L; - EasyMock.expect(recordCollector.offsets()).andReturn(singletonMap(changelogPartition, offset)).anyTimes(); -stateManager.checkpoint(); -EasyMock.expectLastCall().once(); -EasyMock.expect(stateManager.changelogOffsets()) -.andReturn(singletonMap(changelogPartition, 10L)) // restoration checkpoint -.andReturn(singletonMap(changelogPartition, 10L)) -.andReturn(singletonMap(changelogPartition, 20L)); -EasyMock.expectLastCall(); -EasyMock.replay(stateManager, recordCollector); + when(recordCollector.offsets()).thenReturn(singletonMap(changelogPartition, offset)); +when(stateManager.changelogOffsets()) +.thenReturn(singletonMap(changelogPartition, 10L)) // restoration checkpoint +.thenReturn(singletonMap(changelogPartition, 10L)) +.thenReturn(singletonMap(changelogPartition, 20L)); task = createStatefulTask(createConfig("100"), true); task.initializeIfNeeded(); -task.completeRestoration(noOpResetter -> { }); +task.completeRestoration(noOpResetter -> { }); // should checkpoint task.prepareCommit(); -task.postCommit(true); // should checkpoint +task.postCommit(true); // should checkpoint task.prepareCommit(); -task.postCommit(false); // should not checkpoint +task.postCommit(false); 
// should not checkpoint -EasyMock.verify(stateManager, recordCollector); assertThat("Map was empty", task.highWaterMark().size() == 2); + +verify(stateManager, times(2)).checkpoint(); } @Test public void shouldCheckpointOffsetsOnCommitIfSnapshotMuchChanged() { final Long offset = 543L; - EasyMock.expect(recordCollector.offsets()).andReturn(singletonMap(changelogPartition, offset)).anyTimes(); -stateManager.checkpoint(); -EasyMock.expectLastCall().times(2); - EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(changelogPartition)); -EasyMock.expect(stateManager.changelogOffsets()) -.andReturn(singletonMap(changelogPartition, 0L)) -.andReturn(singletonMap(changelogPartition, 10L)) -.andReturn(singletonMap(changelogPartition, 12000L)); -stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, null); -EasyMock.expectLastCall(); -EasyMock.replay(stateManager, recordCollector); + when(recordCollector.offsets()).thenReturn(singletonMap(changelogPartition, offset)); +when(stateManager.changelogOffsets()) +.thenReturn(singletonMap(changelogPartition, 0L)) +.thenReturn(singletonMap(changelogPartition, 10L)) +.thenReturn(singletonMap(changelogPartition, 12000L)); task = createStatefulTask(createConfig("100"), true); task.initializeIfNeeded(); -task.completeRestoration(noOpResetter -> { }); +task.completeRestoration(noOpResetter -> { }); // should checkpoint task.prepareCommit(); -task.postCommit(true); +task.postCommit(true); // should checkpoint task.prepareCommit(); -task.postCommit(false); +task.postCommit(false); // should checkpoint since the offset delta is greater than the threshold -EasyMock.verify(recordCollector); assertThat("Map was empty", task.highWaterMark().size() == 2); + +verify(stateManager, times(3)).checkpoint(); } @Test public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() { - EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(changelogPartition)); -stateManager.registerStore(stateStore, 
stateStore.stateRestoreCallback, null); Review Comment: According to Mockito, neither of these was called.
Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]
clolov commented on code in PR #14716: URL: https://github.com/apache/kafka/pull/14716#discussion_r1553347186 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -312,55 +302,40 @@ public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce @Test public void shouldNotAttemptToLockIfNoStores() { -stateDirectory = EasyMock.createNiceMock(StateDirectory.class); -EasyMock.replay(stateDirectory); +stateDirectory = mock(StateDirectory.class); task = createStatelessTask(createConfig("100")); task.initializeIfNeeded(); // should fail if lock is called -EasyMock.verify(stateDirectory); +verify(stateDirectory, never()).lock(any()); } @Test -public void shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() throws IOException { -final IMocksControl ctrl = EasyMock.createNiceControl(); -final ProcessorStateManager stateManager = ctrl.createMock(ProcessorStateManager.class); - EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE); -stateDirectory = ctrl.createMock(StateDirectory.class); +public void shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() { +when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); +stateDirectory = mock(StateDirectory.class); -stateManager.registerGlobalStateStores(emptyList()); -EasyMock.expectLastCall(); - -EasyMock.expect(stateManager.taskId()).andReturn(taskId); - -EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -EasyMock.expectLastCall(); - -stateManager.transitionTaskState(SUSPENDED); Review Comment: When I moved these to verifications Mockito claimed they were never called