[PR] MINOR: Fix `KafkaStreams#streamThreadLeaveConsumerGroup` logging [kafka]
lkokhreidze opened a new pull request, #14526: URL: https://github.com/apache/kafka/pull/14526 Fixes logging for `KafkaStreams#streamThreadLeaveConsumerGroup`. In order not to lose the trace of the whole exception, the `Exception e` is passed as the second argument, while the message is pre-formatted and passed as a string as the first argument. With this, we won't lose the stack trace of the exception. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
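For context, a minimal SLF4J sketch (illustrative method and message names, not the actual patch) of the difference between flattening the exception into the message string and passing it as the trailing argument:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LeaveGroupLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(LeaveGroupLoggingSketch.class);

    static void onLeaveGroupFailure(Exception e) {
        // Loses the stack trace: only e.toString() ends up in the log line.
        log.warn(String.format("Error while leaving consumer group: %s", e));

        // Preserves the stack trace: SLF4J treats a trailing Throwable argument
        // specially and appends the full trace after the formatted message.
        log.warn("Error while leaving consumer group", e);
    }
}
```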
[jira] [Updated] (KAFKA-15569) Update test and add test cases in IQv2StoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hanyu Zheng updated KAFKA-15569: Description: * Update test and add test cases in IQv2StoreIntegrationTest. * Originally, all key-value pairs were confined to a single window, with all data added at WINDOW_START. To improve our testing, we've expanded to multiple windows. * We've added four key-value pairs at intervals starting from WINDOW_START: at WINDOW_START, WINDOW_START + 6min, WINDOW_START + 12min, and WINDOW_START + 18min. We're doing this to test the query methods of RangeQuery and WindowRangeQuery. was: * Update test and add test cases in IQv2StoreIntegrationTest * Originally, all key-value pairs were confined to a single window, with all data added at WINDOW_START. To improve our testing, we've expanded to multiple windows * We've added four key-value pairs at intervals starting from WINDOW_START: at WINDOW_START, WINDOW_START + 6min, WINDOW_START + 12min, and WINDOW_START + 18min. We're doing this to test the query methods of RangeQuery and WindowRangeQuery. > Update test and add test cases in IQv2StoreIntegrationTest > -- > > Key: KAFKA-15569 > URL: https://issues.apache.org/jira/browse/KAFKA-15569 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Hanyu Zheng >Assignee: Hanyu Zheng >Priority: Major > > * Update test and add test cases in IQv2StoreIntegrationTest. > * Originally, all key-value pairs were confined to a single window, with all > data added at WINDOW_START. To improve our testing, we've expanded to > multiple windows. > * We've added four key-value pairs at intervals starting from WINDOW_START: > at WINDOW_START, WINDOW_START + 6min, WINDOW_START + 12min, and WINDOW_START > + 18min. We're doing this to test the query methods of RangeQuery and > WindowRangeQuery. -- This message was sent by Atlassian Jira (v8.20.10#820010)
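A small illustration of the data layout described above (class name and base timestamp are placeholders, not the test's actual constants):

```java
import java.time.Duration;

class WindowedTestDataLayout {
    static final long WINDOW_START = 0L; // placeholder base timestamp in epoch millis

    // Four records spaced six minutes apart; with a window size below
    // 18 minutes, the records land in several distinct windows.
    static long[] recordTimestamps() {
        long step = Duration.ofMinutes(6).toMillis();
        return new long[] {
            WINDOW_START,            // +0 min
            WINDOW_START + step,     // +6 min
            WINDOW_START + 2 * step, // +12 min
            WINDOW_START + 3 * step  // +18 min
        };
    }
}
```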
[jira] [Updated] (KAFKA-15571) StateRestoreListener#onRestoreSuspended is never called because wrapper DelegatingStateRestoreListener doesn't implement onRestoreSuspended
[ https://issues.apache.org/jira/browse/KAFKA-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Levani Kokhreidze updated KAFKA-15571: -- Affects Version/s: 3.6.0 > StateRestoreListener#onRestoreSuspended is never called because wrapper > DelegatingStateRestoreListener doesn't implement onRestoreSuspended > --- > > Key: KAFKA-15571 > URL: https://issues.apache.org/jira/browse/KAFKA-15571 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.0, 3.6.0, 3.5.1 >Reporter: Levani Kokhreidze >Assignee: Levani Kokhreidze >Priority: Major > > With https://issues.apache.org/jira/browse/KAFKA-10575 > `StateRestoreListener#onRestoreSuspended` was added. But local tests show > that it is never called because `DelegatingStateRestoreListener` was not > updated to call the new method. -- This message was sent by Atlassian Jira (v8.20.10#820010)
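A minimal sketch of the kind of fix, assuming the wrapper simply forwards each callback to a user-supplied listener (class and field names here are illustrative, not the actual Kafka Streams code):

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

class DelegatingListenerSketch implements StateRestoreListener {
    private volatile StateRestoreListener userListener; // set via a setter in the real wrapper

    @Override
    public void onRestoreStart(TopicPartition tp, String store, long startingOffset, long endingOffset) {
        if (userListener != null) userListener.onRestoreStart(tp, store, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition tp, String store, long batchEndOffset, long numRestored) {
        if (userListener != null) userListener.onBatchRestored(tp, store, batchEndOffset, numRestored);
    }

    @Override
    public void onRestoreEnd(TopicPartition tp, String store, long totalRestored) {
        if (userListener != null) userListener.onRestoreEnd(tp, store, totalRestored);
    }

    // The missing piece described in the ticket: without this override, the
    // interface's default no-op runs and the user listener is never notified.
    @Override
    public void onRestoreSuspended(TopicPartition tp, String store, long totalRestored) {
        if (userListener != null) userListener.onRestoreSuspended(tp, store, totalRestored);
    }
}
```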
[jira] [Updated] (KAFKA-15569) Update test and add test cases in IQv2StoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hanyu Zheng updated KAFKA-15569: Description: * Update test and add test cases in IQv2StoreIntegrationTest * Originally, all key-value pairs were confined to a single window, with all data added at WINDOW_START. To improve our testing, we've expanded to multiple windows * We've added four key-value pairs at intervals starting from WINDOW_START: at WINDOW_START, WINDOW_START + 6min, WINDOW_START + 12min, and WINDOW_START + 18min. We're doing this to test the query methods of RangeQuery and WindowRangeQuery. was:Update test and add test cases in IQv2StoreIntegrationTest > Update test and add test cases in IQv2StoreIntegrationTest > -- > > Key: KAFKA-15569 > URL: https://issues.apache.org/jira/browse/KAFKA-15569 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Hanyu Zheng >Assignee: Hanyu Zheng >Priority: Major > > * Update test and add test cases in IQv2StoreIntegrationTest > * Originally, all key-value pairs were confined to a single window, with all > data added at WINDOW_START. To improve our testing, we've expanded to > multiple windows > * We've added four key-value pairs at intervals starting from WINDOW_START: > at WINDOW_START, WINDOW_START + 6min, WINDOW_START + 12min, and WINDOW_START > + 18min. We're doing this to test the query methods of RangeQuery and > WindowRangeQuery. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15577) Reload4j | CVE-2022-45868
masood created KAFKA-15577: -- Summary: Reload4j | CVE-2022-45868 Key: KAFKA-15577 URL: https://issues.apache.org/jira/browse/KAFKA-15577 Project: Kafka Issue Type: Bug Reporter: masood Maven indicates [CVE-2022-45868|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-45868] in Reload4j.jar. [https://mvnrepository.com/artifact/ch.qos.reload4j/reload4j/1.2.19] Could you please verify if this vulnerability affects Kafka? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15402) Performance regression on close consumer after upgrading to 3.5.0
[ https://issues.apache.org/jira/browse/KAFKA-15402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17773911#comment-17773911 ] Benoit Delbosc commented on KAFKA-15402: The problem is still present on the latest version 3.6.0. > Performance regression on close consumer after upgrading to 3.5.0 > - > > Key: KAFKA-15402 > URL: https://issues.apache.org/jira/browse/KAFKA-15402 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.5.0, 3.6.0, 3.5.1 >Reporter: Benoit Delbosc >Priority: Major > Attachments: image-2023-08-24-18-51-21-720.png, > image-2023-08-24-18-51-57-435.png, image-2023-08-25-10-50-28-079.png > > > Hi, > After upgrading to Kafka client version 3.5.0, we have observed a significant > increase in the duration of our Java unit tests. These unit tests heavily > rely on the Kafka Admin, Producer, and Consumer API. > When using Kafka server version 3.4.1, the duration of the unit tests > increased from 8 seconds (with Kafka client 3.4.1) to 18 seconds (with Kafka > client 3.5.0). > Upgrading the Kafka server to 3.5.1 shows similar results. > I have come across the issue KAFKA-15178, which could be the culprit. I will > attempt to test the proposed patch. > In the meantime, if you have any ideas that could help identify and address > the regression, please let me know. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15402) Performance regression on close consumer after upgrading to 3.5.0
[ https://issues.apache.org/jira/browse/KAFKA-15402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Benoit Delbosc updated KAFKA-15402: --- Affects Version/s: 3.6.0 > Performance regression on close consumer after upgrading to 3.5.0 > - > > Key: KAFKA-15402 > URL: https://issues.apache.org/jira/browse/KAFKA-15402 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.5.0, 3.6.0, 3.5.1 >Reporter: Benoit Delbosc >Priority: Major > Attachments: image-2023-08-24-18-51-21-720.png, > image-2023-08-24-18-51-57-435.png, image-2023-08-25-10-50-28-079.png > > > Hi, > After upgrading to Kafka client version 3.5.0, we have observed a significant > increase in the duration of our Java unit tests. These unit tests heavily > rely on the Kafka Admin, Producer, and Consumer API. > When using Kafka server version 3.4.1, the duration of the unit tests > increased from 8 seconds (with Kafka client 3.4.1) to 18 seconds (with Kafka > client 3.5.0). > Upgrading the Kafka server to 3.5.1 shows similar results. > I have come across the issue KAFKA-15178, which could be the culprit. I will > attempt to test the proposed patch. > In the meantime, if you have any ideas that could help identify and address > the regression, please let me know. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15576) Add 3.6.0 to broker/client and streams upgrade/compatibility tests
[ https://issues.apache.org/jira/browse/KAFKA-15576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15576: --- Summary: Add 3.6.0 to broker/client and streams upgrade/compatibility tests (was: Add 3.2.0 to broker/client and streams upgrade/compatibility tests) > Add 3.6.0 to broker/client and streams upgrade/compatibility tests > -- > > Key: KAFKA-15576 > URL: https://issues.apache.org/jira/browse/KAFKA-15576 > Project: Kafka > Issue Type: Task >Reporter: Satish Duggana >Priority: Major > Fix For: 3.7.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15576) Add 3.2.0 to broker/client and streams upgrade/compatibility tests
Satish Duggana created KAFKA-15576: -- Summary: Add 3.2.0 to broker/client and streams upgrade/compatibility tests Key: KAFKA-15576 URL: https://issues.apache.org/jira/browse/KAFKA-15576 Project: Kafka Issue Type: Task Reporter: Satish Duggana Fix For: 3.7.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15535) Add documentation of "remote.log.index.file.cache.total.size.bytes" configuration property.
[ https://issues.apache.org/jira/browse/KAFKA-15535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17773898#comment-17773898 ] Satish Duggana commented on KAFKA-15535: Thanks [~hudeqi] for checking that out. > Add documentation of "remote.log.index.file.cache.total.size.bytes" > configuration property. > > > Key: KAFKA-15535 > URL: https://issues.apache.org/jira/browse/KAFKA-15535 > Project: Kafka > Issue Type: Task > Components: documentation >Reporter: Satish Duggana >Assignee: hudeqi >Priority: Major > Labels: tiered-storage > Fix For: 3.7.0 > > > Add documentation of "remote.log.index.file.cache.total.size.bytes" > configuration property. > Please double check all the existing public tiered storage configurations. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15535) Add documentation of "remote.log.index.file.cache.total.size.bytes" configuration property.
[ https://issues.apache.org/jira/browse/KAFKA-15535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana resolved KAFKA-15535. Resolution: Fixed > Add documentation of "remote.log.index.file.cache.total.size.bytes" > configuration property. > > > Key: KAFKA-15535 > URL: https://issues.apache.org/jira/browse/KAFKA-15535 > Project: Kafka > Issue Type: Task > Components: documentation >Reporter: Satish Duggana >Assignee: hudeqi >Priority: Major > Labels: tiered-storage > Fix For: 3.7.0 > > > Add documentation of "remote.log.index.file.cache.total.size.bytes" > configuration property. > Please double check all the existing public tiered storage configurations. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15570: Add unit tests for MemoryConfigBackingStore [kafka]
yashmayya commented on code in PR #14518: URL: https://github.com/apache/kafka/pull/14518#discussion_r1354103192 ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryConfigBackingStoreTest.java: ## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.connect.runtime.TargetState; +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) +public class MemoryConfigBackingStoreTest { + +private static final List<String> CONNECTOR_IDS = Arrays.asList("connector1", "connector2"); + +// Actual values are irrelevant here and can be used as either connector or task configurations +private static final List<Map<String, String>> SAMPLE_CONFIGS = Arrays.asList( +Collections.singletonMap("config-key-one", "config-value-one"), +Collections.singletonMap("config-key-two", "config-value-two"), +Collections.singletonMap("config-key-three", "config-value-three") +); + +@Mock +private ConfigBackingStore.UpdateListener configUpdateListener; +private final MemoryConfigBackingStore configStore = new MemoryConfigBackingStore(); Review Comment: JUnit creates a new instance of the test class for each test method run (exactly so that state isn't shared between tests), so this shouldn't really make a difference. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
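For readers unfamiliar with the JUnit behavior referenced in this reply, a small self-contained demonstration (JUnit 4; not part of the PR) of the fresh-instance-per-test-method semantics:

```java
import static org.junit.Assert.assertEquals;

import org.junit.Test;

public class FreshInstancePerTestDemo {
    private int counter = 0; // re-initialized for each test method

    @Test
    public void firstTest() {
        counter++;
        // Always 1: JUnit constructs a new instance of this class per test method.
        assertEquals(1, counter);
    }

    @Test
    public void secondTest() {
        counter++;
        // Also 1: this test runs on a different instance than firstTest.
        assertEquals(1, counter);
    }
}
```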
[PR] MINOR: Disable flaky kraft-test in FetchRequestTest [kafka]
dengziming opened a new pull request, #14525: URL: https://github.com/apache/kafka/pull/14525 We introduced a bunch of flaky tests in #14295; they pass when run locally but consistently fail in CI. Let's roll them back until we find the cause. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15507) adminClient should not throw retriable exception when closing instance
[ https://issues.apache.org/jira/browse/KAFKA-15507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-15507. --- Fix Version/s: 3.7.0 Resolution: Fixed > adminClient should not throw retriable exception when closing instance > -- > > Key: KAFKA-15507 > URL: https://issues.apache.org/jira/browse/KAFKA-15507 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 3.5.1 >Reporter: Luke Chen >Assignee: Gantigmaa Selenge >Priority: Major > Fix For: 3.7.0 > > > When adminClient is closing the instance, it'll first set > `hardShutdownTimeMs` to a positive timeout value, and then wait for > existing threads to complete within the timeout. However, within this > waiting, when a new caller tries to invoke a new command in adminClient, it'll > immediately get an > {code:java} > TimeoutException("The AdminClient thread is not accepting new calls.") > {code} > There are some issues with the design: > 1. Since the `TimeoutException` is a retriable exception, the caller will > enter a tight loop and keep trying it > 2. The error message is confusing. What does "the adminClient is not > accepting new calls" mean? > We should improve it by throwing a non-retriable error (ex: > IllegalStateException), then, the error message should clearly describe the > adminClient is closing. -- This message was sent by Atlassian Jira (v8.20.10#820010)
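To see why point 1 matters, here is a hedged sketch (the `Call` interface and retry helper are invented for illustration, not AdminClient internals) of the tight loop a retry-on-retriable caller enters; in the Kafka clients API, `TimeoutException` extends `RetriableException`:

```java
import org.apache.kafka.common.errors.RetriableException;

class RetryLoopSketch {
    interface Call {
        void run() throws Exception;
    }

    // Retries forever on retriable errors, which is a common caller pattern.
    static void runWithRetries(Call call) throws Exception {
        while (true) {
            try {
                call.run();
                return;
            } catch (RetriableException e) {
                // A closing AdminClient that throws a retriable TimeoutException
                // keeps this loop spinning until the client finishes shutting down.
                // A non-retriable IllegalStateException would propagate out of
                // the loop immediately instead.
            }
        }
    }
}
```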
Re: [PR] KAFKA-15507: Make AdminClient throw non-retriable exception for a new call while closing [kafka]
showuon merged PR #14455: URL: https://github.com/apache/kafka/pull/14455 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15507: Make AdminClient throw non-retriable exception for a new call while closing [kafka]
showuon commented on PR #14455: URL: https://github.com/apache/kafka/pull/14455#issuecomment-1756715114 Failed tests are unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Remove StreamsProducer flush under EOS [kafka]
github-actions[bot] commented on PR #13994: URL: https://github.com/apache/kafka/pull/13994#issuecomment-1756711475 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-13756: Connect validate endpoint should return proper validatio… [kafka]
github-actions[bot] commented on PR #13813: URL: https://github.com/apache/kafka/pull/13813#issuecomment-1756711573 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]
showuon merged PR #14482: URL: https://github.com/apache/kafka/pull/14482 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]
showuon commented on PR #14482: URL: https://github.com/apache/kafka/pull/14482#issuecomment-1756672787 Failed tests are unrelated and also failed in trunk build. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15570: Add unit tests for MemoryConfigBackingStore [kafka]
kpatelatwork commented on code in PR #14518: URL: https://github.com/apache/kafka/pull/14518#discussion_r1353833312 ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryConfigBackingStoreTest.java: ## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.connect.runtime.TargetState; +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) +public class MemoryConfigBackingStoreTest { + +private static final List<String> CONNECTOR_IDS = Arrays.asList("connector1", "connector2"); + +// Actual values are irrelevant here and can be used as either connector or task configurations +private static final List<Map<String, String>> SAMPLE_CONFIGS = Arrays.asList( +Collections.singletonMap("config-key-one", "config-value-one"), +Collections.singletonMap("config-key-two", "config-value-two"), +Collections.singletonMap("config-key-three", "config-value-three") +); + +@Mock +private ConfigBackingStore.UpdateListener configUpdateListener; +private final MemoryConfigBackingStore configStore = new MemoryConfigBackingStore(); Review Comment: should this be declared non-final and instantiated in the `setup()` instead so we don't share state between tests? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
rreddy-22 commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1353833261 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ## @@ -898,6 +900,47 @@ public void createGroupTombstoneRecords(List<Record> records) { records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId())); } +@Override +public boolean isEmpty() { +return isInState(EMPTY); +} + +/** + * Return the offset expiration condition to be used for this group. This is based on several factors + * such as the group state, the protocol type, and the GroupMetadata record version. + * + * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition} + * + * @return The offset expiration condition for the group or Empty of no such condition exists. + */ +@Override +public Optional<OffsetExpirationCondition> offsetExpirationCondition() { +if (protocolType.isPresent()) { +if (isInState(EMPTY)) { +// No consumer exists in the group => +// - If current state timestamp exists and retention period has passed since group became Empty, +// expire all offsets with no pending offset commit; +// - If there is no current state timestamp (old group metadata schema) and retention period has passed +// since the last commit timestamp, expire the offset +return Optional.of(new OffsetExpirationConditionImpl( +offsetAndMetadata -> currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs)) +); +} else if (usesConsumerGroupProtocol() && subscribedTopics.isPresent() && isInState(STABLE)) { +// Consumers exist in the group and group is Stable => +// - If the group is aware of the subscribed topics and retention period had passed since the +// last commit timestamp, expire the offset. offset with pending offset commit are not +// expired +return Optional.of(new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs)); +} +} else { +// protocolType is None => standalone (simple) consumer, that uses Kafka for offset storage only +// expire offsets where retention period has passed since their last commit Review Comment: nit: periods and capitalization -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
rreddy-22 commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1353832984 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ## @@ -898,6 +900,47 @@ public void createGroupTombstoneRecords(List<Record> records) { records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId())); } +@Override +public boolean isEmpty() { +return isInState(EMPTY); +} + +/** + * Return the offset expiration condition to be used for this group. This is based on several factors + * such as the group state, the protocol type, and the GroupMetadata record version. + * + * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition} + * + * @return The offset expiration condition for the group or Empty of no such condition exists. + */ +@Override +public Optional<OffsetExpirationCondition> offsetExpirationCondition() { +if (protocolType.isPresent()) { +if (isInState(EMPTY)) { +// No consumer exists in the group => +// - If current state timestamp exists and retention period has passed since group became Empty, +// expire all offsets with no pending offset commit; +// - If there is no current state timestamp (old group metadata schema) and retention period has passed +// since the last commit timestamp, expire the offset +return Optional.of(new OffsetExpirationConditionImpl( +offsetAndMetadata -> currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs)) +); +} else if (usesConsumerGroupProtocol() && subscribedTopics.isPresent() && isInState(STABLE)) { +// Consumers exist in the group and group is Stable => +// - If the group is aware of the subscribed topics and retention period had passed since the +// last commit timestamp, expire the offset. offset with pending offset commit are not Review Comment: nit: Offsets* with pending o...are not expired.* -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
rreddy-22 commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1353832660 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ## @@ -898,6 +900,47 @@ public void createGroupTombstoneRecords(List<Record> records) { records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId())); } +@Override +public boolean isEmpty() { +return isInState(EMPTY); +} + +/** + * Return the offset expiration condition to be used for this group. This is based on several factors + * such as the group state, the protocol type, and the GroupMetadata record version. + * + * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition} + * + * @return The offset expiration condition for the group or Empty of no such condition exists. + */ +@Override +public Optional<OffsetExpirationCondition> offsetExpirationCondition() { +if (protocolType.isPresent()) { +if (isInState(EMPTY)) { +// No consumer exists in the group => +// - If current state timestamp exists and retention period has passed since group became Empty, +// expire all offsets with no pending offset commit; +// - If there is no current state timestamp (old group metadata schema) and retention period has passed +// since the last commit timestamp, expire the offset +return Optional.of(new OffsetExpirationConditionImpl( +offsetAndMetadata -> currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs)) +); +} else if (usesConsumerGroupProtocol() && subscribedTopics.isPresent() && isInState(STABLE)) { +// Consumers exist in the group and group is Stable => +// - If the group is aware of the subscribed topics and retention period had passed since the Review Comment: nit: has* -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
rreddy-22 commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1353832416 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ## @@ -898,6 +900,47 @@ public void createGroupTombstoneRecords(List<Record> records) { records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId())); } +@Override +public boolean isEmpty() { +return isInState(EMPTY); +} + +/** + * Return the offset expiration condition to be used for this group. This is based on several factors + * such as the group state, the protocol type, and the GroupMetadata record version. + * + * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition} + * + * @return The offset expiration condition for the group or Empty of no such condition exists. + */ +@Override +public Optional<OffsetExpirationCondition> offsetExpirationCondition() { +if (protocolType.isPresent()) { +if (isInState(EMPTY)) { +// No consumer exists in the group => Review Comment: nit: No consumers* exist* in the group, also do we wanna name it members to be consistent with the rest of the code terminology? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
rreddy-22 commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1353831509 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ## @@ -898,6 +900,47 @@ public void createGroupTombstoneRecords(List<Record> records) { records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId())); } +@Override +public boolean isEmpty() { +return isInState(EMPTY); +} + +/** + * Return the offset expiration condition to be used for this group. This is based on several factors + * such as the group state, the protocol type, and the GroupMetadata record version. + * + * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition} + * + * @return The offset expiration condition for the group or Empty of no such condition exists. Review Comment: nit: if* -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
rreddy-22 commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1353831027 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -544,6 +550,77 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( .setTopics(topicResponses); } +/** + * Remove expired offsets for group. + * + * @param groupId The group id. + * @param records The list of records to populate with offset commit tombstone records. + * + * @return True if no offsets exist or if all offsets expired, false otherwise. + */ +public boolean cleanupExpiredOffsets(String groupId, List<Record> records) { +TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId); +if (offsetsByTopic == null) { +return true; +} + +// We expect the group to exist. +Group group = groupMetadataManager.group(groupId); +Set<String> expiredPartitions = new HashSet<>(); +long currentTimestampMs = time.milliseconds(); +Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition(); + +if (!offsetExpirationCondition.isPresent()) { +return false; +} + +AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true); +OffsetExpirationCondition condition = offsetExpirationCondition.get(); + +offsetsByTopic.forEach((topic, partitions) -> { +if (!group.isSubscribedToTopic(topic)) { +partitions.forEach((partition, offsetAndMetadata) -> { +if (condition.isOffsetExpired(offsetAndMetadata, currentTimestampMs, config.offsetsRetentionMs)) { + expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, records).toString()); +} else { +hasAllOffsetsExpired.set(false); +} +}); +} else { +hasAllOffsetsExpired.set(false); +} +}); + +if (!expiredPartitions.isEmpty()) { +log.info("[GroupId {}] Expiring offsets of partitions (hasAllOffsetsExpired={}): {}", Review Comment: The placement of hasAllOffsetsExpired seems a bit off for the logging message `[GroupId 12345] Expiring offsets of partitions (hasAllOffsetsExpired=false): partition1, partition2, partition3` This is how it would look right? Can we move it to the end of the list or the beginning of this line if you think it makes sense as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
rreddy-22 commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1353831281 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -635,6 +639,21 @@ public void createGroupTombstoneRecords(List<Record> records) { records.add(RecordHelpers.newGroupEpochTombstoneRecord(groupId())); } +@Override +public boolean isEmpty() { +return state() == ConsumerGroupState.EMPTY; +} + +/** + * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition} + * + * @return The offset expiration condition for the group or Empty of no such condition exists. Review Comment: nit: if* -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
rreddy-22 commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1353830271 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -544,6 +550,77 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( .setTopics(topicResponses); } +/** + * Remove expired offsets for group. + * + * @param groupId The group id. + * @param records The list of records to populate with offset commit tombstone records. + * + * @return True if no offsets exist or if all offsets expired, false otherwise. + */ +public boolean cleanupExpiredOffsets(String groupId, List<Record> records) { +TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId); +if (offsetsByTopic == null) { +return true; +} + +// We expect the group to exist. +Group group = groupMetadataManager.group(groupId); +Set<String> expiredPartitions = new HashSet<>(); +long currentTimestampMs = time.milliseconds(); +Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition(); + +if (!offsetExpirationCondition.isPresent()) { +return false; +} + +AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true); +OffsetExpirationCondition condition = offsetExpirationCondition.get(); + +offsetsByTopic.forEach((topic, partitions) -> { +if (!group.isSubscribedToTopic(topic)) { +partitions.forEach((partition, offsetAndMetadata) -> { +if (condition.isOffsetExpired(offsetAndMetadata, currentTimestampMs, config.offsetsRetentionMs)) { + expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, records).toString()); +} else { +hasAllOffsetsExpired.set(false); +} +}); +} else { +hasAllOffsetsExpired.set(false); +} +}); + +if (!expiredPartitions.isEmpty()) { +log.info("[GroupId {}] Expiring offsets of partitions (hasAllOffsetsExpired={}): {}", Review Comment: Correct me if I'm wrong this is just for my understanding, so unless all the offsets are expired we don't delete the group right? In cases where hasAllOffsetsExpired is false and expiredPartitions is non-empty, the group won't be deleted but the tombstone records will be appended? And the next time we iterate through the partitions the ones with the tombstone record aren't included right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
rreddy-22 commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1353823509 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -544,6 +550,77 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( .setTopics(topicResponses); } +/** + * Remove expired offsets for group. + * + * @param groupId The group id. + * @param records The list of records to populate with offset commit tombstone records. + * + * @return True if no offsets exist or if all offsets expired, false otherwise. + */ +public boolean cleanupExpiredOffsets(String groupId, List<Record> records) { +TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId); +if (offsetsByTopic == null) { +return true; +} + +// We expect the group to exist. +Group group = groupMetadataManager.group(groupId); +Set<String> expiredPartitions = new HashSet<>(); +long currentTimestampMs = time.milliseconds(); +Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition(); + +if (!offsetExpirationCondition.isPresent()) { +return false; +} + +AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true); Review Comment: nit: haveAllOffsetsExpired -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] System Tests for KIP-848 [kafka]
rreddy-22 opened a new pull request, #14524: URL: https://github.com/apache/kafka/pull/14524 Added configs and custom decorators to facilitate testing of the old protocol with the new group coordinator (KIP-848) in KRaft mode. The new coordinator doesn't support ZooKeeper mode, and hence this combination will be skipped. **Files Changed:** 1) kafka.py: Added an argument called use_new_coordinator to the kafka service, which can be set in the matrix of every test. The default value for this config is false. 2) util.py: Added a custom decorator that skips the test for zk=true and use_new_coordinator=true, since this combination isn't supported. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
jeffkbkim commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1353679164 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -544,6 +550,77 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( .setTopics(topicResponses); } +/** + * Remove expired offsets for group. + * + * @param groupId The group id. + * @param records The list of records to populate with offset commit tombstone records. + * + * @return True if no offsets exist or if all offsets expired, false otherwise. + */ +public boolean cleanupExpiredOffsets(String groupId, List<Record> records) { +TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId); +if (offsetsByTopic == null) { +return true; +} + +// We expect the group to exist. Review Comment: yes ``` groupMetadataManager.groupIds().forEach(groupId -> { if (offsetMetadataManager.cleanupExpiredOffsets(groupId, records)) { groupMetadataManager.maybeDeleteGroup(groupId, records); } }); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]
jeffkbkim commented on PR #14417: URL: https://github.com/apache/kafka/pull/14417#issuecomment-1756480353 @jolshan this is flaky in trunk as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]
jolshan commented on PR #14417: URL: https://github.com/apache/kafka/pull/14417#issuecomment-1756470166 I restarted the build. But can we take a look at `[kafka.server.FetchRequestTest.testLastFetchedEpochValidationV12(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14417/14/testReport/junit/kafka.server/FetchRequestTest/Build___JDK_11_and_Scala_2_13___testLastFetchedEpochValidationV12_String__quorum_kraft/)` It seems to be failing on all the versions that ran for build 14. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15571) StateRestoreListener#onRestoreSuspended is never called because wrapper DelegatingStateRestoreListener doesn't implement onRestoreSuspended
[ https://issues.apache.org/jira/browse/KAFKA-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17773849#comment-17773849 ] Matthias J. Sax commented on KAFKA-15571: - Oops... Thanks for reporting and for the PR! > StateRestoreListener#onRestoreSuspended is never called because wrapper > DelegatingStateRestoreListener doesn't implement onRestoreSuspended > --- > > Key: KAFKA-15571 > URL: https://issues.apache.org/jira/browse/KAFKA-15571 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.0, 3.5.1 >Reporter: Levani Kokhreidze >Assignee: Levani Kokhreidze >Priority: Major > > With https://issues.apache.org/jira/browse/KAFKA-10575 > `StateRestoreListener#onRestoreSuspended` was added. But local tests show > that it is never called because `DelegatingStateRestoreListener` was not > updated to call a new method. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15221; Fix the race between fetch requests from a rebooted follower. [kafka]
CalvinConfluent commented on PR #14053: URL: https://github.com/apache/kafka/pull/14053#issuecomment-1756411287 Thanks @hachikuji , verified the failed UT can pass locally. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15276: Implement partition assignment reconciliation [kafka]
kirktrue commented on PR #14357: URL: https://github.com/apache/kafka/pull/14357#issuecomment-1756404090 @lianetm @philipnee @dajac we can start reviewing this PR now. I'll keep working on the unit tests in the background. I resolved a bunch of comments that pertained to a former implementation approach that we abandoned. If any are still relevant, feel free to un-resolve them. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15276: Implement partition assignment reconciliation [kafka]
kirktrue commented on code in PR #14357: URL: https://github.com/apache/kafka/pull/14357#discussion_r1353544098 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignmentReconciler.java: ## @@ -85,122 +111,162 @@ enum ReconciliationResult { private final BlockingQueue<BackgroundEvent> backgroundEventQueue; private Optional inflightCallback; -public MemberAssignmentReconciler(LogContext logContext, - SubscriptionState subscriptions, - ConsumerMetadata metadata, - BlockingQueue<BackgroundEvent> backgroundEventQueue) { +AssignmentReconciler(LogContext logContext, + SubscriptionState subscriptions, + ConsumerMetadata metadata, + BlockingQueue<BackgroundEvent> backgroundEventQueue) { this.log = logContext.logger(getClass()); this.subscriptions = subscriptions; this.metadata = metadata; this.backgroundEventQueue = backgroundEventQueue; } /** - * Perform the revocation process, if necessary, depending on the given {@link Assignment target assignment}. If the - * {@link SubscriptionState#assignedPartitions() current set of assigned partitions} includes entries that are - * not in the target assignment, these will be considered for revocation. If there is already a - * reconciliation in progress (revocation or assignment), this method will return without performing any - * revocation. + * Perform the reconciliation process, as necessary to meet the given {@link Assignment target assignment}. Note + * that the reconciliation is a multi-step process, and this method should be invoked on each heartbeat if + * the coordinator provides a {@link Assignment target assignment}. * * @param assignment Target {@link Assignment} * @return {@link ReconciliationResult} */ -ReconciliationResult revoke(Optional<Assignment> assignment) { +ReconciliationResult maybeReconcile(Optional<Assignment> assignment) { // Check for any outstanding operations first. If a conclusive result has already been reached, return that // before processing any further. -Optional inflightStatus = checkInflightStatus(); +if (inflightCallback.isPresent()) { +// We don't actually need the _result_ of the event, just to know that it's complete. +if (inflightCallback.get().future().isDone()) { +// This is the happy path--we completed the callback. Clear out our inflight callback first, though. +inflightCallback = Optional.empty(); Review Comment: Is the `HeartbeatRequestManager` going to call `AssignmentReconciler.lose()` to drop the partitions at that point? If so, this is the code at the top of that `lose()` method: ```java ReconciliationResult lose() { // Clear the inflight callback reference. This is done regardless of if one existed; if there was one it is // now abandoned because we're going to "lose" our partitions. This will also allow us to skip the inflight // check the other steps take. inflightCallback = Optional.empty(); . . . ``` Does that seem sufficient, or is more needed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15276: Implement partition assignment reconciliation [kafka]
kirktrue commented on code in PR #14357: URL: https://github.com/apache/kafka/pull/14357#discussion_r1353534627 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignmentReconciler.java: ## @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.LosePartitionsEvent; +import org.apache.kafka.clients.consumer.internals.events.RebalanceCallbackEvent; +import org.apache.kafka.clients.consumer.internals.events.RevokePartitionsEvent; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.Assignment; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions; +import org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; + +/** + * {@code AssignmentReconciler} performs the work of reconciling this consumer's partition assignment as directed + * by the consumer group coordinator. When the coordinator determines that a change to the partition ownership of + * the group is required, it will communicate with each consumer to relay its respective target + * assignment, that is, the set of partitions for which that consumer should now assume ownership. It is then the + * responsibility of the consumer to work toward that target by performing the necessary internal modifications to + * satisfy the assignment from the coordinator. In practical terms, this means that it must first determine the set + * difference between the {@link SubscriptionState#assignedPartitions() current assignment} and the + * {@link Assignment#assignedTopicPartitions() target assignment}.
+ * + * + * + * Internally, reconciliation is a multi-step process: + * + * + * Calculating partitions to revoke + * Invoking {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} + * Removing those partitions from its {@link SubscriptionState#assignFromSubscribed(Collection) assignment} + * Perform a heartbeat acknowledgement with the group coordinator + * Calculating partitions to assign + * Invoking {@link ConsumerRebalanceListener#onPartitionsAssigned(Collection)} + * Adding those partitions to its {@link SubscriptionState#assignFromSubscribed(Collection) assignment} + * Perform a heartbeat acknowledgement with the group coordinator + * + * + * + * + * Because the target assignment from the group coordinator is declarative, the implementation of the + * reconciliation process is idempotent. The caller of this class is free to invoke {@link #maybeReconcile(Optional)} + * repeatedly for as long as the group coordinator provides an {@link Assignment}. + * + * + * + * {@link ReconciliationResult#UNCHANGED}: no changes were made to the set of partitions. + * + * + * {@link ReconciliationResult#RECONCILING}: changes to the assignment have started. In practice this means + * that the appropriate {@link ConsumerRebalanceListener} callback method is being invoked. + * + * + * {@link ReconciliationResult#APPLIED_LOCALLY}: the {@link ConsumerRebalanceListener} callback method was made and + * the changes were applied locally. + * + * + * + * The comparison against the {@link SubscriptionState#assignedPartitions() current set of assigned partitions} and + * the {@link Assignment#assignedTopicPartitions() t
Re: [PR] KAFKA-15276: Implement partition assignment reconciliation [kafka]
kirktrue commented on code in PR #14357: URL: https://github.com/apache/kafka/pull/14357#discussion_r1353533180 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignmentReconciler.java: ## @@ -44,21 +45,46 @@ import java.util.stream.Collectors; /** - * {@code MemberAssignmentReconciler} works with {@link MembershipManager} to first determine, and then modify the - * current set of assigned {@link TopicPartition partitions} via {@link SubscriptionState}. Reconciliation is a - * two-part process, the first being a revocation of partitions, followed by assignment of partitions. Each of the two - * steps may result in one of the following: + * {@code AssignmentReconciler} performs the work of reconciling this consumer's partition assignment as directed + * by the consumer group coordinator. When the coordinator determines that a change to the partition ownership of + * the group is required, it will communicate with each consumer to relay its respective target + * assignment, that is, the set of partitions for which that consumer should now assume ownership. It is then the + * responsibility of the consumer to work toward that target by performing the necessary internal modifications to + * satisfy the assignment from the coordinator. In practical terms, this means that it must first determine the set + * difference between the {@link SubscriptionState#assignedPartitions() current assignment} and the + * {@link Assignment#assignedTopicPartitions() target assignment}. + * + * + * + * Internally, reconciliation is a multi-step process: + * + * + * Calculating partitions to revoke + * Invoking {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} Review Comment: Yes, that has been split off into KAFKA-15573. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Update test and add test cases in IQv2StoreIntegrationTest [kafka]
hanyuzheng7 opened a new pull request, #14523: URL: https://github.com/apache/kafka/pull/14523 Update test and add test cases in IQv2StoreIntegrationTest. Change the input key-value pair timestamps and add more test cases. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15575) Prevent Connectors from exceeding tasks.max configuration
Greg Harris created KAFKA-15575: --- Summary: Prevent Connectors from exceeding tasks.max configuration Key: KAFKA-15575 URL: https://issues.apache.org/jira/browse/KAFKA-15575 Project: Kafka Issue Type: Task Components: KafkaConnect Reporter: Greg Harris The Connector::taskConfigs(int maxTasks) function is used by Connectors to enumerate task configurations. This takes an argument which comes from the tasks.max connector config. This is the Javadoc for that method: {noformat} /** * Returns a set of configurations for Tasks based on the current configuration, * producing at most {@code maxTasks} configurations. * * @param maxTasks maximum number of configurations to generate * @return configurations for Tasks */ public abstract List<Map<String, String>> taskConfigs(int maxTasks); {noformat} This includes the constraint that the number of tasks is at most maxTasks, but this constraint is not enforced by the framework. We should begin enforcing this constraint by dropping configs that exceed the limit, and logging a warning. For sink connectors this should harmlessly rebalance the consumer subscriptions onto the remaining tasks. For source connectors that distribute their work via task configs, this may result in an interruption in data transfer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
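A minimal sketch of the proposed enforcement, assuming the "drop and warn" behavior described above (the class and logger here are illustrative, not the actual framework code):

{code:java}
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

final class TaskConfigLimiter {
    private static final Logger LOG = Logger.getLogger(TaskConfigLimiter.class.getName());

    // Drop task configs beyond maxTasks and log a warning, per the proposal.
    static List<Map<String, String>> enforceMaxTasks(List<Map<String, String>> configs, int maxTasks) {
        if (configs.size() <= maxTasks) {
            return configs;
        }
        LOG.warning("Connector generated " + configs.size() + " task configs but tasks.max is "
                + maxTasks + "; dropping the excess configs.");
        return configs.subList(0, maxTasks);
    }
}
{code}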
Re: [PR] KAFKA-15452: Access SslPrincipalMapper and kerberosShortNamer in Custom KafkaPrincipalBuilder(KIP-982) [kafka]
plazma-prizma commented on code in PR #14491: URL: https://github.com/apache/kafka/pull/14491#discussion_r1353388791 ## clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java: ## @@ -240,12 +242,19 @@ public static KafkaPrincipalBuilder createPrincipalBuilder(Map<String, ?> configs, KerberosShortNamer kerberosShortNamer, SslPrincipalMapper sslPrincipalMapper) { Class<?> principalBuilderClass = (Class<?>) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG); -final KafkaPrincipalBuilder builder; +KafkaPrincipalBuilder builder; if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) { builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper); } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) { -builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass); +try { +Constructor<?> constructor = principalBuilderClass.getConstructor(KerberosShortNamer.class, SslPrincipalMapper.class); +builder = (KafkaPrincipalBuilder) constructor.newInstance(kerberosShortNamer, sslPrincipalMapper); Review Comment: Thanks for the changes. LGTM. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
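The pattern in the diff, reduced to a self-contained sketch (placeholder types stand in for Kafka's KerberosShortNamer/SslPrincipalMapper, and the no-arg fallback is assumed to pair with the `try` shown above): prefer a two-argument constructor when the custom builder declares one, otherwise fall back to the default constructor so existing implementations keep working.

```
import java.lang.reflect.Constructor;

final class PrincipalBuilderFactory {
    static final class ShortNamer {}
    static final class PrincipalMapper {}

    static Object newBuilder(Class<?> cls, ShortNamer namer, PrincipalMapper mapper) throws Exception {
        try {
            // Custom builders that want the mappers declare this constructor.
            Constructor<?> ctor = cls.getConstructor(ShortNamer.class, PrincipalMapper.class);
            return ctor.newInstance(namer, mapper);
        } catch (NoSuchMethodException e) {
            // Existing builders keep working via their no-arg constructor.
            return cls.getDeclaredConstructor().newInstance();
        }
    }
}
```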
Re: [PR] [MINOR] More image replay test cases [kafka]
rondagostino commented on PR #14206: URL: https://github.com/apache/kafka/pull/14206#issuecomment-1756151303 @ahuang98 I still saw lots of test failures, and when I checked them locally I actually was able to reproduce the below failures. Can you take a look to see if you also agree these fail locally and then, assuming you agree, investigate as to why? ``` testTopicDualWriteDelta() testTopicDualWriteSnapshot() testControllerFailover() ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
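To reproduce locally, an invocation along these lines should work with Gradle's standard test filtering (the `:core` module path is an assumption; adjust it to wherever these tests actually live):

```
./gradlew :core:test --tests '*testTopicDualWriteDelta*' \
                     --tests '*testTopicDualWriteSnapshot*' \
                     --tests '*testControllerFailover*'
```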
Re: [PR] KAFKA-15571 / `StateRestoreListener#onRestoreSuspended` is never called because `DelegatingStateRestoreListener` doesn't implement `onRestoreSuspended` [kafka]
lkokhreidze commented on PR #14519: URL: https://github.com/apache/kafka/pull/14519#issuecomment-1756120126 Hi @cadonna, I've added an integration test. It seemed the safest way to test out the new functionality. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Replace GroupState with MembershipManager [kafka]
philipnee commented on code in PR #14390: URL: https://github.com/apache/kafka/pull/14390#discussion_r1353232858 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -180,19 +180,19 @@ CompletableFuture sendAutoCommit(final Map offsets; private final String groupId; -private final GroupState.Generation generation; +private final int memberEpoch; Review Comment: We could do it either way - recreate this RequestState object or resend the same object with the updated epoch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15548) Handling close() properly
[ https://issues.apache.org/jira/browse/KAFKA-15548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-15548: --- Labels: consumer-threading-refactor kip-848 kip-848-client-support kip-848-e2e kip-848-preview (was: consumer-threading-refactor kip-848 kip-848-client-support kip-848-preview) > Handling close() properly > - > > Key: KAFKA-15548 > URL: https://issues.apache.org/jira/browse/KAFKA-15548 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support, kip-848-e2e, kip-848-preview > > Upon closing of the {{Consumer}} we need to: > # Complete pending commits > # Auto-commit if needed > # Send the last GroupConsumerHeartbeatRequest with epoch = -1 to leave the > group > # Close any fetch sessions on the brokers > # Poll the NetworkClient to complete pending I/O > There is a mechanism introduced in PR > [14406|https://github.com/apache/kafka/pull/14406] that allows for performing > network I/O on shutdown. The new method > {{DefaultBackgroundThread.runAtClose()}} will be executed when > {{Consumer.close()}} is invoked. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15534) Propagate client response time when timeout to the request handler
[ https://issues.apache.org/jira/browse/KAFKA-15534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yi Ding updated KAFKA-15534: Labels: consumer-threading-refactor kip-848 kip-848-client-support kip-848-e2e kip-848-preview (was: consumer-threading-refactor) > Propagate client response time when timeout to the request handler > -- > > Key: KAFKA-15534 > URL: https://issues.apache.org/jira/browse/KAFKA-15534 > Project: Kafka > Issue Type: Sub-task >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support, kip-848-e2e, kip-848-preview > > Currently, we don't have a good way to propagate the response time to the > handler when a timeout is thrown. > {code:java} > unsent.handler.onFailure(new TimeoutException( > "Failed to send request after " + unsent.timer.timeoutMs() + " ms.")); > {code} > The current request manager invokes a system call to retrieve the response > time, which is not ideal because it is already available at the network client. > This is an example of the coordinator request manager: > {code:java} > unsentRequest.future().whenComplete((clientResponse, throwable) -> { > long responseTimeMs = time.milliseconds(); > if (clientResponse != null) { > FindCoordinatorResponse response = (FindCoordinatorResponse) > clientResponse.responseBody(); > onResponse(responseTimeMs, response); > } else { > onFailedResponse(responseTimeMs, throwable); > } > }); {code} > But in the networkClientDelegate, we should utilize the currentTimeMs in the > trySend to avoid calling time.milliseconds(): > {code:java} > private void trySend(final long currentTimeMs) { > ... > unsent.handler.onFailure(new TimeoutException( > "Failed to send request after " + unsent.timer.timeoutMs() + " ms.")); > continue; > } > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
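The ticket's suggestion, as a self-contained sketch (simplified types; the real code paths are NetworkClientDelegate and the request managers): thread the poll loop's currentTimeMs into the failure handler instead of re-reading the clock inside it.

{code:java}
import java.util.concurrent.TimeoutException;

final class TimeoutPropagationSketch {
    interface Handler {
        void onFailure(long responseTimeMs, Throwable t);
    }

    // currentTimeMs is the timestamp the caller (e.g. trySend) already holds,
    // so the handler observes the same clock reading as the network client.
    static void expire(Handler handler, long currentTimeMs, long timeoutMs) {
        handler.onFailure(currentTimeMs,
                new TimeoutException("Failed to send request after " + timeoutMs + " ms."));
    }
}
{code}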
[jira] [Updated] (KAFKA-15548) Handling close() properly
[ https://issues.apache.org/jira/browse/KAFKA-15548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yi Ding updated KAFKA-15548: Labels: consumer-threading-refactor kip-848 kip-848-client-support kip-848-preview (was: consumer-threading-refactor kip-) > Handling close() properly > - > > Key: KAFKA-15548 > URL: https://issues.apache.org/jira/browse/KAFKA-15548 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support, kip-848-preview > > Upon closing of the {{Consumer}} we need to: > # Complete pending commits > # Auto-commit if needed > # Send the last GroupConsumerHeartbeatRequest with epoch = -1 to leave the > group > # Close any fetch sessions on the brokers > # Poll the NetworkClient to complete pending I/O > There is a mechanism introduced in PR > [14406|https://github.com/apache/kafka/pull/14406] that allows for performing > network I/O on shutdown. The new method > {{DefaultBackgroundThread.runAtClose()}} will be executed when > {{Consumer.close()}} is invoked. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15548) Handling close() properly
[ https://issues.apache.org/jira/browse/KAFKA-15548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yi Ding updated KAFKA-15548: Labels: consumer-threading-refactor kip- (was: consumer-threading-refactor) > Handling close() properly > - > > Key: KAFKA-15548 > URL: https://issues.apache.org/jira/browse/KAFKA-15548 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip- > > Upon closing of the {{Consumer}} we need to: > # Complete pending commits > # Auto-commit if needed > # Send the last GroupConsumerHeartbeatRequest with epoch = -1 to leave the > group > # Close any fetch sessions on the brokers > # Poll the NetworkClient to complete pending I/O > There is a mechanism introduced in PR > [14406|https://github.com/apache/kafka/pull/14406] that allows for performing > network I/O on shutdown. The new method > {{DefaultBackgroundThread.runAtClose()}} will be executed when > {{Consumer.close()}} is invoked. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Replace GroupState with MembershipManager [kafka]
dajac commented on code in PR #14390: URL: https://github.com/apache/kafka/pull/14390#discussion_r1353179173 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -180,19 +180,19 @@ CompletableFuture sendAutoCommit(final Map offsets; private final String groupId; -private final GroupState.Generation generation; +private final int memberEpoch; Review Comment: > Stale member epoch error isn't retriable A stale member epoch error is retriable... but it should be retried with the new epoch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
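A sketch of the "retry with the new epoch" option under discussion (illustrative classes only, not the actual CommitRequestManager internals): rebuild the request with the epoch re-read from the membership state rather than resending the stale object.

```
final class EpochAwareRetrySketch {
    interface EpochSource {
        int currentEpoch();
    }

    static final class CommitRequest {
        final String groupId;
        final int memberEpoch;
        CommitRequest(String groupId, int memberEpoch) {
            this.groupId = groupId;
            this.memberEpoch = memberEpoch;
        }
    }

    // On STALE_MEMBER_EPOCH, don't requeue the failed request as-is;
    // recreate it carrying the latest epoch before retrying.
    static CommitRequest retryAfterStaleEpoch(CommitRequest failed, EpochSource membership) {
        return new CommitRequest(failed.groupId, membership.currentEpoch());
    }
}
```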
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
lihaosky commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1353117588 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() { } } +@Test +public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() { +final StreamsBuilder builder = new StreamsBuilder(); + +final int[] expectedKeys = new int[] {0, 1, 2, 3}; + +final KStream<Integer, String> stream1; +final KStream<Integer, String> stream2; +final KStream<Integer, String> joined; +final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>(); +stream1 = builder.stream(topic1, consumed); +stream2 = builder.stream(topic2, consumed); + +joined = stream1.leftJoin( +stream2, +MockValueJoiner.TOSTRING_JOINER, +JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO), +StreamJoined.with(Serdes.Integer(), +Serdes.String(), +Serdes.String()) +); +joined.process(supplier); + +final Collection<Set<String>> copartitionGroups = + TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); + +assertEquals(1, copartitionGroups.size()); +assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + +try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) { +final TestInputTopic<Integer, String> inputTopic1 = +driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); +final TestInputTopic<Integer, String> inputTopic2 = +driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); +final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor(); + +processor.init(null); +// push four items with increasing timestamps to the primary stream; this should emit null-joined items +// w1 = {} +// w2 = {} +// --> w1 = { 0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) } +// w2 = {} +final long time = 1000L; +for (int i = 0; i < expectedKeys.length; i++) { +inputTopic1.pipeInput(expectedKeys[i], "B" + expectedKeys[i], time + i); +} +processor.checkAndClearProcessResult( +new KeyValueTimestamp<>(0, "B0+null", 1000L), +new KeyValueTimestamp<>(1, "B1+null", 1001L), +new KeyValueTimestamp<>(2, "B2+null", 1002L) +); +} Review Comment: Noob question: why do we have output here? The time difference is `100ms`, should we only output these three if we got an event with time `1103`?
Maybe I'm missing something ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() { } } +@Test +public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() { +final StreamsBuilder builder = new StreamsBuilder(); + +final int[] expectedKeys = new int[] {0, 1, 2, 3}; + +final KStream<Integer, String> stream1; +final KStream<Integer, String> stream2; +final KStream<Integer, String> joined; +final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>(); +stream1 = builder.stream(topic1, consumed); +stream2 = builder.stream(topic2, consumed); + +joined = stream1.leftJoin( +stream2, +MockValueJoiner.TOSTRING_JOINER, +JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO), +StreamJoined.with(Serdes.Integer(), +Serdes.String(), +Serdes.String()) +); +joined.process(supplier); + +final Collection<Set<String>> copartitionGroups = + TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); + +assertEquals(1, copartitionGroups.size()); +assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + +try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) { +final TestInputTopic<Integer, String> inputTopic1 = +driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); +final TestInputTopic<Integer, String> inputTopic2 = +driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); +final MockAp
Re: [PR] MINOR - KAFKA-15550: Validation for negative target times in offsetsForTimes [kafka]
lianetm commented on PR #14503: URL: https://github.com/apache/kafka/pull/14503#issuecomment-1756011144 hey @cadonna, @lucasbru. This is a very small PR adding a missing API validation to the new consumer `offsetsForTimes` functionality. It also includes some improved comments for `updateFetchPositions`. It could also be a good opportunity to take a look at the new code if you have some time. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14879: Update system tests to use latest versions [kafka]
jolshan commented on PR #13528: URL: https://github.com/apache/kafka/pull/13528#issuecomment-1755954984 Sorry I must have missed this ping. :( My bad. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14879: Update system tests to use latest versions [kafka]
kirktrue commented on PR #13528: URL: https://github.com/apache/kafka/pull/13528#issuecomment-1755953000 This appears to be taken up by someone else, but they're doing it in a more automated/dynamic manner, which is better anyway. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Backport kafka 15415 [kafka]
msn-tldr closed pull request #14521: Backport kafka 15415 URL: https://github.com/apache/kafka/pull/14521 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15415: On producer-batch retry, skip-backoff on a new leader (#… [kafka]
msn-tldr opened a new pull request, #14522: URL: https://github.com/apache/kafka/pull/14522 …14384) This PR backports https://github.com/apache/kafka/pull/14384 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14879: Update system tests to use latest versions [kafka]
kirktrue closed pull request #13528: KAFKA-14879: Update system tests to use latest versions URL: https://github.com/apache/kafka/pull/13528 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Backport kafka 15415 [kafka]
msn-tldr opened a new pull request, #14521: URL: https://github.com/apache/kafka/pull/14521 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
rreddy-22 commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1353056783 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -544,6 +550,77 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( .setTopics(topicResponses); } +/** + * Remove expired offsets for group. + * + * @param groupId The group id. + * @param records The list of records to populate with offset commit tombstone records. + * + * @return True if no offsets exist or if all offsets expired, false otherwise. + */ +public boolean cleanupExpiredOffsets(String groupId, List<Record> records) { +TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId); +if (offsetsByTopic == null) { +return true; +} + +// We expect the group to exist. Review Comment: Is this method only called after the groupId is verified to exist? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
rreddy-22 commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1353055183 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -544,6 +550,77 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( .setTopics(topicResponses); } +/** + * Remove expired offsets for group. Review Comment: nit: the group -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
rreddy-22 commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1353053689 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImpl.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import java.util.function.Function; + +public class OffsetExpirationConditionImpl implements OffsetExpirationCondition { + +/** + * Given an offset and metadata, obtain the base timestamp that should be used + * as the start of the offsets retention period. + */ +private final Function<OffsetAndMetadata, Long> baseTimestamp; + +public OffsetExpirationConditionImpl(Function<OffsetAndMetadata, Long> baseTimestamp) { +this.baseTimestamp = baseTimestamp; +} + +/** + * Determine whether an offset is expired. Older versions have an expire timestamp per partition. If this + * exists, compare against the current timestamp. Otherwise, use the base timestamp (either commit timestamp + * or current state timestamp if group is empty for generic groups) and check whether the offset has + * exceeded the offset retention. + * + * @param offset The offset and metadata. + * @param currentTimestampMs The current timestamp. + * @param offsetsRetentionMs The offsets retention in milliseconds. + * + * @return Whether the given offset is expired or not. + */ +@Override +public boolean isOffsetExpired(OffsetAndMetadata offset, long currentTimestampMs, long offsetsRetentionMs) { +if (offset.expireTimestampMs.isPresent()) { +// Older versions with explicit expire_timestamp field => old expiration semantics is used +return currentTimestampMs >= offset.expireTimestampMs.getAsLong(); +} else { +// Current version with no per partition retention Review Comment: nit: missing period -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
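A compact, self-contained restatement of the predicate quoted above (a simplified stand-in for OffsetAndMetadata; in the real class the base-timestamp function is what distinguishes commit time from the group's current-state time):

```
import java.util.Optional;
import java.util.function.Function;

final class ExpirationSketch {
    static final class Offset {
        final long commitTimestampMs;
        final Optional<Long> expireTimestampMs;
        Offset(long commitTimestampMs, Optional<Long> expireTimestampMs) {
            this.commitTimestampMs = commitTimestampMs;
            this.expireTimestampMs = expireTimestampMs;
        }
    }

    static boolean isExpired(Offset o, Function<Offset, Long> baseTimestamp, long nowMs, long retentionMs) {
        // Older clients committed an explicit expiry; honor it directly.
        if (o.expireTimestampMs.isPresent()) {
            return nowMs >= o.expireTimestampMs.get();
        }
        // Otherwise expire relative to the pluggable base timestamp.
        return nowMs - baseTimestamp.apply(o) >= retentionMs;
    }
}
```

For example, `isExpired(offset, o -> o.commitTimestampMs, now, retention)` models the common case where retention is measured from the last commit.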
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
rreddy-22 commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1353048258 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ## @@ -417,6 +441,39 @@ public CoordinatorResult deleteOffsets( return offsetMetadataManager.deleteOffsets(request); } +/** + * For each group, remove all expired offsets. If all offsets for the group is removed and the group is eligible Review Comment: nit: offsets for the group are* removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
rreddy-22 commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1353045452 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java: ## @@ -91,6 +91,27 @@ public class GroupCoordinatorConfig { */ public final int genericGroupMaxSessionTimeoutMs; +/** + * Frequency at which to check for expired offsets. + */ +public final long offsetsRetentionCheckIntervalMs; + +/** + * For subscribed consumers, committed offset of a specific partition will be expired and discarded when + * 1) this retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty); + * 2) this retention period has elapsed since the last time an offset is committed for the partition AND + *the group is no longer subscribed to the corresponding topic. + * + * For standalone consumers (using manual assignment), offsets will be expired after this retention period has + * elapsed since the time of last commit. + * + * Note that when a group is deleted via the DeleteGroups request, its committed offsets will also be deleted immediately; + * + * Also, when a topic is deleted via the delete-topic request, upon propagated metadata update any group's + * committed offsets for that topic will also be deleted without extra retention period Review Comment: nit: missing period ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java: ## @@ -91,6 +91,27 @@ public class GroupCoordinatorConfig { */ public final int genericGroupMaxSessionTimeoutMs; +/** + * Frequency at which to check for expired offsets. + */ +public final long offsetsRetentionCheckIntervalMs; + +/** + * For subscribed consumers, committed offset of a specific partition will be expired and discarded when + * 1) this retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty); + * 2) this retention period has elapsed since the last time an offset is committed for the partition AND + *the group is no longer subscribed to the corresponding topic. + * + * For standalone consumers (using manual assignment), offsets will be expired after this retention period has + * elapsed since the time of last commit. + * + * Note that when a group is deleted via the DeleteGroups request, its committed offsets will also be deleted immediately; + * + * Also, when a topic is deleted via the delete-topic request, upon propagated metadata update any group's Review Comment: nit: missing period -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
dajac commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1353045293 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -117,4 +119,16 @@ void validateOffsetFetch( * @param records The list of records. */ void createGroupTombstoneRecords(List<Record> records); + +/** + * @return Whether the group can be deleted or not. + */ +boolean isEmpty(); Review Comment: We should fix the javadoc. Good catch! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
rreddy-22 commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1353045205 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java: ## @@ -91,6 +91,27 @@ public class GroupCoordinatorConfig { */ public final int genericGroupMaxSessionTimeoutMs; +/** + * Frequency at which to check for expired offsets. + */ +public final long offsetsRetentionCheckIntervalMs; + +/** + * For subscribed consumers, committed offset of a specific partition will be expired and discarded when + * 1) this retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty); + * 2) this retention period has elapsed since the last time an offset is committed for the partition AND + *the group is no longer subscribed to the corresponding topic. + * + * For standalone consumers (using manual assignment), offsets will be expired after this retention period has + * elapsed since the time of last commit. + * + * Note that when a group is deleted via the DeleteGroups request, its committed offsets will also be deleted immediately; + * + * Also, when a topic is deleted via the delete-topic request, upon propagated metadata update any group's Review Comment: nit: missing period -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
rreddy-22 commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1353044716 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java: ## @@ -91,6 +91,27 @@ public class GroupCoordinatorConfig { */ public final int genericGroupMaxSessionTimeoutMs; +/** + * Frequency at which to check for expired offsets. + */ +public final long offsetsRetentionCheckIntervalMs; + +/** + * For subscribed consumers, committed offset of a specific partition will be expired and discarded when Review Comment: nit: can we put a colon after the when and capitalize the T in this for the bullet points? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
rreddy-22 commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1353043290 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -117,4 +119,16 @@ void validateOffsetFetch( * @param records The list of records. */ void createGroupTombstoneRecords(List<Record> records); + +/** + * @return Whether the group can be deleted or not. + */ +boolean isEmpty(); Review Comment: the return value seems more like a use case right? Should we update the name of the method or the return statement and add the "whether group can be deleted or not" part as a use case in the javadoc -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]
iit2009060 commented on code in PR #14482: URL: https://github.com/apache/kafka/pull/14482#discussion_r1353033902 ## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ## @@ -540,6 +545,8 @@ class RemoteIndexCacheTest { "Failed to mark cache entry for cleanup after resizing cache.") TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted, "Failed to cleanup cache entry after resizing cache.") +TestUtils.waitUntilTrue(() => cacheEntry.isCleanFinished, + "Failed to finish cleanup cache entry after resizing cache.") // verify no index files on remote cache dir TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, Review Comment: @showuon I have removed the `isCleanFinished` flag changes and now catch the `Exception` in the method. cc @hudeqi Please review it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]
iit2009060 commented on code in PR #14482: URL: https://github.com/apache/kafka/pull/14482#discussion_r1353033902 ## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ## @@ -540,6 +545,8 @@ class RemoteIndexCacheTest { "Failed to mark cache entry for cleanup after resizing cache.") TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted, "Failed to cleanup cache entry after resizing cache.") +TestUtils.waitUntilTrue(() => cacheEntry.isCleanFinished, + "Failed to finish cleanup cache entry after resizing cache.") // verify no index files on remote cache dir TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, Review Comment: @showuon I have removed the `isCleanFinished` flag and now catch the `Exception` in the method. Please review it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15574) Integrate partition assignment reconciliation with heartbeat request manager
[ https://issues.apache.org/jira/browse/KAFKA-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-15574: --- Description: This task is to call the partition assignment reconciler from the heartbeat request manager, making sure to correctly query the state machine for the right actions. The HB-reconciler interaction is 2 folded: * HB should send HB req when the reconciler completes callbacks * HB manager needs to trigger the reconciler to release assignments when errors occur. All driven by the HB manager. was: This task is to call the partition assignment reconciler from the heartbeat request manager, making sure to correctly query the state machine for the right actions. The HB-reconciler interaction is 2 folded: * HB should send HB req when the reconciler completes callbacks * HB manager needs to trigger the reconciler to release assignments when errors occurs. All driven by the HB manager. > Integrate partition assignment reconciliation with heartbeat request manager > > > Key: KAFKA-15574 > URL: https://issues.apache.org/jira/browse/KAFKA-15574 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > This task is to call the partition assignment reconciler from the heartbeat > request manager, making sure to correctly query the state machine for the > right actions. > > The HB-reconciler interaction is 2 folded: > * HB should send HB req when the reconciler completes callbacks > * HB manager needs to trigger the reconciler to release assignments when > errors occur. > All driven by the HB manager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15574) Integrate partition assignment reconciliation with heartbeat request manager
[ https://issues.apache.org/jira/browse/KAFKA-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-15574: --- Description: This task is to call the partition assignment reconciler from the heartbeat request manager, making sure to correctly query the state machine for the right actions. The HB-reconciler interaction is 2 folded: * HB should send HB req when the reconciler completes callbacks * HB manager needs to trigger the reconciler to release assignments when errors occurs. All driven by the HB manager. was: This task is to call the partition assignment reconciler from the heartbeat request manager, making sure to correctly query the state machine for the right actions. The HB-reconciler interaction is 2 folded: * HB should send HB req when the reconciler completes callbacks * HB manager needs to trigger the reconciler to release assignments when errors occurs. All driven by the HB manager. > Integrate partition assignment reconciliation with heartbeat request manager > > > Key: KAFKA-15574 > URL: https://issues.apache.org/jira/browse/KAFKA-15574 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > This task is to call the partition assignment reconciler from the heartbeat > request manager, making sure to correctly query the state machine for the > right actions. > > The HB-reconciler interaction is 2 folded: > * HB should send HB req when the reconciler completes callbacks > * HB manager needs to trigger the reconciler to release assignments when > errors occurs. > All driven by the HB manager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15574) Integrate partition assignment reconciliation with heartbeat request manager
[ https://issues.apache.org/jira/browse/KAFKA-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-15574: --- Description: This task is to call the partition assignment reconciler from the heartbeat request manager, making sure to correctly query the state machine for the right actions. The HB-reconciler interaction is 2 folded: * HB should send HB req when the reconciler completes callbacks * HB manager needs to trigger the reconciler to release assignments when errors occurs. All driven by the HB manager. was:This task is to call the partition assignment reconciler from the heartbeat request manager, making sure to correctly query the state machine for the right actions. > Integrate partition assignment reconciliation with heartbeat request manager > > > Key: KAFKA-15574 > URL: https://issues.apache.org/jira/browse/KAFKA-15574 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > This task is to call the partition assignment reconciler from the heartbeat > request manager, making sure to correctly query the state machine for the > right actions. > > The HB-reconciler interaction is 2 folded: * HB should send HB req when the > reconciler completes callbacks > * HB manager needs to trigger the reconciler to release assignments when > errors occurs. > All driven by the HB manager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
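A loose sketch of the two interactions listed in the description (all names here are hypothetical; the real types are the heartbeat request manager and the assignment reconciler):

{code:java}
final class HeartbeatReconcilerWiring {
    interface Reconciler {
        void onCallbacksComplete(Runnable action); // fires after user callbacks finish
        void releaseAssignment();                  // drop the owned partitions
    }

    interface HeartbeatManager {
        void requestHeartbeatNow();                // ack the assignment to the coordinator
    }

    static void wire(Reconciler reconciler, HeartbeatManager heartbeats) {
        // 1) reconciler completion triggers an immediate heartbeat (the ack)
        reconciler.onCallbacksComplete(heartbeats::requestHeartbeatNow);
    }

    static void onFatalHeartbeatError(Reconciler reconciler) {
        // 2) fatal heartbeat errors make the member release its assignment
        reconciler.releaseAssignment();
    }
}
{code}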
Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]
jeffkbkim commented on PR #14417: URL: https://github.com/apache/kafka/pull/14417#issuecomment-1755898146 not sure what this error is from ``` > Task :streams:compileJava /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-14417/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java:246: error: incompatible types: inference variable V#1 has incompatible equality constraints KeyValueStore,VAgg return aggregate(initializer, adder, subtractor, Materialized.with(keySerde, null)); ^ where V#1,K#1,S,VAgg,K#2,V#2 are type-variables: V#1 extends Object declared in method with(Serde,Serde) K#1 extends Object declared in method with(Serde,Serde) S extends StateStore declared in method with(Serde,Serde) VAgg extends Object declared in method aggregate(Initializer,Aggregator,Aggregator) K#2 extends Object declared in class KGroupedTableImpl V#2 extends Object declared in class KGroupedTableImpl 1 error ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
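For context, "incompatible equality constraints" is a javac generic-inference failure; a common workaround (offered as a guess, not a confirmed diagnosis of this build break) is to pin the type arguments with an explicit witness, e.g. `Materialized.<K, VAgg, KeyValueStore<Bytes, byte[]>>with(keySerde, null)`. A minimal, self-contained illustration of the explicit-witness syntax:

```
import java.util.ArrayList;
import java.util.List;

final class TypeWitnessDemo {
    static <T> List<T> listOf(T value) {
        List<T> list = new ArrayList<>();
        list.add(value);
        return list;
    }

    public static void main(String[] args) {
        // When inference fails or picks the wrong type, pin it explicitly:
        List<CharSequence> pinned = TypeWitnessDemo.<CharSequence>listOf("x");
        System.out.println(pinned);
    }
}
```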
Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]
philipnee commented on code in PR #14413: URL: https://github.com/apache/kafka/pull/14413#discussion_r1353024844 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() { return state.equals(MemberState.STABLE); } +/** + * Take new target assignment received from the server and set it as targetAssignment to be + * processed. Following the consumer group protocol, the server won't send a new target + * member while a previous one hasn't been acknowledged by the member, so this will fail + * if a target assignment already exists. + * + * @throws IllegalStateException If a target assignment already exists. + */ private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) { if (!targetAssignment.isPresent()) { +log.debug("Member {} accepted new target assignment {} to reconcile", memberId, newTargetAssignment); targetAssignment = Optional.of(newTargetAssignment); } else { -// Keep the latest next target assignment -nextTargetAssignment = Optional.of(newTargetAssignment); +transitionToFailed(); +throw new IllegalStateException("A target assignment pending to be reconciled already" + +" exists."); } } -private boolean hasPendingTargetAssignment() { -return targetAssignment.isPresent() || nextTargetAssignment.isPresent(); -} - - -/** - * Update state and assignment as the member has successfully processed a new target - * assignment. - * This indicates the end of the reconciliation phase for the member, and makes the target - * assignment the new current assignment. - * - * @param assignment Target assignment the member was able to successfully process - */ -public void onAssignmentProcessSuccess(ConsumerGroupHeartbeatResponseData.Assignment assignment) { -updateAssignment(assignment); -transitionTo(MemberState.STABLE); -} - /** - * Update state and member info as the member was not able to process the assignment, due to - * errors in the execution of the user-provided callbacks. - * - * @param error Exception found during the execution of the user-provided callbacks + * Returns true if the member has a target assignment being processed. */ -public void onAssignmentProcessFailure(Throwable error) { -transitionTo(MemberState.FAILED); -// TODO: update member info appropriately, to clear up whatever shouldn't be kept in -// this unrecoverable state +private boolean hasPendingTargetAssignment() { +return targetAssignment.isPresent(); } private void resetEpoch() { this.memberEpoch = 0; } +/** + * {@inheritDoc} + */ @Override public MemberState state() { return state; } +/** + * {@inheritDoc} + */ @Override public AssignorSelection assignorSelection() { return this.assignorSelection; } +/** + * {@inheritDoc} + */ @Override -public ConsumerGroupHeartbeatResponseData.Assignment assignment() { +public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() { return this.currentAssignment; } + +/** + * Assignment that the member received from the server but hasn't completely processed yet. + */ // VisibleForTesting Optional targetAssignment() { return targetAssignment; } -// VisibleForTesting -Optional nextTargetAssignment() { -return nextTargetAssignment; -} - /** - * Set the current assignment for the member. This indicates that the reconciliation of the - * target assignment has been successfully completed. - * This will clear the {@link #targetAssignment}, and take on the - * {@link #nextTargetAssignment} if any. 
+ * This indicates that the reconciliation of the target assignment has been successfully Review Comment: instead of "this indicates" can we say: "this is invoked when is successfully completed"? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]
philipnee commented on code in PR #14413: URL: https://github.com/apache/kafka/pull/14413#discussion_r1353015059 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() { return state.equals(MemberState.STABLE); } +/** + * Take new target assignment received from the server and set it as targetAssignment to be + * processed. Following the consumer group protocol, the server won't send a new target + * member while a previous one hasn't been acknowledged by the member, so this will fail + * if a target assignment already exists. + * + * @throws IllegalStateException If a target assignment already exists. + */ private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) { if (!targetAssignment.isPresent()) { +log.debug("Member {} accepted new target assignment {} to reconcile", memberId, newTargetAssignment); targetAssignment = Optional.of(newTargetAssignment); } else { -// Keep the latest next target assignment -nextTargetAssignment = Optional.of(newTargetAssignment); +transitionToFailed(); +throw new IllegalStateException("A target assignment pending to be reconciled already" + Review Comment: Can we say: "Unable to set target assignment because ... " to help the user understand the cause? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15452: Access SslPrincipalMapper and kerberosShortNamer in Custom KafkaPrincipalBuilder(KIP-982) [kafka]
rbaddam commented on PR #14504: URL: https://github.com/apache/kafka/pull/14504#issuecomment-1755866839 To address your concern about the specificity of the solution, I will create Interfaces/subclasses of KerberosPrincipalBuilder and SSLPrincipalBuilder to handle different types of principals and their configurations. [https://github.com/apache/kafka/pull/14491/files](https://github.com/apache/kafka/pull/14491/files) We can apply the finalized approach to all the versions -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15574) Integrate partition assignment reconciliation with heartbeat request manager
[ https://issues.apache.org/jira/browse/KAFKA-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15574: -- Description: This task is to call the partition assignment reconciler from the heartbeat request manager, making sure to correctly query the state machine for the right actions. (was: When the group member's assignment changes and partitions are revoked and auto-commit is enabled, we need to ensure that the commit request manager is invoked to queue up the commits.) > Integrate partition assignment reconciliation with heartbeat request manager > > > Key: KAFKA-15574 > URL: https://issues.apache.org/jira/browse/KAFKA-15574 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > This task is to call the partition assignment reconciler from the heartbeat > request manager, making sure to correctly query the state machine for the > right actions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15574) Integrate partition assignment reconciliation with heartbeat request manager
Kirk True created KAFKA-15574: - Summary: Integrate partition assignment reconciliation with heartbeat request manager Key: KAFKA-15574 URL: https://issues.apache.org/jira/browse/KAFKA-15574 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Kirk True Assignee: Kirk True When the group member's assignment changes and partitions are revoked and auto-commit is enabled, we need to ensure that the commit request manager is invoked to queue up the commits. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15573) Implement auto-commit on partition assignment revocation
Kirk True created KAFKA-15573: - Summary: Implement auto-commit on partition assignment revocation Key: KAFKA-15573 URL: https://issues.apache.org/jira/browse/KAFKA-15573 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Kirk True Assignee: Kirk True Provide the Java client support for the consumer group partition assignment logic, including: * Calculate the difference between the current partition assignment and that returned in the {{ConsumerGroupHeartbeatResponse}} RPC response * Ensure we handle the case where changes to the assignment take multiple passes of {{RequestManager.poll()}} * Integrate the mechanism to invoke the user’s rebalance callback This task is part of the work to implement support for the new KIP-848 consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15573) Implement auto-commit on partition assignment revocation
[ https://issues.apache.org/jira/browse/KAFKA-15573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15573: -- Description: When the group member's assignment changes and partitions are revoked and auto-commit is enabled, we need to ensure that the commit request manager is invoked to queue up the commits. (was: Provide the Java client support for the consumer group partition assignment logic, including: * Calculate the difference between the current partition assignment and that returned in the {{ConsumerGroupHeartbeatResponse}} RPC response * Ensure we handle the case where changes to the assignment take multiple passes of {{RequestManager.poll()}} * Integrate the mechanism to invoke the user’s rebalance callback This task is part of the work to implement support for the new KIP-848 consumer group protocol.) > Implement auto-commit on partition assignment revocation > > > Key: KAFKA-15573 > URL: https://issues.apache.org/jira/browse/KAFKA-15573 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > When the group member's assignment changes and partitions are revoked and > auto-commit is enabled, we need to ensure that the commit request manager is > invoked to queue up the commits. -- This message was sent by Atlassian Jira (v8.20.10#820010)
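A rough sketch of the hook this describes (hypothetical names; the real pieces are the membership manager and the CommitRequestManager):

{code:java}
import java.util.Set;
import java.util.concurrent.CompletableFuture;

final class RevocationAutoCommitSketch {
    interface CommitManager {
        CompletableFuture<Void> commitAllConsumed(); // queue an auto-commit
    }

    // Before revoking partitions, flush an auto-commit if enabled, so the
    // consumer's progress on the revoked partitions is not lost.
    static CompletableFuture<Void> beforeRevoke(Set<String> revoked,
                                                boolean autoCommitEnabled,
                                                CommitManager commits) {
        if (!autoCommitEnabled || revoked.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        return commits.commitAllConsumed();
    }
}
{code}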
Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]
philipnee commented on code in PR #14413: URL: https://github.com/apache/kafka/pull/14413#discussion_r1352882295 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java: ## @@ -21,53 +21,76 @@ import java.util.Optional; /** - * Manages group membership for a single member. + * Manages membership of a single member to a consumer group. + * * Responsible for: * Keeping member state * Keeping assignment for the member * Computing assignment for the group if the member is required to do so */ public interface MembershipManager { +/** + * ID of the consumer group the member is part of (or wants to be part of). + */ String groupId(); +/** + * Instance ID used by the member when joining the group. If non-empty, it will indicate that + * this is a static member. + */ Optional<String> groupInstanceId(); +/** + * Member ID assigned by the server to this member when it joins the consumer group. + */ String memberId(); +/** + * Current epoch of the member, maintained by the server. + */ int memberEpoch(); +/** + * Current state of this member a part of the consumer group, as defined in {@link MemberState}. Review Comment: Maybe "the current state of the consumer" because it might not be in a group right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1352879009 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java: ## @@ -16,13 +16,48 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; +import static org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult.EMPTY; + /** * {@code PollResult} consist of {@code UnsentRequest} if there are requests to send; otherwise, return the time till * the next poll event. */ public interface RequestManager { + +/** + * During normal operation of the {@link Consumer}, a request manager may need to send out network requests. + * Implementations can return {@link PollResult their need for network I/O} by returning the requests here. + * Because the {@code poll} method is called within the single-threaded context of the consumer's main network + * I/O thread, there should be no need for synchronization protection within itself or other state. + * + * + * + * Note: no network I/O occurs in this method. The method itself should not block on I/O or for any + * other reason. This method is called from by the consumer's main network I/O thread. So quick execution of Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
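For context on the contract documented in that diff, here is a self-contained sketch of the poll model, assuming a `poll(long currentTimeMs)` signature. `PollResult` and `Request` are simplified stand-ins for the real `NetworkClientDelegate` types, not the actual internals:

```java
import java.util.Collections;
import java.util.List;

// Sketch of the RequestManager poll contract; simplified stand-in types.
final class PollContractSketch {

    record Request(String api) { }

    // Either requests to send now, or the time until the next poll.
    record PollResult(long timeUntilNextPollMs, List<Request> unsentRequests) {
        static final PollResult EMPTY = new PollResult(Long.MAX_VALUE, Collections.emptyList());
    }

    interface RequestManager {
        // Called only from the consumer's single network I/O thread, so no
        // synchronization is needed; must return quickly and never block.
        PollResult poll(long currentTimeMs);
    }

    // A manager with nothing to send simply reports EMPTY and is polled again later.
    static final RequestManager NO_OP = currentTimeMs -> PollResult.EMPTY;
}
```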
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1352876631 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.internals.IdempotentCloser; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.io.Closeable; +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Future; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.apache.kafka.common.utils.Utils.closeQuietly; + +/** + * Background thread runnable that consumes {@link ApplicationEvent} and produces {@link BackgroundEvent}. It + * uses an event loop to consume and produce events, and poll the network client to handle network IO. 
+ */ +public class ConsumerNetworkThread extends KafkaThread implements Closeable { + +private static final long MAX_POLL_TIMEOUT_MS = 5000; +private static final String BACKGROUND_THREAD_NAME = "consumer_background_thread"; +private final Time time; +private final Logger log; +private final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier; +private final Supplier<NetworkClientDelegate> networkClientDelegateSupplier; +private final Supplier<RequestManagers> requestManagersSupplier; +private ApplicationEventProcessor applicationEventProcessor; +private NetworkClientDelegate networkClientDelegate; +private RequestManagers requestManagers; +private volatile boolean running; +private final IdempotentCloser closer = new IdempotentCloser(); + +public ConsumerNetworkThread(LogContext logContext, + Time time, + Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier, + Supplier<NetworkClientDelegate> networkClientDelegateSupplier, + Supplier<RequestManagers> requestManagersSupplier) { +super(BACKGROUND_THREAD_NAME, true); +this.time = time; +this.log = logContext.logger(getClass()); +this.applicationEventProcessorSupplier = applicationEventProcessorSupplier; +this.networkClientDelegateSupplier = networkClientDelegateSupplier; +this.requestManagersSupplier = requestManagersSupplier; +} + +@Override +public void run() { +closer.assertOpen("Consumer network thread is already closed"); +running = true; + +try { +log.debug("Consumer network thread started"); + +// Wait until we're securely in the background network thread to initialize these objects... +initializeResources(); + +while (running) { +try { +runOnce(); +} catch (final WakeupException e) { +log.debug("WakeupException caught, consumer network thread won't be interrupted"); +// swallow the wakeup exception to prevent killing the thread. +} +} +} catch (final Throwable t) { +log.error("The consumer network thread failed due to unexpected error", t); +throw new KafkaException(t); +} +} + +void initializeResources() { +applicationEventProcessor = applicationEventProcessorSupplier.get(); +networkClientDelegate = networkClientDelegateSupplier.get(); +requestManagers = requestManagersSupplier.get(); +}
Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]
philipnee commented on code in PR #14413: URL: https://github.com/apache/kafka/pull/14413#discussion_r1352876691 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java: ## @@ -21,53 +21,76 @@ import java.util.Optional; /** - * Manages group membership for a single member. + * Manages membership of a single member to a consumer group. Review Comment: Can we say something like "A stateful object tracking the state of the consumer including: "? Membership might mean a lot of different things for different people. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]
lianetm commented on code in PR #14413: URL: https://github.com/apache/kafka/pull/14413#discussion_r1352875373 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -19,56 +19,91 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; import java.util.Optional; /** - * Membership manager that maintains group membership for a single member following the new + * Membership manager that maintains group membership for a single member, following the new * consumer group protocol. * - * This keeps membership state and assignment updated in-memory, based on the heartbeat responses - * the member receives. It is also responsible for computing assignment for the group based on - * the metadata, if the member has been selected by the broker to do so. + * This is responsible for: + * Keeping member info (ex. member id, member epoch, assignment, etc.) + * Keeping member state as defined in {@link MemberState}. + * + * Member info and state are updated based on the heartbeat responses the member receives. */ public class MembershipManagerImpl implements MembershipManager { +/** + * ID of the consumer group the member will be part of, provided when creating the current + * membership manager. + */ private final String groupId; + +/** + * Group instance ID to be used by the member, provided when creating the current membership manager. + */ private final Optional<String> groupInstanceId; + +/** + * Member ID assigned by the server to the member, received in a heartbeat response when + * joining the group specified in {@link #groupId} + */ private String memberId; + +/** + * Current epoch of the member. It will be set to 0 by the member, and provided to the server + * on the heartbeat request, to join the group. It will be then maintained by the server, + * incremented as the member reconciles and acknowledges the assignments it receives. + */ private int memberEpoch; + +/** + * Current state of this member a part of the consumer group, as defined in {@link MemberState} + */ private MemberState state; + +/** + * Assignor type selection for the member. If non-null, the member will send its selection to + * the server on the {@link ConsumerGroupHeartbeatRequest}. If null, the server will select a + * default assignor for the member, which the member does not need to track. + */ private AssignorSelection assignorSelection; Review Comment: As described in the field doc, this changed to always having a selection, which defaults to using the server-side assignor and lets the server choose the specific implementation to use. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
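A hedged model of the default described in that comment; this `AssignorSelection` is illustrative, not the actual class:

```java
import java.util.Optional;

// Hypothetical model of "always having a selection" that defaults to a
// server-side assignor; not the actual AssignorSelection class.
final class AssignorSelection {

    enum Type { SERVER, CLIENT }

    final Type type;
    final Optional<String> assignorName;

    private AssignorSelection(Type type, Optional<String> assignorName) {
        this.type = type;
        this.assignorName = assignorName;
    }

    // Default: let the server pick its preferred assignor implementation,
    // so the member never tracks a concrete assignor name.
    static AssignorSelection defaultSelection() {
        return new AssignorSelection(Type.SERVER, Optional.empty());
    }
}
```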
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1352874538 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventProcessor.java: ## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.LinkedList; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; + +public class BackgroundEventProcessor { + +private final Logger log; +private final BlockingQueue<BackgroundEvent> backgroundEventQueue; + +public BackgroundEventProcessor(final LogContext logContext, +final BlockingQueue<BackgroundEvent> backgroundEventQueue) { +this.log = logContext.logger(BackgroundEventProcessor.class); +this.backgroundEventQueue = backgroundEventQueue; +} + +/** + * Drains all available {@link BackgroundEvent}s, and then processes them in order. If any + * errors are thrown as a result of a {@link ErrorBackgroundEvent} or an error occurs while processing + * another type of {@link BackgroundEvent}, only the first exception will be thrown, all + * subsequent errors will simply be logged at WARN level. + * + * @throws RuntimeException or subclass + */ +public void process() { +LinkedList<BackgroundEvent> events = new LinkedList<>(); +backgroundEventQueue.drainTo(events); + +RuntimeException first = null; +int errorCount = 0; + +for (BackgroundEvent event : events) { +log.debug("Consuming background event: {}", event); + +try { +process(event); +} catch (RuntimeException e) { +errorCount++; + +if (first == null) { +first = e; +log.warn("Error #{} from background thread (will be logged and thrown): {}", errorCount, e.getMessage(), e); Review Comment: Removed check to avoid.
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java: ## @@ -16,197 +16,78 @@ */ package org.apache.kafka.clients.consumer.internals; -import org.apache.kafka.clients.ApiVersions; -import org.apache.kafka.clients.ClientUtils; -import org.apache.kafka.clients.GroupRebalanceConfig; -import org.apache.kafka.clients.NetworkClient; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; -import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.internals.IdempotentCloser; import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; +import java.io.Closeable; +import java.util.ArrayList; import java.util.LinkedList; +import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.concurrent.BlockingQueue; - -import static java.util.Objects.requireNonNull; -import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION; -import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX; -import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredIsolationLevel; +import java.util.function.Supplier; /** * Background thread runnable that consumes {@code ApplicationEvent} and * produces {@code BackgroundEvent}. It uses an event loop to consume and * produce events, and poll the network client to handle network IO. - * + * * It holds a ref
[PR] Update HEADER [kafka]
adikarthik opened a new pull request, #14520: URL: https://github.com/apache/kafka/pull/14520 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15221; Fix the race between fetch requests from a rebooted follower. [kafka]
CalvinConfluent commented on code in PR #14053: URL: https://github.com/apache/kafka/pull/14053#discussion_r1352848836 ## core/src/main/scala/kafka/cluster/Replica.scala: ## @@ -98,14 +105,22 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log * fetch request is always smaller than the leader's LEO, which can happen if small produce requests are received at * high frequency. */ - def updateFetchState( + def updateFetchStateOrThrow( followerFetchOffsetMetadata: LogOffsetMetadata, followerStartOffset: Long, followerFetchTimeMs: Long, leaderEndOffset: Long, brokerEpoch: Long ): Unit = { replicaState.updateAndGet { currentReplicaState => + val cachedBrokerEpoch = if (metadataCache.isInstanceOf[KRaftMetadataCache]) + metadataCache.asInstanceOf[KRaftMetadataCache].getAliveBrokerEpoch(brokerId) else Option(-1L) + // Fence the update if it provides a stale broker epoch. + if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) { Review Comment: Maybe allowing updates with higher broker epochs has one problem: a malfunctioning/buggy broker could fetch with a much higher broker epoch, and then the leader would have to restart to get out of that state. Other than this, I don't see a problem with allowing a higher broker epoch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
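Stripped of the Scala plumbing, the fencing rule in that diff compares the fetch request's broker epoch against the epoch cached in the metadata and rejects stale ones. A rough Java rendering, with illustrative names rather than the broker's actual API:

```java
import java.util.OptionalLong;

// Rough Java rendering of the fencing rule in the Scala diff above;
// names are illustrative, not the broker's actual API.
final class BrokerEpochFencing {

    // -1 means the fetch request did not carry a broker epoch (older protocol).
    static void validate(long requestBrokerEpoch, OptionalLong cachedBrokerEpoch) {
        if (requestBrokerEpoch != -1
                && cachedBrokerEpoch.isPresent()
                && cachedBrokerEpoch.getAsLong() > requestBrokerEpoch) {
            // The follower rebooted and re-registered with a newer epoch:
            // this fetch is from its previous incarnation, so fence it.
            throw new IllegalStateException("Stale broker epoch " + requestBrokerEpoch
                    + ", cached epoch is " + cachedBrokerEpoch.getAsLong());
        }
    }
}
```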
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
jeffkbkim commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1352815096 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ## @@ -898,6 +900,46 @@ public void createGroupTombstoneRecords(List<Record> records) { records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId())); } +@Override +public boolean isEmpty() { +return isInState(EMPTY); +} + +/** + * Return the offset expiration condition to be used for this group. This is based on several factors + * such as the group state, the protocol type, and the GroupMetadata record version. + * + * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition} + * + * @return The offset expiration condition for the group or Empty if no such condition exists. + */ +@Override +public Optional<OffsetExpirationCondition> offsetExpirationCondition() { +if (protocolType.isPresent()) { +if (isInState(EMPTY)) { +// No consumer exists in the group => +// - If current state timestamp exists and retention period has passed since group became Empty, +// expire all offsets with no pending offset commit; +// - If there is no current state timestamp (old group metadata schema) and retention period has passed +// since the last commit timestamp, expire the offset +return Optional.of(new OffsetExpirationConditionImpl( +offsetAndMetadata -> currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs))); +} else if (usesConsumerGroupProtocol() && subscribedTopics.isPresent() && isInState(STABLE)) { +// Consumers exist in the group and group is Stable => +// - If the group is aware of the subscribed topics and retention period has passed since the +// last commit timestamp, expire the offset. Offsets with a pending offset commit are not +// expired +return Optional.of(new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs)); Review Comment: no, commitTimestampMs is a field and not a method -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
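The condition built in that diff reduces to choosing a base timestamp per offset and checking whether the retention period has elapsed. A hedged rendering, where `OffsetAndMetadata` is a stand-in for the real class and the real `OffsetExpirationConditionImpl` differs in shape:

```java
import java.util.function.ToLongFunction;

// Hedged rendering of the expiration condition in the diff above.
final class OffsetExpirationSketch {

    // Stand-in for the real OffsetAndMetadata: only the commit timestamp.
    record OffsetAndMetadata(long commitTimestampMs) { }

    static boolean isExpired(OffsetAndMetadata offset,
                             ToLongFunction<OffsetAndMetadata> baseTimestamp,
                             long nowMs,
                             long retentionMs) {
        // Empty group: base = time the group became Empty (falling back to
        // commit time for old metadata); Stable group: base = commit time.
        return nowMs - baseTimestamp.applyAsLong(offset) >= retentionMs;
    }
}
```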
Re: [PR] KAFKA-15355: Message schema changes [kafka]
clolov commented on PR #14290: URL: https://github.com/apache/kafka/pull/14290#issuecomment-1755706472 I am putting myself as a reviewer because I would like to keep up to date with these changes. I will aim to provide my review tomorrow! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]
lianetm commented on PR #14413: URL: https://github.com/apache/kafka/pull/14413#issuecomment-1755659746 Hey @dajac, this is ready for review now, including the latest trunk changes. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]
lianetm opened a new pull request, #14413: URL: https://github.com/apache/kafka/pull/14413 This PR includes: - changes to error handling, leaving the responsibility in the heartbeatManager and exposing only the functionality for when the state needs to be updated (on successful HB, on fencing, on fatal failure) - simplified assignment handling, on the assumption that members handle one assignment at a time - allowed transitions for failures when joining - removed the default assignor logic (left to the server) - tests & minor fixes addressing the initial version review -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements [kafka]
lianetm closed pull request #14413: KAFKA-14323: Client state changes for handling one assignment at a time & minor improvements URL: https://github.com/apache/kafka/pull/14413 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15572) Race condition between future log dir roll and replace current with future log during alterReplicaLogDirs
[ https://issues.apache.org/jira/browse/KAFKA-15572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucian Ilie updated KAFKA-15572: Affects Version/s: 3.0.0 (was: 3.3.1) (was: 3.4.1) > Race condition between future log dir roll and replace current with future > log during alterReplicaLogDirs > - > > Key: KAFKA-15572 > URL: https://issues.apache.org/jira/browse/KAFKA-15572 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.0.0 >Reporter: Lucian Ilie >Priority: Major > Attachments: kafka-alter-log-dir-nosuchfileexception.log > > Original Estimate: 48h > Remaining Estimate: 48h > > We are using a Kafka cluster with Zookeeper, deployed on top of Kubernetes, > using banzaicloud/koperator. > We have multiple disks per broker. > We are using Cruise Control remove disk operation in order to aggregate > multiple smaller disks into a single bigger disk. > When we do this, *the flush operation fails apparently randomly with > NoSuchFileException, while alterReplicaLogDirs is executed*. Attached a > sample of logs for the exception and the previous operations taking place. > Will further detail the cause of this issue. > Say we have 3 brokers: > * broker 101 with disks /kafka-logs1/kafka, /kafka-logs2/kafka and a bigger > disk /new-kafka-logs1/kafka > * broker 201 with same disks > * broker 301 with same disks > When Cruise Control executes a remove disk operation, it calls Kafka > "adminClient.alterReplicaLogDirs(replicaAssignment)" with such an assignment > as to move all data from /kafka-logs1/kafka and /kafka-logs2/kafka to > /new-kafka-logs1/kafka. > During the alter log dir operation, future logs are created (to move data > from e.g. "/kafka-logs1/kafka/topic-partition" to > "/new-kafka-logs1/kafka/topic-partition.hash-future"), data is moved and > finally the log dir will be renamed from > "/new-kafka-logs1/kafka/topic-partition.hash-future" to > "/new-kafka-logs1/kafka/topic-partition". This operation is started in > [UnifiedLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L713] > and is locked using the [UnifiedLog > lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268]. > The rename is then delegated to > [LocalLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111-L113]. > This is the 1st code part that is involved in the race condition. > Meanwhile, log dirs can be rolled based on known conditions (e.g. getting > full), which will call > [UnifiedLog.roll|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L1547], > which is locked using the [UnifiedLog > lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268]. > However, the further delegation to UnifiedLog.flushUptoOffsetExclusive is > not sharing that lock, since it is [done as a scheduled task in a separate > thread|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L1547]. > This means that further operations are [not locked at UnifiedLog > level|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268]. > The operation is further delegated to > [LocalLog.flush|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177], > which will also try to [flush the log > dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177]. 
> This is the 2nd code part that is involved in the race condition. > Since the log dir flush does not share the lock with the rename dir operation > (as it is scheduled via the scheduler), the rename dir operation might > succeed in moving the log dir on disk to "topic-partition", but the > LocalLog._dir will remain set to "topic-partition.hash-future", and when the > flush will attempt to flush the "topic-partition.hash-future" directory, it > will throw NoSuchFileException: "topic-partition.hash-future". Basically, > [this > line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111] > might succeed, and before [this other > line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111] > is executed, flush tries to [flush the future > dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177]. > We tested a fix with a patch on Kafka 3.4.1, on our clusters and it solved > the issue by synchronizing the flush dir operation. Will reply with a link to > a PR. > Note that this bug replicates for every version since 3.0.0, caused by [this > commit|https://github.com/apache/kafka/commit/db3e5e2c0de367ffcfe4078359d6d208ba722581#diff-eeafed82ed6a8600c397b108787fdf31e03191b0a192774a65c127d0d26edc44R2341] when flush dir was added. -- This message was sent by Atlassian Jira (v8.20.10#820010)
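A minimal Java sketch of the fix the reporter describes, namely having the scheduled flush take the same lock that renameDir already holds, so the directory path cannot be swapped out between the on-disk rename and the fsync. All names here are illustrative, since the actual change belongs in Kafka's Scala UnifiedLog/LocalLog code:

```java
import java.nio.file.Path;

// Illustrative sketch of the described fix; the real fix lives in Kafka's
// Scala UnifiedLog/LocalLog, not in a class like this.
final class LogDirSketch {

    private final Object lock = new Object();
    private Path dir; // e.g. ".../topic-partition.hash-future" during a move

    LogDirSketch(Path dir) {
        this.dir = dir;
    }

    void renameTo(Path newDir) {
        synchronized (lock) {
            // Move on disk, then update the field, both under the lock.
            this.dir = newDir;
        }
    }

    void flushDir() {
        synchronized (lock) {
            // Reading `dir` under the same lock guarantees we fsync a
            // directory that still exists. Without the lock, a concurrent
            // renameTo() could leave us fsyncing the old "*.hash-future"
            // path, producing the reported NoSuchFileException.
            fsyncDirectory(dir);
        }
    }

    private static void fsyncDirectory(Path path) {
        // Stand-in for Utils.flushDir(path) in the real code.
    }
}
```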
[jira] [Updated] (KAFKA-15572) Race condition between future log dir roll and replace current with future log during alterReplicaLogDirs
[ https://issues.apache.org/jira/browse/KAFKA-15572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucian Ilie updated KAFKA-15572: Description: We are using a Kafka cluster with Zookeeper, deployed on top of Kubernetes, using banzaicloud/koperator. We have multiple disks per broker. We are using Cruise Control remove disk operation in order to aggregate multiple smaller disks into a single bigger disk. When we do this, *the flush operation fails apparently randomly with NoSuchFileException, while alterReplicaLogDirs is executed*. Attached a sample of logs for the exception and the previous operations taking place. Will further detail the cause of this issue. Say we have 3 brokers: * broker 101 with disks /kafka-logs1/kafka, /kafka-logs2/kafka and a bigger disk /new-kafka-logs1/kafka * broker 201 with same disks * broker 301 with same disks When Cruise Control executes a remove disk operation, it calls Kafka "adminClient.alterReplicaLogDirs(replicaAssignment)" with such an assignment as to move all data from /kafka-logs1/kafka and /kafka-logs2/kafka to /new-kafka-logs1/kafka. During the alter log dir operation, future logs are created (to move data from e.g. "/kafka-logs1/kafka/topic-partition" to "/new-kafka-logs1/kafka/topic-partition.hash-future"), data is moved and finally the log dir will be renamed from "/new-kafka-logs1/kafka/topic-partition.hash-future" to "/new-kafka-logs1/kafka/topic-partition". This operation is started in [UnifiedLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L713] and is locked using the [UnifiedLog lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268]. The rename is then delegated to [LocalLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111-L113]. This is the 1st code part that is involved in the race condition. Meanwhile, log dirs can be rolled based on known conditions (e.g. getting full), which will call [UnifiedLog.roll|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L1547], which is locked using the [UnifiedLog lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268]. However, the further delegation to UnifiedLog.flushUptoOffsetExclusive is not sharing that lock, since it is [done as a scheduled task in a separate thread|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L1547]. This means that further operations are [not locked at UnifiedLog level|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268]. The operation is further delegated to [LocalLog.flush|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177], which will also try to [flush the log dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177]. This is the 2nd code part that is involved in the race condition. Since the log dir flush does not share the lock with the rename dir operation (as it is scheduled via the scheduler), the rename dir operation might succeed in moving the log dir on disk to "topic-partition", but the LocalLog._dir will remain set to "topic-partition.hash-future", and when the flush will attempt to flush the "topic-partition.hash-future" directory, it will throw NoSuchFileException: "topic-partition.hash-future". 
Basically, [this line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111] might succeed, and before [this other line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111] is executed, flush tries to [flush the future dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177]. We tested a fix with a patch on Kafka 3.4.1, on our clusters and it solved the issue by synchronizing the flush dir operation. Will reply with a link to a PR. Note that this bug replicates for every version since 3.0.0, caused by [this commit|https://github.com/apache/kafka/commit/db3e5e2c0de367ffcfe4078359d6d208ba722581#diff-eeafed82ed6a8600c397b108787fdf31e03191b0a192774a65c127d0d26edc44R2341] when flush dir was added. was: We are using a Kafka cluster with Zookeeper, deployed on top of Kubernetes, using banzaicloud/koperator. We have multiple disks per broker. We are using Cruise Control remove disk operation in order to aggregate multiple smaller disks into a single bigger disk. When we do this, *the flush operation fails apparently randomly with NoSuchFileException, while alterReplicaLogDirs is executed*. Attached a sample of logs for the exception and the previous operations taking place. Will further detail the cause of this issue. Say we have 3 brokers: * broker 101 with
[jira] [Updated] (KAFKA-15572) Race condition between future log dir roll and replace current with future log during alterReplicaLogDirs
[ https://issues.apache.org/jira/browse/KAFKA-15572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucian Ilie updated KAFKA-15572: Attachment: kafka-alter-log-dir-nosuchfileexception.log > Race condition between future log dir roll and replace current with future > log during alterReplicaLogDirs > - > > Key: KAFKA-15572 > URL: https://issues.apache.org/jira/browse/KAFKA-15572 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.3.1, 3.4.1 >Reporter: Lucian Ilie >Priority: Major > Attachments: kafka-alter-log-dir-nosuchfileexception.log > > Original Estimate: 48h > Remaining Estimate: 48h > > We are using a Kafka cluster with Zookeeper, deployed on top of Kubernetes, > using banzaicloud/koperator. > We have multiple disks per broker. > We are using Cruise Control remove disk operation in order to aggregate > multiple smaller disks into a single bigger disk. > When we do this, *the flush operation fails apparently randomly with > NoSuchFileException, while alterReplicaLogDirs is executed*. Attached a > sample of logs for the exception and the previous operations taking place. > Will further detail the cause of this issue. > Say we have 3 brokers: > * broker 101 with disks /kafka-logs1/kafka, /kafka-logs2/kafka and a bigger > disk /new-kafka-logs1/kafka > * broker 201 with same disks > * broker 301 with same disks > When Cruise Control executes a remove disk operation, it calls Kafka > "adminClient.alterReplicaLogDirs(replicaAssignment)" with such an assignment > as to move all data from /kafka-logs1/kafka and /kafka-logs2/kafka to > /new-kafka-logs1/kafka. > During the alter log dir operation, future logs are created (to move data > from e.g. "/kafka-logs1/kafka/topic-partition" to > "/new-kafka-logs1/kafka/topic-partition.hash-future"), data is moved and > finally the log dir will be renamed from > "/new-kafka-logs1/kafka/topic-partition.hash-future" to > "/new-kafka-logs1/kafka/topic-partition". This operation is started in > [UnifiedLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L713] > and is locked using the [UnifiedLog > lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268]. > The rename is then delegated to > [LocalLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111-L113]. > This is the 1st code part that is involved in the race condition. > Meanwhile, log dirs can be rolled based on known conditions (e.g. getting > full), which will call > [UnifiedLog.roll|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L1547], > which is locked using the [UnifiedLog > lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268]. > However, the further delegation to UnifiedLog.flushUptoOffsetExclusive is > not sharing that lock, since it is done as a scheduled task in a separate > thread. This means that further operations are [not locked at UnifiedLog > level|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268]. > The operation is further delegated to > [LocalLog.flush|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177], > which will also try to [flush the log > dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177]. > This is the 2nd code part that is involved in the race condition. 
> Since the log dir flush does not share the lock with the rename dir operation > (as it is scheduled via the scheduler), the rename dir operation might > succeed in moving the log dir on disk to "topic-partition", but the > LocalLog._dir will remain set to "topic-partition.hash-future", and when the > flush will attempt to flush the "topic-partition.hash-future" directory, it > will throw NoSuchFileException: "topic-partition.hash-future". Basically, > [this > line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111] > might succeed, and before [this other > line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111] > is executed, flush tries to [flush the future > dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177]. > We tested a fix with a patch on Kafka 3.4.1, on our clusters and it solved > the issue by synchronizing the flush dir operation. Will reply with a link to > a PR. > Note that this bug replicates for every version since 3.0.0, caused by [this > commit|https://github.com/apache/kafka/commit/db3e5e2c0de367ffcfe4078359d6d208ba722581#diff-eeafed82ed6a8600c397b108787fdf31e03191b0a192774a65c127d0d26edc44R2341] when flush dir was added. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15572) Race condition between future log dir roll and replace current with future log during alterReplicaLogDirs
[ https://issues.apache.org/jira/browse/KAFKA-15572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucian Ilie updated KAFKA-15572: Description: We are using a Kafka cluster with Zookeeper, deployed on top of Kubernetes, using banzaicloud/koperator. We have multiple disks per broker. We are using Cruise Control remove disk operation in order to aggregate multiple smaller disks into a single bigger disk. When we do this, *the flush operation fails apparently randomly with NoSuchFileException, while alterReplicaLogDirs is executed*. Attached a sample of logs for the exception and the previous operations taking place. Will further detail the cause of this issue. Say we have 3 brokers: * broker 101 with disks /kafka-logs1/kafka, /kafka-logs2/kafka and a bigger disk /new-kafka-logs1/kafka * broker 201 with same disks * broker 301 with same disks When Cruise Control executes a remove disk operation, it calls Kafka "adminClient.alterReplicaLogDirs(replicaAssignment)" with such an assignment as to move all data from /kafka-logs1/kafka and /kafka-logs2/kafka to /new-kafka-logs1/kafka. During the alter log dir operation, future logs are created (to move data from e.g. "/kafka-logs1/kafka/topic-partition" to "/new-kafka-logs1/kafka/topic-partition.hash-future"), data is moved and finally the log dir will be renamed from "/new-kafka-logs1/kafka/topic-partition.hash-future" to "/new-kafka-logs1/kafka/topic-partition". This operation is started in [UnifiedLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L713] and is locked using the [UnifiedLog lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268]. The rename is then delegated to [LocalLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111-L113]. This is the 1st code part that is involved in the race condition. Meanwhile, log dirs can be rolled based on known conditions (e.g. getting full), which will call [UnifiedLog.roll|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L1547], which is locked using the [UnifiedLog lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268]. However, the further delegation to UnifiedLog.flushUptoOffsetExclusive is not sharing that lock, since it is done as a scheduled task in a separate thread. This means that further operations are [not locked at UnifiedLog level|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268]. The operation is further delegated to [LocalLog.flush|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177], which will also try to [flush the log dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177]. This is the 2nd code part that is involved in the race condition. Since the log dir flush does not share the lock with the rename dir operation (as it is scheduled via the scheduler), the rename dir operation might succeed in moving the log dir on disk to "topic-partition", but the LocalLog._dir will remain set to "topic-partition.hash-future", and when the flush will attempt to flush the "topic-partition.hash-future" directory, it will throw NoSuchFileException: "topic-partition.hash-future". 
Basically, [this line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111] might succeed, and before [this other line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111] is executed, flush tries to [flush the future dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177]. We tested a fix with a patch on Kafka 3.4.1, on our clusters and it solved the issue by synchronizing the flush dir operation. Will reply with a link to a PR. Note that this bug replicates for every version since 3.0.0, caused by [this commit|https://github.com/apache/kafka/commit/db3e5e2c0de367ffcfe4078359d6d208ba722581#diff-eeafed82ed6a8600c397b108787fdf31e03191b0a192774a65c127d0d26edc44R2341] when flush dir was added. was: We are using a Kafka cluster with Zookeeper, deployed on top of Kubernetes, using banzaicloud/koperator. We have multiple disks per broker. We are using Cruise Control remove disk operation in order to aggregate multiple smaller disks into a single bigger disk. When we do this, *the flush operation fails apparently randomly with NoSuchFileException, while alterReplicaLogDirs is executed*. Attached a sample of logs for the exception and the previous operations taking place. Will further detail the cause of this issue. Say we have 3 brokers: * broker 101 with disks /kafka-logs1/kafka, /kafka-logs2/kafka and a bigger disk /new-kafka-logs1/kafka * broker
[jira] [Updated] (KAFKA-15572) Race condition between future log dir roll and replace current with future log during alterReplicaLogDirs
[ https://issues.apache.org/jira/browse/KAFKA-15572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucian Ilie updated KAFKA-15572: Description: We are using a Kafka cluster with Zookeeper, deployed on top of Kubernetes, using banzaicloud/koperator. We have multiple disks per broker. We are using Cruise Control remove disk operation in order to aggregate multiple smaller disks into a single bigger disk. When we do this, *the flush operation fails apparently randomly with NoSuchFileException, while alterReplicaLogDirs is executed*. Attached a sample of logs for the exception and the previous operations taking place. Will further detail the cause of this issue. Say we have 3 brokers: * broker 101 with disks /kafka-logs1/kafka, /kafka-logs2/kafka and a bigger disk /new-kafka-logs1/kafka * broker 201 with same disks * broker 301 with same disks When Cruise Control executes a remove disk operation, it calls Kafka "adminClient.alterReplicaLogDirs(replicaAssignment)" with such an assignment as to move all data from /kafka-logs1/kafka and /kafka-logs2/kafka to /new-kafka-logs1/kafka. During the alter log dir operation, future logs are created (to move data from e.g. "/kafka-logs1/kafka/topic-partition" to "/new-kafka-logs1/kafka/topic-partition.hash-future"), data is moved and finally the log dir will be renamed from "/new-kafka-logs1/kafka/topic-partition.hash-future" to "/new-kafka-logs1/kafka/topic-partition". This operation is started in [UnifiedLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L713] and is locked using the [UnifiedLog lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268]. The rename is then delegated to [LocalLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111-L113]. This is the 1st code part that is involved in the race condition. Meanwhile, log dirs can be rolled based on known conditions (e.g. getting full), which will call [UnifiedLog.roll|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L1547], which is locked using the UnifiedLog lock. However, the further delegation to UnifiedLog.flushUptoOffsetExclusive is not sharing that lock, since it is done as a scheduled task in a separate thread. This means that further operations are [not locked at UnifiedLog level|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268]. The operation is further delegated to [LocalLog.flush|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177], which will also try to [flush the log dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177]. This is the 2nd code part that is involved in the race condition. Since the log dir flush does not share the lock with the rename dir operation, the rename dir operation might succeed in moving the log dir on disk to "topic-partition", but the LocalLog._dir will remain set to "topic-partition.hash-future", and when the flush will attempt to flush the "topic-partition.hash-future" directory, it will throw NoSuchFileException: "topic-partition.hash-future". 
Basically, [this line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111] might succeed, and before [this other line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111] is executed, flush tries to [flush the future dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177]. We tested a fix with a patch on Kafka 3.4.1, on our clusters and it solved the issue by synchronizing the flush dir operation. Will reply with a link to a PR. Note that this bug replicates for every version since 3.0.0, caused by [this commit|https://github.com/apache/kafka/commit/db3e5e2c0de367ffcfe4078359d6d208ba722581#diff-eeafed82ed6a8600c397b108787fdf31e03191b0a192774a65c127d0d26edc44R2341] when flush dir was added. was: We are using a Kafka cluster with Zookeeper, deployed on top of Kubernetes, using banzaicloud/koperator. We have multiple disks per broker. We are using Cruise Control remove disk operation in order to aggregate multiple smaller disks into a single bigger disk. When we do this, *the flush operation fails apparently randomly with NoSuchFileException, while alterReplicaLogDirs is executed*. Attached a sample of logs for the exception and the previous operations taking place. Will further detail the cause of this issue. Say we have 3 brokers: * broker 101 with disks /kafka-logs1/kafka, /kafka-logs2/kafka and a bigger disk /new-kafka-logs1/kafka * broker 201 with same disks * broker 301 with same disks When Cruise Control executes a remove disk operation, it calls Kafka "adminClient.a