Re: [PR] KAFKA-16946: Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port [kafka]
frankvicky commented on PR #16319: URL: https://github.com/apache/kafka/pull/16319#issuecomment-2164695371 Hi @showuon, I have refactored the test case to use `@CsvSource`, PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15623: Migrate streams tests (part 1) module to JUnit 5 [kafka]
FrankYang0529 commented on PR #16254: URL: https://github.com/apache/kafka/pull/16254#issuecomment-2164633695 Hi @chia7712, I have addressed all comments. Could you take a look? Thank you.
Re: [PR] KAFKA-15623: Migrate streams tests (part 1) module to JUnit 5 [kafka]
FrankYang0529 commented on code in PR #16254: URL: https://github.com/apache/kafka/pull/16254#discussion_r1637632275

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java: ##
@@ -87,48 +85,45 @@
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
-@RunWith(Parameterized.class)
 public class KStreamSlidingWindowAggregateTest {
-
-    @Parameterized.Parameters(name = "{0}_inorder:{1}_cache:{2}")
-    public static Collection data() {
-        return Arrays.asList(new Object[][] {
-            {StrategyType.ON_WINDOW_UPDATE, true, true},
-            {StrategyType.ON_WINDOW_UPDATE, true, false},
-            {StrategyType.ON_WINDOW_UPDATE, false, true},
-            {StrategyType.ON_WINDOW_UPDATE, false, false},
-            {StrategyType.ON_WINDOW_CLOSE, true, true},
-            {StrategyType.ON_WINDOW_CLOSE, true, false},
-            {StrategyType.ON_WINDOW_CLOSE, false, true},
-            {StrategyType.ON_WINDOW_CLOSE, false, false}
-        });
+
+    public static Stream data() {
+        return Stream.of(
+            Arguments.of(StrategyType.ON_WINDOW_UPDATE, true, true),
+            Arguments.of(StrategyType.ON_WINDOW_UPDATE, true, false),
+            Arguments.of(StrategyType.ON_WINDOW_UPDATE, false, true),
+            Arguments.of(StrategyType.ON_WINDOW_UPDATE, false, false),
+            Arguments.of(StrategyType.ON_WINDOW_CLOSE, true, true),
+            Arguments.of(StrategyType.ON_WINDOW_CLOSE, true, false),
+            Arguments.of(StrategyType.ON_WINDOW_CLOSE, false, true),
+            Arguments.of(StrategyType.ON_WINDOW_CLOSE, false, false)
+        );
     }
 
-    @Parameter
     public StrategyType type;
-
-    @Parameter(1)
     public boolean inOrderIterator;
-
-    @Parameter(2)
     public boolean withCache;
     private boolean emitFinal;
     private EmitStrategy emitStrategy;
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
-
-    @Before
-    public void before() {
+
+    public void setup(final StrategyType inputType, final boolean inputInOrderIterator, final boolean inputWithCache) {
+        type = inputType;
+        inOrderIterator = inputInOrderIterator;
+        withCache = inputWithCache;
         emitFinal = type.equals(StrategyType.ON_WINDOW_CLOSE);
         emitStrategy = StrategyType.forType(type);
     }
 
-    @Test
-    public void testAggregateSmallInput() {
+    @ParameterizedTest
+    @MethodSource("data")

Review Comment:
This one can't just use `@EnumSource(EmitStrategy.StrategyType.class)`, because there are two other arguments.
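As background to the comment above: `@EnumSource` feeds one enum constant per invocation, while this test needs every combination of the strategy type and the two boolean flags, which is why a `@MethodSource` factory is used. The following plain-Java sketch shows how the same eight-row matrix could be generated instead of listed by hand. It is only an illustration: `StrategyType` here is a local stand-in enum and `ArgumentMatrixSketch` is a hypothetical name, neither taken from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Local stand-in for EmitStrategy.StrategyType (assumption: only these two constants matter here).
enum StrategyType { ON_WINDOW_UPDATE, ON_WINDOW_CLOSE }

class ArgumentMatrixSketch {
    // Builds every (type, inOrderIterator, withCache) combination,
    // in the same order as the rows listed in the diff.
    static List<Object[]> combinations() {
        List<Object[]> rows = new ArrayList<>();
        boolean[] flags = {true, false};
        for (StrategyType type : StrategyType.values()) {
            for (boolean inOrderIterator : flags) {
                for (boolean withCache : flags) {
                    rows.add(new Object[] {type, inOrderIterator, withCache});
                }
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // Print each row; in a JUnit 5 factory each row would be wrapped in Arguments.of(...).
        for (Object[] row : combinations()) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

Listing the rows explicitly, as the PR does, is arguably easier to read for eight cases; a generated matrix mainly pays off when the number of dimensions grows.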
[jira] [Resolved] (KAFKA-16901) add unit tests for ConsumerRecords#records(String)
[ https://issues.apache.org/jira/browse/KAFKA-16901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai resolved KAFKA-16901.
Fix Version/s: 3.9.0
Resolution: Fixed

> add unit tests for ConsumerRecords#records(String)
> --------------------------------------------------
>
> Key: KAFKA-16901
> URL: https://issues.apache.org/jira/browse/KAFKA-16901
> Project: Kafka
> Issue Type: Test
> Reporter: Chia-Ping Tsai
> Assignee: TengYao Chi
> Priority: Minor
> Fix For: 3.9.0
>
>
> ConsumerRecords#records(String) is a public API, so it is worth having a unit
> test :)

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
chia7712 merged PR #16227: URL: https://github.com/apache/kafka/pull/16227
Re: [PR] KAFKA-10787: Apply spotless to `group-coordinator` and `group-coordinator-api` [kafka]
gongxuanzhang commented on PR #16298: URL: https://github.com/apache/kafka/pull/16298#issuecomment-2164609081

> @gongxuanzhang please add `spotlessJava.dependsOn 'processMessages'` to the module having the `processMessages` task. Otherwise, spotless may ignore the generated code

I updated it, let’s wait for CI.
Re: [PR] MINOR: Improving log for outstanding requests on close and cleanup [kafka]
chia7712 merged PR #16304: URL: https://github.com/apache/kafka/pull/16304
Re: [PR] KAFKA-16803: Update ShadowJavaPlugin [kafka]
Nancy-ksolves commented on code in PR #16295: URL: https://github.com/apache/kafka/pull/16295#discussion_r1637621152

## build.gradle: ##
@@ -46,7 +46,7 @@ plugins {
   id 'org.scoverage' version '8.0.3' apply false
   // Updating the shadow plugin version to 8.1.1 causes issue with signing and publishing the shadowed
   // artifacts - see https://github.com/johnrengelman/shadow/issues/901

Review Comment:
The comment has been removed as suggested.
Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
dengziming commented on code in PR #16230: URL: https://github.com/apache/kafka/pull/16230#discussion_r1637473020

## metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java: ##
@@ -116,6 +139,34 @@ public BootstrapMetadata copyWithOnlyVersion() {
             metadataVersion, source);
     }
 
+    public BootstrapMetadata withMetadataVersion(MetadataVersion metadataVersion) {
+        List newRecords = new ArrayList<>();
+        for (ApiMessageAndVersion record : records) {
+            if (recordToMetadataVersion(record.message()).isPresent()) {
+                newRecords.add(metadataVersionToRecord(metadataVersion));
+            } else {
+                newRecords.add(record);
+            }
+        }
+        return new BootstrapMetadata(newRecords, metadataVersion, source);
+    }
+
+    public BootstrapMetadata withKRaftVersion(KRaftVersion version) {
+        List newRecords = new ArrayList<>();
+        boolean foundKRaftVersion = false;
+        for (ApiMessageAndVersion record : records) {
+            if (recordToKRaftVersion(record.message()).isPresent()) {
+                newRecords.add(kraftVersionToRecord(version));
+            } else {
+                newRecords.add(record);
+            }
+        }
+        if (!foundKRaftVersion) {
+            newRecords.add(kraftVersionToRecord(version));

Review Comment:
We have decided to propagate the feature version for kraft.version using the KRaftVersionRecord control record. Should we also add a `ControlRecord` to make it consistent with `FeatureLevelRecord`?

## metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java: ##
@@ -116,6 +139,34 @@ public BootstrapMetadata copyWithOnlyVersion() {
             metadataVersion, source);
     }
 
+    public BootstrapMetadata withMetadataVersion(MetadataVersion metadataVersion) {
+        List newRecords = new ArrayList<>();
+        for (ApiMessageAndVersion record : records) {
+            if (recordToMetadataVersion(record.message()).isPresent()) {
+                newRecords.add(metadataVersionToRecord(metadataVersion));
+            } else {
+                newRecords.add(record);
+            }
+        }
+        return new BootstrapMetadata(newRecords, metadataVersion, source);
+    }
+
+    public BootstrapMetadata withKRaftVersion(KRaftVersion version) {
+        List newRecords = new ArrayList<>();
+        boolean foundKRaftVersion = false;
+        for (ApiMessageAndVersion record : records) {
+            if (recordToKRaftVersion(record.message()).isPresent()) {
+                newRecords.add(kraftVersionToRecord(version));

Review Comment:
We should add: `foundKRaftVersion=true;` here.
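To make the second review comment concrete: if the flag is never set inside the loop, an existing kraft.version record gets replaced and then a second one is appended at the end. Below is a simplified sketch of the replace-or-append logic with the flag set. It uses plain strings in place of `ApiMessageAndVersion`, and the `kraft.version=` prefix and the class name `KRaftVersionReplaceSketch` are assumptions made purely for illustration, not code from the PR.

```java
import java.util.ArrayList;
import java.util.List;

class KRaftVersionReplaceSketch {
    // Hypothetical stand-in: a "record" is a string, and kraft.version records start with this prefix.
    static final String PREFIX = "kraft.version=";

    // Replaces an existing kraft.version record in place, or appends one if none was found.
    static List<String> withKRaftVersion(List<String> records, int version) {
        List<String> newRecords = new ArrayList<>();
        boolean foundKRaftVersion = false;
        for (String record : records) {
            if (record.startsWith(PREFIX)) {
                foundKRaftVersion = true; // the assignment the review comment asks for
                newRecords.add(PREFIX + version);
            } else {
                newRecords.add(record);
            }
        }
        if (!foundKRaftVersion) {
            // Only append when no record was replaced above.
            newRecords.add(PREFIX + version);
        }
        return newRecords;
    }

    public static void main(String[] args) {
        System.out.println(withKRaftVersion(List.of("metadata.version=19", "kraft.version=0"), 1));
        System.out.println(withKRaftVersion(List.of("metadata.version=19"), 1));
    }
}
```

With the flag set, both calls in `main` produce a list containing exactly one kraft.version record; removing the assignment would make the first call return two.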
Re: [PR] KAFKA-16946: Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port [kafka]
frankvicky commented on code in PR #16319: URL: https://github.com/apache/kafka/pull/16319#discussion_r1637598927

## clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java: ##
@@ -113,16 +113,26 @@ public void testGetHost() {
         assertEquals("mydomain.com", getHost("PLAINTEXT://mydomain.com:8080"));
         assertEquals("MyDomain.com", getHost("PLAINTEXT://MyDomain.com:8080"));
         assertEquals("My_Domain.com", getHost("PLAINTEXT://My_Domain.com:8080"));
+        assertEquals("mydomain.com", getHost("SASL_PLAINTEXT://mydomain.com:8080"));
+        assertEquals("MyDomain.com", getHost("SASL_PLAINTEXT://MyDomain.com:8080"));
+        assertEquals("My_Domain.com", getHost("SASL_PLAINTEXT://My_Domain.com:8080"));
         assertEquals("::1", getHost("[::1]:1234"));
         assertEquals("2001:db8:85a3:8d3:1319:8a2e:370:7348", getHost("PLAINTEXT://[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678"));
         assertEquals("2001:DB8:85A3:8D3:1319:8A2E:370:7348", getHost("PLAINTEXT://[2001:DB8:85A3:8D3:1319:8A2E:370:7348]:5678"));
         assertEquals("fe80::b1da:69ca:57f7:63d8%3", getHost("PLAINTEXT://[fe80::b1da:69ca:57f7:63d8%3]:5678"));
+        assertEquals("2001:db8:85a3:8d3:1319:8a2e:370:7348", getHost("SASL_PLAINTEXT://[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678"));

Review Comment:
I see, I will refactor it.
Re: [PR] KAFKA-16892: fix TopicsDelta generated from snapshot #localChanges return empty deletes [kafka]
superhx commented on PR #16206: URL: https://github.com/apache/kafka/pull/16206#issuecomment-2164505170 @showuon Could you review this PR?
Re: [PR] KAFKA-16946: Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port [kafka]
showuon commented on code in PR #16319: URL: https://github.com/apache/kafka/pull/16319#discussion_r1637590625

## clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java: ##
@@ -113,16 +113,26 @@ public void testGetHost() {
         assertEquals("mydomain.com", getHost("PLAINTEXT://mydomain.com:8080"));
         assertEquals("MyDomain.com", getHost("PLAINTEXT://MyDomain.com:8080"));
         assertEquals("My_Domain.com", getHost("PLAINTEXT://My_Domain.com:8080"));
+        assertEquals("mydomain.com", getHost("SASL_PLAINTEXT://mydomain.com:8080"));
+        assertEquals("MyDomain.com", getHost("SASL_PLAINTEXT://MyDomain.com:8080"));
+        assertEquals("My_Domain.com", getHost("SASL_PLAINTEXT://My_Domain.com:8080"));
         assertEquals("::1", getHost("[::1]:1234"));
         assertEquals("2001:db8:85a3:8d3:1319:8a2e:370:7348", getHost("PLAINTEXT://[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678"));
         assertEquals("2001:DB8:85A3:8D3:1319:8A2E:370:7348", getHost("PLAINTEXT://[2001:DB8:85A3:8D3:1319:8A2E:370:7348]:5678"));
         assertEquals("fe80::b1da:69ca:57f7:63d8%3", getHost("PLAINTEXT://[fe80::b1da:69ca:57f7:63d8%3]:5678"));
+        assertEquals("2001:db8:85a3:8d3:1319:8a2e:370:7348", getHost("SASL_PLAINTEXT://[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678"));

Review Comment:
@frankvicky, we have 4 kinds of security protocols, let's test all of them to make sure we don't break anything. I think we can use `ParameterizedTest` here:
```
@ParameterizedTest
@CsvSource(value = {"PLAINTEXT", "SASL_PLAINTEXT", ...})
```
ref: https://kafka.apache.org/documentation/#brokerconfigs_security.inter.broker.protocol
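As a rough illustration of what the review asks to cover, the sketch below parses `protocol://host:port` strings for all four security protocol names (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL), including bracketed IPv6 hosts. The regular expression and the class name `HostPortSketch` are assumptions written for this example; this is not presented as the actual `Utils` implementation or the fix in the PR.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class HostPortSketch {
    // Assumed pattern for illustration: an optional "<protocol>://" prefix (underscore allowed),
    // optional IPv6 brackets around the host, then ":<port>".
    static final Pattern HOST_PORT =
        Pattern.compile("^(?:[0-9a-zA-Z\\-._]*://)?\\[?([0-9a-zA-Z\\-%._:]*)\\]?:([0-9]+)$");

    // Returns the host part, or null if the address does not match.
    static String getHost(String address) {
        Matcher m = HOST_PORT.matcher(address);
        return m.matches() ? m.group(1) : null;
    }

    // Returns the port part, or null if the address does not match.
    static Integer getPort(String address) {
        Matcher m = HOST_PORT.matcher(address);
        return m.matches() ? Integer.valueOf(m.group(2)) : null;
    }

    public static void main(String[] args) {
        String[] protocols = {"PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"};
        for (String protocol : protocols) {
            String address = protocol + "://[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678";
            System.out.println(protocol + " -> " + getHost(address) + " " + getPort(address));
        }
    }
}
```

The loop in `main` plays the role that a parameterized test over the protocol names would play in the real test class: the same host and port expectations are checked once per protocol.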
Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]
adixitconfluent commented on code in PR #16274: URL: https://github.com/apache/kafka/pull/16274#discussion_r1637579381 ## core/src/main/java/kafka/server/SharePartition.java: ## @@ -310,126 +485,361 @@ private void initialize() { // TODO: Provide implementation to initialize the share partition. } +private AcquiredRecords acquireNewBatchRecords( +String memberId, +long firstOffset, +long lastOffset +) { +lock.writeLock().lock(); +try { +// Schedule acquisition lock timeout for the batch. +AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset); +// Add the new batch to the in-flight records along with the acquisition lock timeout task for the batch. +cachedState.put(firstOffset, new InFlightBatch( +memberId, +firstOffset, +lastOffset, +RecordState.ACQUIRED, +1, +timerTask)); +// if the cachedState was empty before acquiring the new batches then startOffset needs to be updated +if (cachedState.firstKey() == firstOffset) { +startOffset = firstOffset; +} +endOffset = lastOffset; +return new AcquiredRecords() +.setFirstOffset(firstOffset) +.setLastOffset(lastOffset) +.setDeliveryCount((short) 1); +} finally { +lock.writeLock().unlock(); +} +} + +private void acquireSubsetBatchRecords( +String memberId, +long requestFirstOffset, +long requestLastOffset, +InFlightBatch inFlightBatch, +List result +) { +lock.writeLock().lock(); +try { +for (Map.Entry offsetState : inFlightBatch.offsetState.entrySet()) { +// For the first batch which might have offsets prior to the request base +// offset i.e. cached batch of 10-14 offsets and request batch of 12-13. +if (offsetState.getKey() < requestFirstOffset) { +continue; +} + +if (offsetState.getKey() > requestLastOffset) { +// No further offsets to process. 
+break; +} + +if (offsetState.getValue().state != RecordState.AVAILABLE) { +log.trace("The offset is not available skipping, offset: {} batch: {}" ++ " for the share group: {}-{}", offsetState.getKey(), inFlightBatch, +groupId, topicIdPartition); +continue; +} + +InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, true, maxDeliveryCount, +memberId); +if (updateResult == null) { +log.trace("Unable to acquire records for the offset: {} in batch: {}" ++ " for the share group: {}-{}", offsetState.getKey(), inFlightBatch, +groupId, topicIdPartition); +continue; +} +// Schedule acquisition lock timeout for the offset. +AcquisitionLockTimerTask acquisitionLockTimeoutTask = scheduleAcquisitionLockTimeout(memberId, offsetState.getKey(), offsetState.getKey()); +// Update acquisition lock timeout task for the offset. + offsetState.getValue().updateAcquisitionLockTimeoutTask(acquisitionLockTimeoutTask); + +// TODO: Maybe we can club the continuous offsets here. +result.add(new AcquiredRecords() +.setFirstOffset(offsetState.getKey()) +.setLastOffset(offsetState.getKey()) +.setDeliveryCount((short) offsetState.getValue().deliveryCount)); +} +} finally { +lock.writeLock().unlock(); +} +} + /** - * The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records. + * Check if the in-flight batch is a full match with the request offsets. The full match represents + * complete overlap of the in-flight batch with the request offsets. + * + * @param inFlightBatch The in-flight batch to check for full match. + * @param firstOffsetToCompare The first offset of the request batch. + * @param lastOffsetToCompare The last offset of the request batch. + * + * @return True if the in-flight batch is a full match with the request offsets, false otherwise. */ -private static class InFlightBatch { -/** - * The offset of the first record in the batch that is fetched from the log. 
- */ +private boolean checkForFullMatch(InFlightBatch inFlightBatch, long firstOffsetT
Re: [PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]
adixitconfluent commented on code in PR #16274: URL: https://github.com/apache/kafka/pull/16274#discussion_r1637574165 ## core/src/main/java/kafka/server/SharePartition.java: ## @@ -54,6 +60,7 @@ public class SharePartition { * The state of the records determines if the records should be re-delivered, move the next fetch * offset, or be state persisted to disk. */ +// Visible for testing public enum RecordState { Review Comment: Can we make it `default` instead of `public` ## core/src/main/java/kafka/server/SharePartition.java: ## @@ -310,126 +485,361 @@ private void initialize() { // TODO: Provide implementation to initialize the share partition. } +private AcquiredRecords acquireNewBatchRecords( +String memberId, +long firstOffset, +long lastOffset +) { +lock.writeLock().lock(); +try { +// Schedule acquisition lock timeout for the batch. +AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset); +// Add the new batch to the in-flight records along with the acquisition lock timeout task for the batch. +cachedState.put(firstOffset, new InFlightBatch( +memberId, +firstOffset, +lastOffset, +RecordState.ACQUIRED, +1, +timerTask)); +// if the cachedState was empty before acquiring the new batches then startOffset needs to be updated +if (cachedState.firstKey() == firstOffset) { +startOffset = firstOffset; +} +endOffset = lastOffset; +return new AcquiredRecords() +.setFirstOffset(firstOffset) +.setLastOffset(lastOffset) +.setDeliveryCount((short) 1); +} finally { +lock.writeLock().unlock(); +} +} + +private void acquireSubsetBatchRecords( +String memberId, +long requestFirstOffset, +long requestLastOffset, +InFlightBatch inFlightBatch, +List result +) { +lock.writeLock().lock(); +try { +for (Map.Entry offsetState : inFlightBatch.offsetState.entrySet()) { +// For the first batch which might have offsets prior to the request base +// offset i.e. cached batch of 10-14 offsets and request batch of 12-13. 
+if (offsetState.getKey() < requestFirstOffset) { +continue; +} + +if (offsetState.getKey() > requestLastOffset) { +// No further offsets to process. +break; +} + +if (offsetState.getValue().state != RecordState.AVAILABLE) { +log.trace("The offset is not available skipping, offset: {} batch: {}" ++ " for the share group: {}-{}", offsetState.getKey(), inFlightBatch, +groupId, topicIdPartition); +continue; +} + +InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, true, maxDeliveryCount, +memberId); +if (updateResult == null) { +log.trace("Unable to acquire records for the offset: {} in batch: {}" ++ " for the share group: {}-{}", offsetState.getKey(), inFlightBatch, +groupId, topicIdPartition); +continue; +} +// Schedule acquisition lock timeout for the offset. +AcquisitionLockTimerTask acquisitionLockTimeoutTask = scheduleAcquisitionLockTimeout(memberId, offsetState.getKey(), offsetState.getKey()); +// Update acquisition lock timeout task for the offset. + offsetState.getValue().updateAcquisitionLockTimeoutTask(acquisitionLockTimeoutTask); + +// TODO: Maybe we can club the continuous offsets here. +result.add(new AcquiredRecords() +.setFirstOffset(offsetState.getKey()) +.setLastOffset(offsetState.getKey()) +.setDeliveryCount((short) offsetState.getValue().deliveryCount)); +} +} finally { +lock.writeLock().unlock(); +} +} + /** - * The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records. + * Check if the in-flight batch is a full match with the request offsets. The full match represents + * complete overlap of the in-flight batch with the request offsets. + * + * @param inFlightBatch The in-flight batch to check for full match. + * @param firstOffsetToCompare The first offset of the request batch. + * @param lastOffset
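The `TODO: Maybe we can club the continuous offsets here` in the diff above hints at merging adjacent per-offset results into ranges. One possible way to do that is sketched below, under the assumptions that offsets arrive in ascending order and that only neighbouring offsets with the same delivery count are merged. `OffsetRange` and `clubContiguous` are hypothetical names used for illustration and are not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;

class ClubOffsetsSketch {
    // Hypothetical stand-in for AcquiredRecords: an inclusive offset range with one delivery count.
    static final class OffsetRange {
        final long firstOffset;
        long lastOffset;
        final int deliveryCount;

        OffsetRange(long firstOffset, long lastOffset, int deliveryCount) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
            this.deliveryCount = deliveryCount;
        }

        @Override
        public String toString() {
            return firstOffset + "-" + lastOffset + " (deliveryCount=" + deliveryCount + ")";
        }
    }

    // offsets[i] was acquired with deliveryCounts[i]; offsets are assumed strictly ascending.
    static List<OffsetRange> clubContiguous(long[] offsets, int[] deliveryCounts) {
        List<OffsetRange> result = new ArrayList<>();
        OffsetRange current = null;
        for (int i = 0; i < offsets.length; i++) {
            boolean extendsCurrent = current != null
                && offsets[i] == current.lastOffset + 1
                && deliveryCounts[i] == current.deliveryCount;
            if (extendsCurrent) {
                // Adjacent offset with the same delivery count: grow the open range.
                current.lastOffset = offsets[i];
            } else {
                // Gap or different delivery count: start a new range.
                current = new OffsetRange(offsets[i], offsets[i], deliveryCounts[i]);
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(clubContiguous(new long[] {10, 11, 12, 14, 15}, new int[] {1, 1, 2, 1, 1}));
    }
}
```

Keeping the delivery count as part of the merge condition is a design choice of this sketch: a merged range carries a single count, so offsets that have been delivered a different number of times stay in separate ranges.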
Re: [PR] KAFKA-16948: Reset tier lag metrics on becoming follower [kafka]
kamalcph commented on PR #16321: URL: https://github.com/apache/kafka/pull/16321#issuecomment-2164410837 @dopuskh3 Call for review. PTAL.
[PR] KAFKA-16948: Reset tier lag metrics on becoming follower [kafka]
kamalcph opened a new pull request, #16321: URL: https://github.com/apache/kafka/pull/16321

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-16948) Reset tier lag metrics on becoming follower
Kamal Chandraprakash created KAFKA-16948:

Summary: Reset tier lag metrics on becoming follower
Key: KAFKA-16948
URL: https://issues.apache.org/jira/browse/KAFKA-16948
Project: Kafka
Issue Type: Task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash

Tier lag metrics such as remoteCopyLagBytes and remoteCopyLagSegments are sometimes not cleared when the node transitions from leader to follower after a rolling restart.
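The behaviour the issue title asks for can be pictured with a very small sketch: lag values recorded while a node is leader are zeroed when it becomes a follower, so stale values are not reported afterwards. Everything below is hypothetical; the class `TierLagMetricsSketch` and its `onBecomeFollower` hook are invented for illustration and say nothing about how the actual patch is implemented. Only the two metric names come from the issue text.

```java
import java.util.concurrent.atomic.AtomicLong;

class TierLagMetricsSketch {
    // Field names mirror the metrics named in the issue; the holder class is a stand-in.
    final AtomicLong remoteCopyLagBytes = new AtomicLong();
    final AtomicLong remoteCopyLagSegments = new AtomicLong();

    // Assumed to be called by the leader after it computes how far remote copy is behind.
    void recordCopyLag(long bytes, long segments) {
        remoteCopyLagBytes.set(bytes);
        remoteCopyLagSegments.set(segments);
    }

    // Assumed hook: invoked when this node transitions from leader to follower.
    void onBecomeFollower() {
        // A follower does not copy segments, so any previously recorded lag is stale.
        remoteCopyLagBytes.set(0L);
        remoteCopyLagSegments.set(0L);
    }

    public static void main(String[] args) {
        TierLagMetricsSketch metrics = new TierLagMetricsSketch();
        metrics.recordCopyLag(4096L, 2L);
        metrics.onBecomeFollower();
        System.out.println(metrics.remoteCopyLagBytes.get() + " " + metrics.remoteCopyLagSegments.get());
    }
}
```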
Re: [PR] KAFKA-16747: Implemented share sessions and contexts for share fetch requests [kafka]
adixitconfluent commented on code in PR #16263: URL: https://github.com/apache/kafka/pull/16263#discussion_r1637556905 ## core/src/test/java/kafka/server/share/SharePartitionManagerTest.java: ## @@ -0,0 +1,916 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server.share; + +import kafka.server.ReplicaManager; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.InvalidShareSessionEpochException; +import org.apache.kafka.common.errors.ShareSessionNotFoundException; +import org.apache.kafka.common.message.ShareFetchResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.apache.kafka.common.requests.ShareFetchMetadata; +import org.apache.kafka.common.requests.ShareFetchRequest; +import org.apache.kafka.common.requests.ShareFetchResponse; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.share.ShareSessionCache; +import org.apache.kafka.server.share.ShareSessionKey; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Collections; +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Map; +import java.util.LinkedHashMap; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +@Timeout(120) +public class SharePartitionManagerTest { + +private static final int RECORD_LOCK_DURATION_MS = 3; +private static final int MAX_DELIVERY_COUNT = 5; +private static final short MAX_IN_FLIGHT_MESSAGES = 200; + +private 
final List emptyPartList = Collections.unmodifiableList(new ArrayList<>()); + +@Test +public void testNewContextReturnsFinalContext() { +SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().build(); + +ShareFetchMetadata newReqMetadata = new ShareFetchMetadata(Uuid.ZERO_UUID, -1); +ShareFetchContext shareFetchContext = sharePartitionManager.newContext("grp", new HashMap<>(), new ArrayList<>(), newReqMetadata); +assertEquals(shareFetchContext.getClass(), SharePartitionManager.FinalContext.class); + +// If the final fetch request has topics to add, it should fail as an invalid request +Uuid topicId = Uuid.randomUuid(); +Map shareFetchData = new HashMap<>(); +shareFetchData.put(new TopicIdPartition(topicId, new TopicPartition("foo", 0)), +new ShareFetchRequest.SharePartitionData(topicId, 4000)); +assertThrows(InvalidRequestException.class, +() -> sharePartitionManager.newContext("grp", shareFetchData, new ArrayList<>(), new ShareFetchMetadata(Uuid.ZERO_UUID, -1))); + +// shareFetchData is not empty, but the maxBytes of topic partition is 0, which means this is added only for acknowledgements. +// New context should be created successfully +shareFetchData.clear(); +shareFetchData.put(new TopicIdPartition(topicId, new TopicPartition("foo", 0)), +new ShareFetchRequest.SharePartitionData(topicId, 0)); +shareFetchContext = sharePartitionManager.newContext("grp", shareFetchData, new ArrayList<>(), newReqMetadata); +assertEquals(shareFetchContext.getClass(), SharePartitionManager.FinalContext.class); +} + +@Test +public void testNewContext() { Review Comment: Yes, I can only segregate out one of the cases from the following test because for others I need them to work one after th
Re: [PR] KAFKA-16747: Implemented share sessions and contexts for share fetch requests [kafka]
adixitconfluent commented on code in PR #16263: URL: https://github.com/apache/kafka/pull/16263#discussion_r1637549886 ## core/src/main/java/kafka/server/share/SharePartitionManager.java: ## @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server.share; + +import kafka.server.FetchSession; +import kafka.server.ReplicaManager; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.ShareSessionNotFoundException; +import org.apache.kafka.common.message.ShareAcknowledgeResponseData; +import org.apache.kafka.common.message.ShareFetchResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ShareFetchMetadata; +import org.apache.kafka.common.requests.ShareFetchRequest; +import org.apache.kafka.common.requests.ShareFetchResponse; +import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.share.CachedSharePartition; +import org.apache.kafka.server.share.ShareSession; +import org.apache.kafka.server.share.ShareSessionCache; +import org.apache.kafka.server.share.ShareSessionKey; +import org.apache.kafka.storage.internals.log.FetchParams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The SharePartitionManager is responsible for managing the SharePartitions and ShareSessions. + * It is responsible for fetching messages from the log and acknowledging the messages. 
+ */ +public class SharePartitionManager implements AutoCloseable { + +private final static Logger log = LoggerFactory.getLogger(SharePartitionManager.class); + +/** + * The partition cache map is used to store the SharePartition objects for each share group topic-partition. + */ +private final Map partitionCacheMap; + +/** + * The replica manager is used to fetch messages from the log. + */ +private final ReplicaManager replicaManager; + +/** + * The time instance is used to get the current time. + */ +private final Time time; + +/** + * The share session cache stores the share sessions. + */ +private final ShareSessionCache cache; + +/** + * The fetch queue stores the share fetch requests that are waiting to be processed. + */ +private final ConcurrentLinkedQueue fetchQueue; + +/** + * The process fetch queue lock is used to ensure that only one thread is processing the fetch queue at a time. + */ +private final AtomicBoolean processFetchQueueLock; + +/** + * The record lock duration is the time in milliseconds that a record lock is held for. + */ +private final int recordLockDurationMs; + +/** + * The max in flight messages is the maximum number of messages that can be in flight at any one time per share-partition. + */ +private final int maxInFlightMessages; + +/** + * The max delivery count is the maximum number of times a message can be delivered before it is considered to be archived. + */ +private final int maxDeliveryCount; + +public SharePartitionManager( +ReplicaManager replicaManager, +Time time, +ShareSessionCache cache, +int recordLockDurationMs, +int maxDeliveryCount, +int maxInFlightMessages +) { +this(replicaManager, time, cache, new ConcurrentHashMap<>(), recordLockDurationMs, maxDeliveryCount, maxInFlightMessages); +} + +SharePartitionManager( +ReplicaManager replicaManager, +Time time, +ShareSessionCache cache, +Map partitionCacheMap, +int recordLockDurationMs, +
Re: [PR] KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops [kafka]
xiaoqingwanga commented on PR #16303: URL: https://github.com/apache/kafka/pull/16303#issuecomment-2164374450 @gharris1727 Thank you very much for your suggestion! I have made another commit, resolving the issue mentioned above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15045: (KIP-924 pt. 24) Correct stateful task flagging [kafka]
apourchet closed pull request #16314: KAFKA-15045: (KIP-924 pt. 24) Correct stateful task flagging URL: https://github.com/apache/kafka/pull/16314
[PR] [WIP] Delayed remote list offsets [kafka]
kamalcph opened a new pull request, #16320: URL: https://github.com/apache/kafka/pull/16320 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] MINOR: Add readiness check for connector and separate Kafka cluster in ExactlyOnceSourceIntegrationTest::testSeparateOffsetsTopic [kafka]
C0urante merged PR #16306: URL: https://github.com/apache/kafka/pull/16306
Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]
github-actions[bot] commented on PR #15188: URL: https://github.com/apache/kafka/pull/15188#issuecomment-2164295049 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-16531: calculate check-quorum when leader is not in voter set [kafka]
showuon commented on PR #16211: URL: https://github.com/apache/kafka/pull/16211#issuecomment-2164292176 @chia7712, thanks for the review. PR updated in this commit: https://github.com/apache/kafka/pull/16211/commits/7db45c2e8f08b4a381717b4c6b7f466bd116cd4e . Thanks.
Re: [PR] KAFKA-16531: calculate check-quorum when leader is not in voter set [kafka]
showuon commented on code in PR #16211: URL: https://github.com/apache/kafka/pull/16211#discussion_r1637455856
## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ##
@@ -126,8 +127,13 @@ public long timeUntilCheckQuorumExpires(long currentTimeMs) {
      */
     public void updateCheckQuorumForFollowingVoter(int id, long currentTimeMs) {
         updateFetchedVoters(id);
-        // The majority number of the voters excluding the leader. Ex: 3 voters, the value will be 1
-        int majority = voterStates.size() / 2;
+        // The majority number of the voters. Ex: 3 voters, the value will be 2
+        int majority = (int) ((double) (voterStates.size() + 1) / 2);
Review Comment: Ah, good catch! My test didn't work as expected. Also updated the test to catch the issue. Thanks.
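For readers following the arithmetic in the diff above, here is a minimal sketch comparing the quoted expression with the textbook strict majority, floor(n/2) + 1. The class and method names are hypothetical and this is not the code that was merged; whether the leader counts itself toward the threshold is a detail of `LeaderState` that the sketch does not model.

```java
// Illustrative only: hypothetical names, not code from PR #16211.
final class MajoritySketch {
    private MajoritySketch() { }

    // Smallest count that is strictly more than half of voterCount.
    static int strictMajority(int voterCount) {
        return voterCount / 2 + 1;
    }

    // The expression quoted in the diff: (n + 1) / 2 in floating point, truncated to int.
    static int quotedExpression(int voterCount) {
        return (int) ((double) (voterCount + 1) / 2);
    }

    public static void main(String[] args) {
        // Print both values side by side for small voter sets.
        for (int n = 1; n <= 5; n++) {
            System.out.printf("voters=%d strictMajority=%d quotedExpression=%d%n",
                n, strictMajority(n), quotedExpression(n));
        }
    }
}
```

The two expressions agree for odd voter counts (3 voters give 2 in both cases) and differ for even ones (4 voters give 3 versus 2), so a test that only uses odd-sized voter sets would not distinguish them.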
Re: [PR] KAFKA-16531: calculate check-quorum when leader is not in voter set [kafka]
showuon commented on code in PR #16211: URL: https://github.com/apache/kafka/pull/16211#discussion_r1637455387
## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ##
@@ -111,9 +111,10 @@ public long timeUntilCheckQuorumExpires(long currentTimeMs) {
         long remainingMs = checkQuorumTimer.remainingMs();
         if (remainingMs == 0) {
             log.info(
-                "Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.",
+                "Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}, and voters are {}",
                 checkQuorumTimeoutMs,
-                fetchedVoters);
+                fetchedVoters,
+                voterStates);
Review Comment: Updated. Thanks.
[jira] [Commented] (KAFKA-12480) Reuse bootstrap servers in clients when last alive broker in cluster metadata is unavailable
[ https://issues.apache.org/jira/browse/KAFKA-12480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854611#comment-17854611 ] Ivan Yurchenko commented on KAFKA-12480: With [KIP-899|https://cwiki.apache.org/confluence/display/KAFKA/KIP-899:+Allow+producer+and+consumer+clients+to+rebootstrap] accepted and [KAFKA-8206|https://issues.apache.org/jira/browse/KAFKA-8206] implemented, please consider resolving this jira. > Reuse bootstrap servers in clients when last alive broker in cluster metadata > is unavailable > > > Key: KAFKA-12480 > URL: https://issues.apache.org/jira/browse/KAFKA-12480 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Ron Dagostino >Priority: Major > > https://issues.apache.org/jira/browse/KAFKA-12455 documented how a Java > client can temporarily lose connectivity to a 2-broker cluster that is > undergoing a roll because the client will repeatedly retry connecting to the > last alive broker that it knows about in the cluster metadata even when that > broker is unavailable. The client could potentially fallback to its > bootstrap brokers in this case and reconnect to the cluster quicker. > For example, assume a 2-broker cluster has broker IDs 1 and 2 and both appear > in the bootstrap servers for a consumer. Assume broker 1 rolls such that the > Java consumer receives a new METADATA response and only knows about broker 2 > being alive, and then broker 2 rolls before the consumer gets a new METADATA > response indicating that broker 1 is also alive. At this point the Java > consumer will keep retrying broker 2, and it will not reconnect to the > cluster unless/until broker 2 becomes available -- or the client itself is > restarted so it can use its bootstrap servers again. Another possibility is > to fallback to the full bootstrap servers list when the last alive broker > becomes unavailable. > I believe librdkafka-based client may perform this fallback, though I am not > certain. 
We should consider it for Java clients. -- This message was sent by Atlassian Jira (v8.20.10#820010)
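As a rough illustration of what opting in to the rebootstrap behaviour referenced in the comment above might look like, here is a minimal sketch that only builds a `java.util.Properties` object. The key `metadata.recovery.strategy` and the value `rebootstrap` are what I understand KIP-899 to define; treat both as assumptions and check them against the documentation for your client version. The group id and the class name are hypothetical.

```java
import java.util.Properties;

// Illustrative only: builds client properties, does not create a Kafka client.
final class RebootstrapConfigSketch {
    private RebootstrapConfigSketch() { }

    static Properties consumerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", "example-group"); // hypothetical group id
        // Assumed from KIP-899: fall back to the bootstrap list when no known broker is reachable.
        props.setProperty("metadata.recovery.strategy", "rebootstrap");
        return props;
    }

    public static void main(String[] args) {
        // Both brokers of the 2-broker example are listed so that either can be used to rebootstrap.
        System.out.println(consumerProps("broker1:9092,broker2:9092"));
    }
}
```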
[jira] [Commented] (KAFKA-13467) Clients never refresh cached bootstrap IPs
[ https://issues.apache.org/jira/browse/KAFKA-13467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854612#comment-17854612 ] Ivan Yurchenko commented on KAFKA-13467: With [KIP-899|https://cwiki.apache.org/confluence/display/KAFKA/KIP-899:+Allow+producer+and+consumer+clients+to+rebootstrap] accepted and [KAFKA-8206|https://issues.apache.org/jira/browse/KAFKA-8206] implemented, please consider resolving this jira. > Clients never refresh cached bootstrap IPs > -- > > Key: KAFKA-13467 > URL: https://issues.apache.org/jira/browse/KAFKA-13467 > Project: Kafka > Issue Type: Improvement > Components: clients, network >Reporter: Matthias J. Sax >Priority: Minor > > Follow up ticket to https://issues.apache.org/jira/browse/KAFKA-13405. > For certain broker rolling upgrade scenarios, it would be beneficial to > expire cached bootstrap server IP addresses and re-resolve those IPs to > allow clients to re-connect to the cluster without the need to restart the > client.
[jira] [Commented] (KAFKA-16781) Expose advertised.listeners in controller node
[ https://issues.apache.org/jira/browse/KAFKA-16781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854610#comment-17854610 ] TengYao Chi commented on KAFKA-16781: - Hi [~tinaselenge] feel free to take this if you wish :) > Expose advertised.listeners in controller node > -- > > Key: KAFKA-16781 > URL: https://issues.apache.org/jira/browse/KAFKA-16781 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Assignee: TengYao Chi >Priority: Major > Labels: need-kip, newbie, newbie++ > > After > [KIP-919|https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration], > we allow clients to talk to the KRaft controller node directly. But unlike > broker nodes, we don't allow users to configure advertised.listeners for clients > to connect to. Without this config, the client cannot connect to the > controller node if the controller is sitting behind a NAT network while the > client is in the external network.
[jira] [Assigned] (KAFKA-14507) Add ConsumerGroupPrepareAssignment API
[ https://issues.apache.org/jira/browse/KAFKA-14507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dujian0068 reassigned KAFKA-14507: -- Assignee: dujian0068 > Add ConsumerGroupPrepareAssignment API > -- > > Key: KAFKA-14507 > URL: https://issues.apache.org/jira/browse/KAFKA-14507 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: dujian0068 >Priority: Major
[jira] [Assigned] (KAFKA-14705) Remove deprecated options and redirections
[ https://issues.apache.org/jira/browse/KAFKA-14705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dujian0068 reassigned KAFKA-14705: -- Assignee: dujian0068 > Remove deprecated options and redirections > -- > > Key: KAFKA-14705 > URL: https://issues.apache.org/jira/browse/KAFKA-14705 > Project: Kafka > Issue Type: Sub-task >Reporter: Federico Valeri >Assignee: dujian0068 >Priority: Major > Fix For: 4.0.0 > > > We can use this task to track tools cleanup for the next major release > (4.0.0). > 1. Redirections to be removed: > - core/src/main/scala/kafka/tools/JmxTool > - core/src/main/scala/kafka/tools/ClusterTool > - core/src/main/scala/kafka/tools/StateChangeLogMerger > - core/src/main/scala/kafka/tools/EndToEndLatency > - core/src/main/scala/kafka/admin/FeatureCommand > - core/src/main/scala/kafka/tools/StreamsResetter > 2. Deprecated tools to be removed: > - tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger > 3. TopicFilter, PartitionFilter and TopicPartitionFilter in "server-common" > should be moved to "tools" once we get rid of MirrorMaker1 dependency. > 4. We should also get rid of many deprecated options across all tools, > including not migrated tools.
[jira] [Commented] (KAFKA-16941) Flaky test - testDynamicBrokerConfigUpdateUsingKraft [1] Type=Raft-Combined, MetadataVersion=4.0-IV0,Security=PLAINTEXT – kafka.admin.ConfigCommandIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-16941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854604#comment-17854604 ] Lin Siyuan commented on KAFKA-16941: I'm a newbie and I will do my best to find the problem, it can take a long time, if others can find the problem quickly, you are welcome to join in.🙂 > Flaky test - testDynamicBrokerConfigUpdateUsingKraft [1] Type=Raft-Combined, > MetadataVersion=4.0-IV0,Security=PLAINTEXT – > kafka.admin.ConfigCommandIntegrationTest > -- > > Key: KAFKA-16941 > URL: https://issues.apache.org/jira/browse/KAFKA-16941 > Project: Kafka > Issue Type: Test >Reporter: Igor Soarez >Priority: Minor > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-16077/4/tests/ > {code:java} > org.opentest4j.AssertionFailedError: Condition not met within timeout 5000. > [listener.name.internal.ssl.keystore.location] are not updated ==> expected: > but was: > at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:367) > at > kafka.admin.ConfigCommandIntegrationTest.verifyConfigDefaultValue(ConfigCommandIntegrationTest.java:519) > at > kafka.admin.ConfigCommandIntegrationTest.deleteAndVerifyConfig(ConfigCommandIntegrationTest.java:514) > at > 
kafka.admin.ConfigCommandIntegrationTest.testDynamicBrokerConfigUpdateUsingKraft(ConfigCommandIntegrationTest.java:237) > {code}
Re: [PR] KAFKA-16946: Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port [kafka]
frankvicky commented on PR #16319: URL: https://github.com/apache/kafka/pull/16319#issuecomment-2164259901 Hi @zhaochun-ma, @showuon, I have fixed the bug where the regex can't correctly parse a connection string starting with `SASL_PLAINTEXT`. PTAL 😄 Special thanks to @zhaochun-ma for finding this issue. 🙏🏼
[PR] KAFKA-16946: Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port [kafka]
frankvicky opened a new pull request, #16319: URL: https://github.com/apache/kafka/pull/16319 In the previous PR (#16048), I mistakenly excluded the underscore (_) from the set of valid characters for the protocol, resulting in the inability to correctly parse the connection string for SASL_PLAINTEXT. This bug fix addresses the issue and includes corresponding tests. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
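To make the failure mode described in this PR concrete, here is a minimal sketch with two hypothetical patterns that differ only in whether `_` is allowed in the optional `protocol://` prefix. Neither pattern is copied from Kafka's `Utils` class, and the class and method names are assumptions made for illustration; the sketch also ignores IPv6 literals.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: hypothetical patterns, not the ones used by Kafka's Utils.getHost/getPort.
final class HostPortSketch {
    // Protocol character class without '_' (mirrors the kind of mistake described in the PR).
    static final Pattern WITHOUT_UNDERSCORE =
        Pattern.compile("^(?:[A-Za-z][A-Za-z0-9+.-]*://)?([^:/\\[\\]]+):([0-9]+)$");
    // Same pattern with '_' added to the protocol character class.
    static final Pattern WITH_UNDERSCORE =
        Pattern.compile("^(?:[A-Za-z][A-Za-z0-9_+.-]*://)?([^:/\\[\\]]+):([0-9]+)$");

    private HostPortSketch() { }

    // Returns the host part, or null when the address does not match the pattern.
    static String getHost(Pattern pattern, String address) {
        Matcher m = pattern.matcher(address);
        return m.matches() ? m.group(1) : null;
    }

    // Returns the port, or null when the address does not match the pattern.
    static Integer getPort(Pattern pattern, String address) {
        Matcher m = pattern.matcher(address);
        return m.matches() ? Integer.valueOf(m.group(2)) : null;
    }

    public static void main(String[] args) {
        String address = "SASL_PLAINTEXT://host:9092";
        System.out.println("without '_': " + getHost(WITHOUT_UNDERSCORE, address));
        System.out.println("with '_':    " + getHost(WITH_UNDERSCORE, address)
            + ":" + getPort(WITH_UNDERSCORE, address));
    }
}
```

With the first pattern, `PLAINTEXT://host:9092` still parses, which is why a test suite without a `SASL_PLAINTEXT` or `SASL_SSL` case would not catch the regression.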
[jira] [Commented] (KAFKA-16913) Support external schemas in JSONConverter
[ https://issues.apache.org/jira/browse/KAFKA-16913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854601#comment-17854601 ] Ganesh Sadanala commented on KAFKA-16913: - Is there any documentation to be updated with this change? Or is that another task? > Support external schemas in JSONConverter > - > > Key: KAFKA-16913 > URL: https://issues.apache.org/jira/browse/KAFKA-16913 > Project: Kafka > Issue Type: Improvement > Components: connect >Reporter: Priyanka K U >Assignee: Ganesh Sadanala >Priority: Minor > Time Spent: 6h > Remaining Estimate: 0h > > KIP-1054: Support external schemas in JSONConverter : > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1054%3A+Support+external+schemas+in+JSONConverter
[jira] [Commented] (KAFKA-16913) Support external schemas in JSONConverter
[ https://issues.apache.org/jira/browse/KAFKA-16913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854600#comment-17854600 ] Ganesh Sadanala commented on KAFKA-16913: - PR is ready to review > Support external schemas in JSONConverter > - > > Key: KAFKA-16913 > URL: https://issues.apache.org/jira/browse/KAFKA-16913 > Project: Kafka > Issue Type: Improvement > Components: connect >Reporter: Priyanka K U >Assignee: Ganesh Sadanala >Priority: Minor > Time Spent: 6h > Remaining Estimate: 0h > > KIP-1054: Support external schemas in JSONConverter : > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1054%3A+Support+external+schemas+in+JSONConverter
Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]
kirktrue commented on PR #16310: URL: https://github.com/apache/kafka/pull/16310#issuecomment-2164238647 @AndrewJSchofield @cadonna @lianetm @philipnee, please review this PR. It's an alternative take on #16241 that seems simpler and not fraught with peril. Thanks!
Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]
kirktrue commented on code in PR #16310: URL: https://github.com/apache/kafka/pull/16310#discussion_r1637405808
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1983,6 +2009,10 @@ SubscriptionState subscriptions() {
         return subscriptions;
     }
 
+    FetchCommittedOffsetsEvent pendingOffsetFetch() {
Review Comment: That made sense, so I changed it. Thanks!
[PR] KAFKA-15045: (KIP-924 pt. 24) internal TaskAssignor rename to LegacyTaskAssignor [kafka]
apourchet opened a new pull request, #16318: URL: https://github.com/apache/kafka/pull/16318 Since the new public TaskAssignor API shares its name with the internal TaskAssignor, this rename will prevent users from confusing the internal definition with the public one. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-10787: Apply spotless to `raft` module [kafka]
gongxuanzhang commented on PR #16278: URL: https://github.com/apache/kafka/pull/16278#issuecomment-2164215470 > @gongxuanzhang you need to fix it again :) OK, I'll fix it periodically before merging.
Re: [PR] KAFKA-16689: Move LogValidatorTest to storage module [kafka]
chia7712 commented on code in PR #16167: URL: https://github.com/apache/kafka/pull/16167#discussion_r1637373367 ## storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java: ## @@ -0,0 +1,2100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.storage.internals.log; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Set; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.nio.ByteBuffer; +import java.util.stream.LongStream; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import kafka.server.BrokerTopicStats; +import kafka.server.RequestLocal; +import kafka.log.UnifiedLog; +import org.apache.kafka.common.compress.GzipCompression; +import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.errors.InvalidTimestampException; +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordValidationStats; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.RecordVersion; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.utils.PrimitiveRef; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.TopicPartition; +import 
org.apache.kafka.common.compress.Compression; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.storage.internals.log.LogValidator.ValidationResult; +import org.apache.kafka.server.metrics.KafkaYammerMetrics; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import com.yammer.metrics.core.MetricName; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class LogValidatorTest { +Time time = Time.SYSTEM; Review Comment: please add `private final` modifiers ## storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java: ## @@ -0,0 +1,2100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import java.io.IOException; +imp
Re: [PR] [MINOR]:Update visibility from public to protected and adjust the order in BuiltInPartitioner [kafka]
gongxuanzhang commented on code in PR #16277: URL: https://github.com/apache/kafka/pull/16277#discussion_r1637379530
## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ##
@@ -1033,10 +1033,18 @@ public Deque getDeque(TopicPartition tp) {
      * Get the deque for the given topic-partition, creating it if necessary.
      */
     private Deque getOrCreateDeque(TopicPartition tp) {
-        TopicInfo topicInfo = topicInfoMap.computeIfAbsent(tp.topic(), k -> new TopicInfo(logContext, k, batchSize));
+        TopicInfo topicInfo = topicInfoMap.computeIfAbsent(tp.topic(),
+            k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize)));
         return topicInfo.batches.computeIfAbsent(tp.partition(), k -> new ArrayDeque<>());
     }
 
+    /**
+     * Subclass can custom {@link BuiltInPartitioner}
Review Comment: You are right, it doesn't seem to need explaining. I removed the docs.
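The pattern under discussion in the diff above, a protected factory method called from inside `computeIfAbsent` so that a subclass can swap in its own implementation, can be sketched as follows. All class names here are hypothetical stand-ins; this is not Kafka's `RecordAccumulator` or `BuiltInPartitioner`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a per-topic partitioner.
class SketchPartitioner {
    protected final String topic;

    SketchPartitioner(String topic) {
        this.topic = topic;
    }

    String describe() {
        return "default:" + topic;
    }
}

// Hypothetical stand-in for the accumulator that lazily creates one partitioner per topic.
class SketchAccumulator {
    private final Map<String, SketchPartitioner> partitioners = new ConcurrentHashMap<>();

    // Creates the partitioner on first use; later calls return the cached instance.
    SketchPartitioner partitionerFor(String topic) {
        return partitioners.computeIfAbsent(topic, this::createPartitioner);
    }

    // Protected hook: subclasses override this to supply a custom partitioner.
    protected SketchPartitioner createPartitioner(String topic) {
        return new SketchPartitioner(topic);
    }
}

// A subclass that customizes only the factory method.
class CustomSketchAccumulator extends SketchAccumulator {
    @Override
    protected SketchPartitioner createPartitioner(String topic) {
        return new SketchPartitioner(topic) {
            @Override
            String describe() {
                return "custom:" + topic;
            }
        };
    }
}
```

Keeping the hook `protected` rather than `public` limits it to subclasses and same-package callers, which matches the visibility change named in the PR title.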
[jira] [Commented] (KAFKA-16946) Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port
[ https://issues.apache.org/jira/browse/KAFKA-16946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854595#comment-17854595 ] Luke Chen commented on KAFKA-16946: --- [~frankvicky], this is a blocker for v3.8.0. Hope we can fix it soon. Thanks. > Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port > - > > Key: KAFKA-16946 > URL: https://issues.apache.org/jira/browse/KAFKA-16946 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.8.0 >Reporter: Luke Chen >Assignee: TengYao Chi >Priority: Blocker > Fix For: 3.8.0 > > > In KAFKA-16824, we tried to improve the regex for Utils.getHost/getPort, but > it failed to parse SASL_PLAINTEXT://host:port now. Need to fix it.
[jira] [Updated] (KAFKA-16946) Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port
[ https://issues.apache.org/jira/browse/KAFKA-16946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-16946: -- Fix Version/s: 3.8.0 > Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port > - > > Key: KAFKA-16946 > URL: https://issues.apache.org/jira/browse/KAFKA-16946 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.8.0 >Reporter: Luke Chen >Assignee: TengYao Chi >Priority: Blocker > Fix For: 3.8.0 > > > In KAFKA-16824, we tried to improve the regex for Utils.getHost/getPort, but > it failed to parse SASL_PLAINTEXT://host:port now. Need to fix it.
[PR] KAFKA-16939: Revisit ConfigCommandIntegrationTest [kafka]
m1a2st opened a new pull request, #16317: URL: https://github.com/apache/kafka/pull/16317 https://issues.apache.org/jira/browse/KAFKA-16939 Some scenarios are wrong, so this PR fixes them.
Re: [PR] KAFKA-16942 Use ConcurrentHashMap in RecordAccumulator#nodeStats [kafka]
gongxuanzhang commented on PR #16305: URL: https://github.com/apache/kafka/pull/16305#issuecomment-2164206281 I agree. As Java has evolved, the performance of ConcurrentHashMap has indeed improved significantly.
Re: [PR] MINOR: update Kafka Streams docs with 3.3 KIP information [kafka]
mjsax commented on code in PR #16316: URL: https://github.com/apache/kafka/pull/16316#discussion_r1637372406 ## docs/streams/upgrade-guide.html: ## @@ -303,6 +303,62 @@ Streams API adds a new config default.client.supplier that allows to use a custom KafkaClientSupplier without any code changes. +Streams API changes in 3.3.0 + + Kafka Streams does not send a "leave group" request when an instance is closed. This behavior implies + that a rebalance is delayed until max.poll.interval.ms passed. + https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group";>KIP-812 + introduces KafkaStreams.close(CloseOptions) overload which allows to force an instance to leave the + group immediately. + + Note: due to internal limitations, CloseOptions only works for static consumer groups at this point + (cf. https://issues.apache.org/jira/browse/KAFKA-16514";>KAFKA-16514 for more details and a fix in + some future release). + + + + https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API";>KIP-820 + adapts the PAPI type-safety improvement of KIP-478 into the DSL. The existing methods KStream.transform, + KStream.flatTransform, KStream.transformValues, and KStream.flatTransformValues + as well as all overloads of void KStream.process are deprecated in favor of the newly added methods + +KStreamKStream.process(ProcessorSupplier, ...) +KStream KStream.processValues(FixedKeyProcessorSupplier, ...) + + Both new methods have multiple overlaods and return a KStream instead of void as the + deprecated process() methods did. In addition, FixedKeyProcessor, FixedKeyRecord, + FixedKeyProcessorContext, and ContextualFixedKeyProcessor are introduced to guard against + disallowed key modification inside processValues(). Furthermore ProcessingContext is + added for a better interface hierarchy. 
+ + + + Emitting a windowed aggregation result only after a window is closed is currently supported via the + suppress() operator. However, suppress() uses an in-memory implementation and does not + support RocksDB. To close this gap, + https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced";>KIP-825 + introduces "emit strategies" which are built into the aggregation operator directly to use the already existing + RocksDB store. TimeWindowedKStream.emitStrategy(EmitStrategy) and + SessionWindowedKStream.emitStrategy(EmitStrategy) allow to pick between "emit on window update" (default) + and "emit on window close" strategies. Additionally, a few new emit metrics are added, as well as a necessary + new method SessionStore.findSessions(long, long). + + + + https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832";>KIP-834 allows to pause + and resume a Kafka Streams instance. Pausing implies that processing input records and executing punctuations will + be skipped; Kafka Streams will continue to poll to maintain its group membership and may commit offsets. + In addition to the new method KafkaStreams.pause() and KafkaStreams.resume(), it is also + supported to check if an instance is paused via KafkaStreams.isPaused() method. + + + + To improve monitoring of Kafka Streams applications, https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211886093";>KIP-846 + adds four new metrics bytes-consumed-total, records-consumed-total, + bytes-produced-total, and records-produced-total within a new topic level scope. Review Comment: These metrics are already added to the `ops.html` KS metrics section. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
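The KIP-825 paragraph in the diff above contrasts "emit on window update" with "emit on window close". The difference can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions, not the Kafka Streams implementation: the class `EmitStrategyModel`, its `run` method, and the `windowStart=count` output format are hypothetical, windows are tumbling, there is no grace period, and timestamps are assumed to be non-negative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the two emit strategies; it does not use the Kafka Streams API.
class EmitStrategyModel {
    enum Strategy { ON_WINDOW_UPDATE, ON_WINDOW_CLOSE }

    // Counts records per tumbling window and returns "windowStart=count" strings
    // in the order they would be emitted downstream.
    static List<String> run(long[] timestamps, long windowSizeMs, Strategy strategy) {
        TreeMap<Long, Integer> counts = new TreeMap<>();
        List<String> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            long start = ts - (ts % windowSizeMs);
            if (start + windowSizeMs <= streamTime) {
                continue; // late record for an already closed window: dropped in this model
            }
            int count = counts.merge(start, 1, Integer::sum);
            if (strategy == Strategy.ON_WINDOW_UPDATE) {
                // Every update produces an intermediate result.
                emitted.add(start + "=" + count);
            } else {
                // Emit (and forget) only windows whose end is at or before stream time.
                while (!counts.isEmpty() && counts.firstKey() + windowSizeMs <= streamTime) {
                    Map.Entry<Long, Integer> closed = counts.pollFirstEntry();
                    emitted.add(closed.getKey() + "=" + closed.getValue());
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] ts = {0, 3, 7, 12, 15, 21};
        System.out.println(run(ts, 10, Strategy.ON_WINDOW_UPDATE));
        System.out.println(run(ts, 10, Strategy.ON_WINDOW_CLOSE));
    }
}
```

In this model the last window (starting at 20) is never emitted under "emit on window close" because stream time has not yet passed its end, which is the trade-off the paragraph describes: fewer, final results at the cost of latency.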
[PR] MINOR: update Kafka Streams docs with 3.3 KIP information [kafka]
mjsax opened a new pull request, #16316: URL: https://github.com/apache/kafka/pull/16316 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-12708 Rewrite org.apache.kafka.test.Microbenchmarks by JMH [kafka]
chia7712 commented on PR #16231: URL: https://github.com/apache/kafka/pull/16231#issuecomment-2164203749 @brandboat could you please remove `Microbenchmarks` from suppression file? https://github.com/apache/kafka/blob/6d1f8f8727e8f9c7071b83b7dd9feb840d8d661f/checkstyle/suppressions.xml#L61 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]
philipnee commented on code in PR #16310: URL: https://github.com/apache/kafka/pull/16310#discussion_r1637370335 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1983,6 +2009,10 @@ SubscriptionState subscriptions() { return subscriptions; } +FetchCommittedOffsetsEvent pendingOffsetFetch() { Review Comment: would `boolean hasPendingOffsetFetch()` be sufficient for the test? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16938) non-dynamic props gets corrupted due to circular reference between DynamicBrokerConfig and DynamicConfig
[ https://issues.apache.org/jira/browse/KAFKA-16938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16938. Resolution: Fixed > non-dynamic props gets corrupted due to circular reference between > DynamicBrokerConfig and DynamicConfig > > > Key: KAFKA-16938 > URL: https://issues.apache.org/jira/browse/KAFKA-16938 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Blocker > Fix For: 3.9.0 > > > DynamicBrokerConfig has a circular reference with DynamicConfig. The following > initialization order will cause incorrect non-dynamic props [0] > 1. DynamicConfig is initializing -> brokerConfigs is created [1] > 2. DynamicConfig is initializing -> call > DynamicBrokerConfig.addDynamicConfigs(brokerConfigs) [2] > 3. DynamicBrokerConfig is initializing -> nonDynamicProps: Set[String] = > KafkaConfig.configNames.toSet -- DynamicConfig.Broker.names.asScala [3] > 4. DynamicConfig.Broker.names references `brokerConfigs`, and `brokerConfigs` > does not have all dynamic props yet (step 2), so nonDynamicProps gets created with > incorrect contents. > We should break the cycle by addressing the following tasks: > 1. move `DynamicBrokerConfig.addDynamicConfigs` to `DynamicConfig.Broker` > 2. move `DynamicBrokerConfig#nonDynamicProps` to `DynamicConfig.Broker` > {code:scala} > object DynamicConfig { > object Broker { > private val brokerConfigs = { > val configs = QuotaConfigs.brokerQuotaConfigs() > KafkaConfig.configKeys.filter { > case (configName, _) => AllDynamicConfigs.contains(configName) > }.foreach { > case (_, config) => > configs.define(config.name, config.`type`, config.defaultValue, > config.validator, > config.importance, config.documentation, config.group, > config.orderInGroup, config.width, > config.displayName, config.dependents, config.recommender) > } > configs > } > val nonDynamicProps: Set[String] = KafkaConfig.configNames.toSet -- > brokerConfigs.names.asScala > {code} > [0] > [https://github.com/apache/kafka/blob/638844f833b165d6f9ca52c173858d26b7254fac/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L120] > [1] > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicConfig.scala#L35] > [2] > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicConfig.scala#L36] > [3] > [https://github.com/apache/kafka/blob/638844f833b165d6f9ca52c173858d26b7254fac/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L120] -- This message was sent by Atlassian Jira (v8.20.10#820010)
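The initialization order described in KAFKA-16938 can be reproduced in miniature with Java static initializers, which behave much like Scala `object` initialization in this respect. The sketch below is an analogy only: `DynamicLike`, `BrokerLike`, `FixedDynamicLike`, and the config names are hypothetical stand-ins, not Kafka classes. It shows a derived set being computed while the set it depends on is still half-built, and a variant where both values live in one holder so no cycle exists.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-ins for the classes in KAFKA-16938; not Kafka code.
class CircularInitSketch {
    // All config names known to the broker (made-up values).
    static final Set<String> ALL_NAMES = Set.of("quota.a", "dyn.x", "static.y");

    // Plays the role of DynamicConfig.Broker.
    static class DynamicLike {
        static final Set<String> NAMES = new TreeSet<>();
        static {
            NAMES.add("quota.a");
            // First touch of BrokerLike: its static initializer runs right here,
            // while NAMES still lacks "dyn.x".
            BrokerLike.addDynamicConfigs(NAMES);
        }
    }

    // Plays the role of DynamicBrokerConfig.
    static class BrokerLike {
        // Evaluated during BrokerLike initialization, possibly against a half-built NAMES.
        static final Set<String> NON_DYNAMIC = difference(ALL_NAMES, DynamicLike.NAMES);

        static void addDynamicConfigs(Set<String> names) {
            names.add("dyn.x");
        }
    }

    // Cycle removed: the dynamic names and the derived set live in one holder.
    static class FixedDynamicLike {
        static final Set<String> NAMES = new TreeSet<>(Set.of("quota.a", "dyn.x"));
        static final Set<String> NON_DYNAMIC = difference(ALL_NAMES, NAMES);
    }

    static Set<String> difference(Set<String> all, Set<String> remove) {
        Set<String> out = new TreeSet<>(all);
        out.removeAll(remove);
        return out;
    }

    static Set<String> brokenNonDynamic() {
        DynamicLike.NAMES.size(); // force DynamicLike to start initializing first
        return BrokerLike.NON_DYNAMIC;
    }

    static Set<String> fixedNonDynamic() {
        return FixedDynamicLike.NON_DYNAMIC;
    }

    public static void main(String[] args) {
        System.out.println("broken: " + brokenNonDynamic());
        System.out.println("fixed:  " + fixedNonDynamic());
    }
}
```

In the broken variant, `dyn.x` ends up classified as non-dynamic because the difference was taken before it was registered. The result also depends on which class happens to be touched first, which is part of what makes this kind of bug hard to spot.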
[jira] [Commented] (KAFKA-16944) Range assignor doesn't co-partition with stickiness
[ https://issues.apache.org/jira/browse/KAFKA-16944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854593#comment-17854593 ] dujian0068 commented on KAFKA-16944: Hello, does this problem need to be fixed quickly? If not, can it be assigned to me? It is not easy for me to find a problem, and it may take me some time to deal with it. > Range assignor doesn't co-partition with stickiness > --- > > Key: KAFKA-16944 > URL: https://issues.apache.org/jira/browse/KAFKA-16944 > Project: Kafka > Issue Type: Sub-task >Reporter: Ritika Reddy >Assignee: Ritika Reddy >Priority: Major > > When stickiness is considered during range assignments, it is possible that > we fail to co-partition in certain cases where co-partitioning should be guaranteed. > An example would be: > Consider two topics T1, T2 with 3 partitions each and three members A, B, C. > Let's say the existing assignment (for whatever reason) is: > {quote}A -> T1P0 || B -> T1P1, T2P0, T2P1, T2P2 || C -> T1P2 > {quote} > Now we trigger a rebalance with the following subscriptions, where all members > are subscribed to both topics and everything else is the same: > {quote}A -> T1, T2 || B -> T1, T2 || C -> T1, T2 > {quote} > Since all the topics have an equal number of partitions and all the members > are subscribed to the same set of topics, we would expect co-partitioning, > so we would want the final assignment returned to be > {quote}A -> T1P0, T2P0 || B -> T1P1, T2P1 || C -> T1P2, T2P2 > {quote} > Currently the client-side assignor returns the following, but only because > it doesn't assign sticky partitions: > {{C=[topic1-2, topic2-2], B=[topic1-1, topic2-1], A=[topic1-0, topic2-0]}} > > Our server-side assignor returns: > (The partitions in bold are the sticky partitions) > {{A=MemberAssignment(targetPartitions={topic2=[1], *topic1=[0]*}), > B=MemberAssignment(targetPartitions={*topic2=[0]*, *topic1=[1]*}), > C=MemberAssignment(targetPartitions={topic2=[2], *topic1=[2]*})}} > *As seen above co-partitioning is expected but not returned.* -- This message was sent by Atlassian Jira (v8.20.10#820010)
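The expectation in KAFKA-16944 can be checked mechanically. The following Java sketch is not the Kafka assignor: `RangeCoPartitionSketch`, `assign`, and `isCoPartitioned` are hypothetical names, and it assumes every member subscribes to every topic. It builds a plain (non-sticky) range assignment and tests whether each member holds the same partition numbers in every topic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; assumes all members subscribe to all topics.
class RangeCoPartitionSketch {
    // Returns member -> topic -> partition numbers, using contiguous ranges per topic.
    static Map<String, Map<String, List<Integer>>> assign(List<String> members,
                                                          Map<String, Integer> partitionCounts) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);
        Map<String, Map<String, List<Integer>>> result = new TreeMap<>();
        for (String member : sorted) {
            result.put(member, new TreeMap<>());
        }
        if (sorted.isEmpty()) {
            return result;
        }
        for (Map.Entry<String, Integer> topic : partitionCounts.entrySet()) {
            int base = topic.getValue() / sorted.size();
            int extra = topic.getValue() % sorted.size();
            int next = 0;
            for (int i = 0; i < sorted.size(); i++) {
                // The first `extra` members receive one additional partition.
                int size = base + (i < extra ? 1 : 0);
                List<Integer> range = new ArrayList<>();
                for (int p = next; p < next + size; p++) {
                    range.add(p);
                }
                next += size;
                result.get(sorted.get(i)).put(topic.getKey(), range);
            }
        }
        return result;
    }

    // True if every member owns the same partition numbers in each of its topics.
    static boolean isCoPartitioned(Map<String, Map<String, List<Integer>>> assignment) {
        for (Map<String, List<Integer>> perTopic : assignment.values()) {
            List<Integer> first = null;
            for (List<Integer> partitions : perTopic.values()) {
                if (first == null) {
                    first = partitions;
                } else if (!first.equals(partitions)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Map<String, List<Integer>>> expected =
            assign(List.of("A", "B", "C"), Map.of("T1", 3, "T2", 3));
        System.out.println(expected + " co-partitioned=" + isCoPartitioned(expected));
    }
}
```

Feeding the server-side result quoted in the issue (A holds topic1=[0] and topic2=[1], B holds topic1=[1] and topic2=[0]) into `isCoPartitioned` returns false, which is the failure being reported.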
Re: [PR] KAFKA-16938: non-dynamic props gets corrupted due to circular reference between DynamicBrokerConfig and DynamicConfig. [kafka]
chia7712 merged PR #16302: URL: https://github.com/apache/kafka/pull/16302 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16946) Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port
[ https://issues.apache.org/jira/browse/KAFKA-16946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854591#comment-17854591 ] Luke Chen commented on KAFKA-16946: --- Go ahead to fix it, [~frankvicky] ! > Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port > - > > Key: KAFKA-16946 > URL: https://issues.apache.org/jira/browse/KAFKA-16946 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.8.0 >Reporter: Luke Chen >Priority: Blocker > > In KAFKA-16824, we tried to improve the regex for Utils.getHost/getPort, but > it failed to parse SASL_PLAINTEXT://host:port now. Need to fix it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16946) Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port
[ https://issues.apache.org/jira/browse/KAFKA-16946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-16946: - Assignee: TengYao Chi > Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port > - > > Key: KAFKA-16946 > URL: https://issues.apache.org/jira/browse/KAFKA-16946 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.8.0 >Reporter: Luke Chen >Assignee: TengYao Chi >Priority: Blocker > > In KAFKA-16824, we tried to improve the regex for Utils.getHost/getPort, but > it failed to parse SASL_PLAINTEXT://host:port now. Need to fix it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Clean up for KafkaAdminClientTest [kafka]
chia7712 merged PR #16285: URL: https://github.com/apache/kafka/pull/16285 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16824: Utils.getHost and Utils.getPort do not catch a lot of invalid host and ports. [kafka]
cmccabe commented on PR #16048: URL: https://github.com/apache/kafka/pull/16048#issuecomment-2164187718 > @frankvicky looks like with this change, SASL_PLAINTEXT://localhost:50132 becomes invalid url because of _ in the protocol, and it's a breaking change, is this expected? definitely a bug. we should fix or revert -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
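To make the regression concrete: a host/port pattern has to tolerate an optional listener prefix whose name contains `_`, such as `SASL_PLAINTEXT://`. The Java sketch below is a hypothetical pattern written for illustration; it is not the regex in Kafka's `Utils` and not the fix that was merged. The class name `HostPortSketch` and the exact character classes are assumptions.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for illustration; not the pattern used by Kafka's Utils class.
class HostPortSketch {
    private static final Pattern HOST_PORT = Pattern.compile(
        "^(?:[A-Za-z][A-Za-z0-9_+.-]*://)?"                 // optional prefix; '_' is allowed
        + "(?:\\[([0-9A-Za-z:.%]+)\\]|([0-9A-Za-z.-]+))"    // bracketed IPv6 literal, or host name / IPv4
        + ":([0-9]{1,5})$");                                 // port digits

    // Returns the host part, or null if the address does not match.
    static String getHost(String address) {
        Matcher m = HOST_PORT.matcher(address);
        if (!m.matches()) {
            return null;
        }
        return m.group(1) != null ? m.group(1) : m.group(2);
    }

    // Returns the port, or null if the address does not match or the port exceeds 65535.
    static Integer getPort(String address) {
        Matcher m = HOST_PORT.matcher(address);
        if (!m.matches()) {
            return null;
        }
        int port = Integer.parseInt(m.group(3));
        if (port > 65535) {
            return null;
        }
        return port;
    }

    public static void main(String[] args) {
        String address = "SASL_PLAINTEXT://localhost:50132";
        System.out.println(getHost(address) + " " + getPort(address));
    }
}
```

A parameterized test over inputs such as `SASL_PLAINTEXT://localhost:50132`, `PLAINTEXT://[::1]:9092`, and `localhost` (no port) is a cheap way to guard a pattern like this against the kind of breakage discussed in this thread.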
Re: [PR] KAFKA-16689: Move LogValidatorTest to storage module [kafka]
TaiJuWu commented on PR #16167: URL: https://github.com/apache/kafka/pull/16167#issuecomment-2164170657 @chia7712 Please take a look! Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Update 3.8 documentation for Kafka Streams [kafka]
ableegoldman commented on code in PR #16265: URL: https://github.com/apache/kafka/pull/16265#discussion_r1637334663 ## docs/streams/developer-guide/config-streams.html: ## @@ -441,8 +441,8 @@ num.standby.replicastask.assignor.class Medium -A task assignor class or class name implementing the org.apache.kafka.streams.processor.assignment.TaskAssignor interface. -The default high-availability task assignor. +A task assignor class or class name implementing the TaskAssignor interface. Review Comment: alternatively we might just want to rename the old interface to `LegacyTaskAssignor` or something like that. See my message on CC slack cc @mjsax -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]
kirktrue commented on PR #16241: URL: https://github.com/apache/kafka/pull/16241#issuecomment-2164161606 @lianetm @cadonna @AndrewJSchofield—PTAL at an alternate implementation of this Jira: #16310. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13560) Load indexes and data in async manner in the critical path of replica fetcher threads.
[ https://issues.apache.org/jira/browse/KAFKA-13560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-13560: --- Fix Version/s: 3.9.0 (was: 3.8.0) > Load indexes and data in async manner in the critical path of replica fetcher > threads. > --- > > Key: KAFKA-13560 > URL: https://issues.apache.org/jira/browse/KAFKA-13560 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Satish Duggana >Priority: Major > Fix For: 3.9.0 > > > https://github.com/apache/kafka/pull/11390#discussion_r762366976 > https://github.com/apache/kafka/pull/11390#discussion_r1033141283 > https://github.com/apache/kafka/pull/15690 removed the below method from in > `TierStateMachine` interface. This can be added back when we implement the > functionality required to address this issue. > {code:java} > public Optional maybeAdvanceState(TopicPartition > topicPartition, PartitionFetchState currentFetchState) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15300) Include remotelog size in complete log size and also add local log size and remote log size separately in kafka-log-dirs tool.
[ https://issues.apache.org/jira/browse/KAFKA-15300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15300: --- Fix Version/s: 3.9.0 (was: 3.8.0) > Include remotelog size in complete log size and also add local log size and > remote log size separately in kafka-log-dirs tool. > --- > > Key: KAFKA-15300 > URL: https://issues.apache.org/jira/browse/KAFKA-15300 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Satish Duggana >Priority: Major > Fix For: 3.9.0 > > > Include remotelog size in complete log size and also add local log size and > remote log size separately in kafka-log-dirs tool. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15480) Add RemoteStorageInterruptedException
[ https://issues.apache.org/jira/browse/KAFKA-15480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15480: --- Fix Version/s: 3.9.0 (was: 3.8.0) > Add RemoteStorageInterruptedException > - > > Key: KAFKA-15480 > URL: https://issues.apache.org/jira/browse/KAFKA-15480 > Project: Kafka > Issue Type: Task > Components: core >Affects Versions: 3.6.0 >Reporter: Mital Awachat >Priority: Major > Labels: kip > Fix For: 3.9.0 > > > Introduce `RemoteStorageInterruptedException` to propagate interruptions from > the plugin to Kafka without generated (false) errors. > It allows the plugin to notify Kafka an API operation in progress was > interrupted as a result of task cancellation, which can happen under changes > such as leadership migration or topic deletion. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15214) Add metrics for OffsetOutOfRangeException when tiered storage is enabled
[ https://issues.apache.org/jira/browse/KAFKA-15214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15214: --- Fix Version/s: 3.9.0 (was: 3.8.0) > Add metrics for OffsetOutOfRangeException when tiered storage is enabled > > > Key: KAFKA-15214 > URL: https://issues.apache.org/jira/browse/KAFKA-15214 > Project: Kafka > Issue Type: Task > Components: metrics >Affects Versions: 3.6.0 >Reporter: Lixin Yao >Priority: Minor > Labels: KIP-405 > Fix For: 3.9.0 > > > The current RemoteReadErrorsPerSec metric does not include the exception type > OffsetOutOfRangeException. > In our testing of the tiered storage feature (at Apple), we noticed several > cases where remote download was affected and stuck due to repeated > OffsetOutOfRangeExceptions on particular brokers or topic partitions. The > root cause can vary, but without a metric it is currently very hard to > catch this issue and debug it in a timely fashion. The > exception itself may not be the root cause, but a metric for it would > be a good signal for us to alert on and investigate. > Related discussion > [https://github.com/apache/kafka/pull/13944#discussion_r1266243006] > I am happy to contribute to this if the request is agreed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15529) Flaky test ReassignReplicaShrinkTest.executeTieredStorageTest
[ https://issues.apache.org/jira/browse/KAFKA-15529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15529: --- Fix Version/s: 3.9.0 (was: 3.8.0) > Flaky test ReassignReplicaShrinkTest.executeTieredStorageTest > - > > Key: KAFKA-15529 > URL: https://issues.apache.org/jira/browse/KAFKA-15529 > Project: Kafka > Issue Type: Test > Components: Tiered-Storage >Affects Versions: 3.6.0 >Reporter: Divij Vaidya >Priority: Blocker > Labels: flaky-test > Fix For: 3.9.0 > > > Example of failed CI build - > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14449/3/testReport/junit/org.apache.kafka.tiered.storage.integration/ReassignReplicaShrinkTest/Build___JDK_21_and_Scala_2_13___executeTieredStorageTest_String__quorum_kraft_2/] > > {noformat} > org.opentest4j.AssertionFailedError: Number of fetch requests from broker 0 > to the tier storage does not match the expected value for topic-partition > topicA-1 ==> expected: <3> but was: <4> > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at > app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) > at > app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150) > at > app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:559) > at > app//org.apache.kafka.tiered.storage.actions.ConsumeAction.doExecute(ConsumeAction.java:128) > at > app//org.apache.kafka.tiered.storage.TieredStorageTestAction.execute(TieredStorageTestAction.java:25) > at > app//org.apache.kafka.tiered.storage.TieredStorageTestHarness.executeTieredStorageTest(TieredStorageTestHarness.java:112){noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15038) Use topic id/name mapping from the Metadata cache in the RemoteLogManager
[ https://issues.apache.org/jira/browse/KAFKA-15038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15038: --- Fix Version/s: 3.9.0 (was: 3.8.0) > Use topic id/name mapping from the Metadata cache in the RemoteLogManager > - > > Key: KAFKA-15038 > URL: https://issues.apache.org/jira/browse/KAFKA-15038 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Alexandre Dupriez >Assignee: Owen C.H. Leung >Priority: Minor > Fix For: 3.9.0 > > > Currently, the {{RemoteLogManager}} maintains its own cache of topic name to > topic id > [[1]|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138] > using the information provided during leadership changes, and removing the > mapping upon receiving the notification of partition stopped. > It should be possible to re-use the mapping in a broker's metadata cache, > removing the need for the RLM to build and update a local cache thereby > duplicating the information in the metadata cache. It also allows to preserve > a single source of authority regarding the association between topic names > and ids. > [1] > https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16946) Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port
[ https://issues.apache.org/jira/browse/KAFKA-16946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854582#comment-17854582 ] TengYao Chi commented on KAFKA-16946: - Hi [~showuon] , if the issue finder is not willing to do, I would like to fix this > Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host:port > - > > Key: KAFKA-16946 > URL: https://issues.apache.org/jira/browse/KAFKA-16946 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.8.0 >Reporter: Luke Chen >Priority: Blocker > > In KAFKA-16824, we tried to improve the regex for Utils.getHost/getPort, but > it failed to parse SASL_PLAINTEXT://host:port now. Need to fix it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-9578) Kafka Tiered Storage - System Tests
[ https://issues.apache.org/jira/browse/KAFKA-9578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-9578: -- Fix Version/s: 3.9.0 (was: 3.8.0) > Kafka Tiered Storage - System Tests > > > Key: KAFKA-9578 > URL: https://issues.apache.org/jira/browse/KAFKA-9578 > Project: Kafka > Issue Type: Test >Reporter: Harsha >Priority: Major > Fix For: 3.9.0 > > > Initial test cases set up by [~Ying Zheng] > > [https://docs.google.com/spreadsheets/d/1gS0s1FOmcjpKYXBddejXAoJAjEZ7AdEzMU9wZc-JgY8/edit#gid=0] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15132) Implement disable & re-enablement for Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-15132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15132: --- Fix Version/s: 3.9.0 (was: 3.8.0) > Implement disable & re-enablement for Tiered Storage > > > Key: KAFKA-15132 > URL: https://issues.apache.org/jira/browse/KAFKA-15132 > Project: Kafka > Issue Type: New Feature > Components: core >Reporter: Divij Vaidya >Assignee: Divij Vaidya >Priority: Major > Labels: kip > Fix For: 3.9.0 > > > KIP-405 [1] introduces the Tiered Storage feature in Apache Kafka. One of the > limitations mentioned in the KIP is the inability to re-enable TS on a topic > after it has been disabled. > {quote}Once tier storage is enabled for a topic, it can not be disabled. We > will add this feature in future versions. One possible workaround is to > create a new topic and copy the data from the desired offset and delete the > old topic. > {quote} > This task will propose a new KIP which extends KIP-405 to describe the > behaviour on disablement and re-enablement of tiered storage for a topic. > The solution will apply to both ZK and KRaft variants. > [1] KIP-405 - > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15376) Explore options of removing data earlier to the current leader's leader epoch lineage for topics enabled with tiered storage.
[ https://issues.apache.org/jira/browse/KAFKA-15376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15376: --- Fix Version/s: 3.9.0 (was: 3.8.0) > Explore options of removing data earlier to the current leader's leader epoch > lineage for topics enabled with tiered storage. > - > > Key: KAFKA-15376 > URL: https://issues.apache.org/jira/browse/KAFKA-15376 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Satish Duggana >Priority: Major > Fix For: 3.9.0 > > > Followup on the discussion thread: > [https://github.com/apache/kafka/pull/13561#discussion_r1288778006] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15864) Add more tests asserting the log-start-offset, local-log-start-offset, and HW/LSO/LEO in rolling over segments with tiered storage.
[ https://issues.apache.org/jira/browse/KAFKA-15864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15864: --- Fix Version/s: 3.9.0 (was: 3.8.0) > Add more tests asserting the log-start-offset, local-log-start-offset, and > HW/LSO/LEO in rolling over segments with tiered storage. > --- > > Key: KAFKA-15864 > URL: https://issues.apache.org/jira/browse/KAFKA-15864 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Satish Duggana >Priority: Major > Labels: tiered-storage > Fix For: 3.9.0 > > > Followup on the > [comment|https://github.com/apache/kafka/pull/14766/files#r1395389551] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15094) Add RemoteIndexCache metrics like misses/evictions/load-failures.
[ https://issues.apache.org/jira/browse/KAFKA-15094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15094: --- Fix Version/s: 3.9.0 (was: 3.8.0) > Add RemoteIndexCache metrics like misses/evictions/load-failures. > - > > Key: KAFKA-15094 > URL: https://issues.apache.org/jira/browse/KAFKA-15094 > Project: Kafka > Issue Type: Improvement >Reporter: Satish Duggana >Assignee: Abhijeet Kumar >Priority: Major > Fix For: 3.9.0 > > > Add metrics like hits/misses/evictions/load-failures for RemoteIndexCache. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14915) Option to consume multiple partitions that have their data in remote storage for the target offsets.
[ https://issues.apache.org/jira/browse/KAFKA-14915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-14915: --- Fix Version/s: 3.9.0 (was: 3.8.0) > Option to consume multiple partitions that have their data in remote storage > for the target offsets. > > > Key: KAFKA-14915 > URL: https://issues.apache.org/jira/browse/KAFKA-14915 > Project: Kafka > Issue Type: Improvement >Reporter: Satish Duggana >Priority: Major > Labels: tiered-storage > Fix For: 3.9.0 > > > Context: https://github.com/apache/kafka/pull/13535#discussion_r1171250580 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15195) Regenerate segment-aligned producer snapshots when upgrading to a Kafka version supporting Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-15195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15195: --- Fix Version/s: 3.9.0 (was: 3.8.0) > Regenerate segment-aligned producer snapshots when upgrading to a Kafka > version supporting Tiered Storage > - > > Key: KAFKA-15195 > URL: https://issues.apache.org/jira/browse/KAFKA-15195 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 3.6.0 >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > Fix For: 3.9.0 > > > As mentioned in KIP-405: Kafka Tiered Storage#Upgrade a customer wishing to > upgrade from a Kafka version < 2.8.0 to 3.6 and turn Tiered Storage on will > have to wait for retention to clean up segments without an associated > producer snapshot. > However, in our experience, customers of Kafka expect to be able to > immediately enable tiering on a topic once their cluster upgrade is complete. > Once they do this, however, they start seeing NPEs and no data is uploaded to > Tiered Storage > (https://github.com/apache/kafka/blob/9e50f7cdd37f923cfef4711cf11c1c5271a0a6c7/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/LogSegmentData.java#L61). > To achieve this, we propose changing Kafka to retroactively create producer > snapshot files on upload whenever a segment is due to be archived and lacks > one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15301) [Tiered Storage] Historically compacted topics send request to remote for active segment during consume
[ https://issues.apache.org/jira/browse/KAFKA-15301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15301: --- Fix Version/s: 3.9.0 (was: 3.8.0) > [Tiered Storage] Historically compacted topics send request to remote for > active segment during consume > --- > > Key: KAFKA-15301 > URL: https://issues.apache.org/jira/browse/KAFKA-15301 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.6.0 >Reporter: Mital Awachat >Assignee: Jimmy Wang >Priority: Major > Fix For: 3.9.0 > > > I have a use case where tiered storage plugin received requests for active > segments. The topics for which it happened were historically compacted topics > for which compaction was disabled and tiering was enabled. > Create topic with compact cleanup policy -> Produce data with few repeat keys > and create multiple segments -> let compaction happen -> change cleanup > policy to delete -> produce some more data for segment rollover -> enable > tiering on topic -> wait for segments to be uploaded to remote storage and > cleanup from local (active segment would remain), consume from beginning -> > Observe logs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems
[ https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15341: --- Fix Version/s: 3.9.0 (was: 3.8.0) > Enabling TS for a topic during rolling restart causes problems > -- > > Key: KAFKA-15341 > URL: https://issues.apache.org/jira/browse/KAFKA-15341 > Project: Kafka > Issue Type: Bug >Reporter: Divij Vaidya >Priority: Major > Labels: KIP-405 > Fix For: 3.9.0 > > > When we are in a rolling restart to enable TS at system level, some brokers > have TS enabled on them and some don't. We send an alter config call to > enable TS for a topic, it hits a broker which has TS enabled, this broker > forwards it to the controller and controller will send the config update to > all brokers. When another broker which doesn't have TS enabled (because it > hasn't undergone the restart yet) gets this config change, it "should" fail > to apply it. But failing now is too late since alterConfig has already > succeeded since controller->broker config propagation is done async. > With this JIRA, we want to have controller check if TS is enabled on all > brokers before applying alter config to turn on TS for a topic. > Context: https://github.com/apache/kafka/pull/14176#discussion_r1291265129 -- This message was sent by Atlassian Jira (v8.20.10#820010)
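The controller-side check proposed in KAFKA-15341 can be sketched as follows. This is a minimal illustration only: the class `TieredStoragePrecheck`, its methods, and the idea that the controller holds a per-broker map of "tiered storage enabled" flags are all assumptions made for this example, not existing Kafka code.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch; none of these names exist in Kafka.
class TieredStoragePrecheck {

    // Returns the ids of brokers that do not report tiered storage (TS) as enabled, ascending.
    static List<Integer> brokersWithoutTieredStorage(Map<Integer, Boolean> tsEnabledByBroker) {
        return tsEnabledByBroker.entrySet().stream()
                .filter(e -> !e.getValue())
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    // Rejects the topic-level change up front instead of letting a broker fail later,
    // after the alter config call has already been reported as successful.
    static void validateEnableTieredStorage(String topic, Map<Integer, Boolean> tsEnabledByBroker) {
        List<Integer> lagging = brokersWithoutTieredStorage(tsEnabledByBroker);
        if (!lagging.isEmpty()) {
            throw new IllegalStateException("Cannot enable tiered storage for topic " + topic
                    + ": brokers " + lagging + " do not have it enabled yet");
        }
    }

    public static void main(String[] args) {
        // Mid rolling restart: broker 3 has not been restarted with TS enabled yet.
        Map<Integer, Boolean> midRestart = Map.of(1, true, 2, true, 3, false);
        try {
            validateEnableTieredStorage("example-topic", midRestart);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of failing in the validation step is that the caller of alter config sees the error synchronously, which the asynchronous controller-to-broker propagation described above cannot provide.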
[jira] [Updated] (KAFKA-13355) Shutdown broker eventually when unrecoverable exceptions like IOException encountered in RLMM.
[ https://issues.apache.org/jira/browse/KAFKA-13355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-13355: --- Fix Version/s: 3.9.0 (was: 3.8.0) > Shutdown broker eventually when unrecoverable exceptions like IOException > encountered in RLMM. > --- > > Key: KAFKA-13355 > URL: https://issues.apache.org/jira/browse/KAFKA-13355 > Project: Kafka > Issue Type: Bug >Reporter: Satish Duggana >Assignee: Abhijeet Kumar >Priority: Major > Labels: tiered-storage > Fix For: 3.9.0 > > > Have mechanism to catch unrecoverable exceptions like IOException from RLMM > and shutdown the broker like it is done in log layer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15969) Align RemoteStorageThreadPool metrics name with KIP-405
[ https://issues.apache.org/jira/browse/KAFKA-15969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15969: --- Fix Version/s: 3.9.0 (was: 3.8.0) > Align RemoteStorageThreadPool metrics name with KIP-405 > --- > > Key: KAFKA-15969 > URL: https://issues.apache.org/jira/browse/KAFKA-15969 > Project: Kafka > Issue Type: Bug > Components: metrics >Affects Versions: 3.6.0 >Reporter: Lixin Yao >Priority: Minor > Fix For: 3.9.0 > > Original Estimate: 1h > Remaining Estimate: 1h > > KIP-405 defines the following 2 metrics: > kafka.log.remote:type=RemoteStorageThreadPool,name=RemoteLogReaderTaskQueueSize > and > kafka.log.remote:type=RemoteStorageThreadPool,name=RemoteLogReaderAvgIdlePercent > However, in the Kafka 3.6 release, the metrics actually exposed are: > org.apache.kafka.storage.internals.log:name=RemoteLogReaderAvgIdlePercent,type=RemoteStorageThreadPool > org.apache.kafka.storage.internals.log:name=RemoteLogReaderTaskQueueSize,type=RemoteStorageThreadPool > The problem is that the bean domain name changed from {{kafka.log.remote}} to {{org.apache.kafka.storage.internals.log}}. And the type name is also changed. > We should either update the metrics path in the KIP, or fix the path in the code.
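The mismatch reported in KAFKA-15969 can be checked with the JDK's own `javax.management.ObjectName` parser. The sketch below is illustrative only; the class name `MetricNameCheck` and its helpers are made up for this example, and the two metric names are copied from the ticket with the line-wrap whitespace removed.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper class, written only to illustrate the comparison.
class MetricNameCheck {

    // Wraps the checked exception so callers can parse names inline.
    static ObjectName parse(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid MBean name: " + name, e);
        }
    }

    static boolean sameDomain(ObjectName a, ObjectName b) {
        return a.getDomain().equals(b.getDomain());
    }

    // Key properties (type, name, ...) compare equal regardless of the order they were written in.
    static boolean sameKeyProperties(ObjectName a, ObjectName b) {
        return a.getKeyPropertyList().equals(b.getKeyPropertyList());
    }

    public static void main(String[] args) {
        ObjectName documented = parse(
                "kafka.log.remote:type=RemoteStorageThreadPool,name=RemoteLogReaderTaskQueueSize");
        ObjectName exposed = parse(
                "org.apache.kafka.storage.internals.log:name=RemoteLogReaderTaskQueueSize,type=RemoteStorageThreadPool");
        System.out.println("same domain: " + sameDomain(documented, exposed));
        System.out.println("same key properties: " + sameKeyProperties(documented, exposed));
    }
}
```

For the two names as quoted in the ticket, the domains differ while the `type` and `name` key properties parse to the same values, so a dashboard or alert keyed on the KIP-405 domain would not match the bean that is actually registered.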
[jira] [Updated] (KAFKA-15331) Handle remote log enabled topic deletion when leader is not available
[ https://issues.apache.org/jira/browse/KAFKA-15331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15331: --- Fix Version/s: 3.9.0 (was: 3.8.0) > Handle remote log enabled topic deletion when leader is not available > - > > Key: KAFKA-15331 > URL: https://issues.apache.org/jira/browse/KAFKA-15331 > Project: Kafka > Issue Type: Bug >Reporter: Kamal Chandraprakash >Assignee: hudeqi >Priority: Major > Fix For: 3.9.0 > > > When a topic gets deleted, then there can be a case where all the replicas > can be out of ISR. This case is not handled, See: > [https://github.com/apache/kafka/pull/13947#discussion_r1289331347] for more > details. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.
[ https://issues.apache.org/jira/browse/KAFKA-15388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15388: --- Fix Version/s: 3.9.0 (was: 3.8.0) > Handle topics that were having compaction as retention earlier are changed to > delete only retention policy and onboarded to tiered storage. > > > Key: KAFKA-15388 > URL: https://issues.apache.org/jira/browse/KAFKA-15388 > Project: Kafka > Issue Type: Bug >Reporter: Satish Duggana >Assignee: Arpit Goyal >Priority: Major > Fix For: 3.9.0 > > Attachments: Screenshot 2023-11-15 at 3.47.54 PM.png, > tieredtopicloglist.png > > > Context: [https://github.com/apache/kafka/pull/13561#discussion_r1300055517] > > There are 3 paths I looked at: > * When data is moved to remote storage (1) > * When data is read from remote storage (2) > * When data is deleted from remote storage (3) > (1) Does not have a problem with compacted topics. Compacted segments are uploaded and their metadata claims they contain offsets from the baseOffset of the segment until the next segment's baseOffset. There are no gaps in offsets. > (2) Does not have a problem if a customer is querying offsets which do not exist within a segment, as long as there are offsets after the queried offset within the same segment. *However, it does have a problem when the next available offset is in a subsequent segment.* > (3) For data deleted via DeleteRecords there is no problem. For data deleted via retention there is no problem. > > *I believe the proper solution to (2) is to make tiered storage continue looking for the next greater offset in subsequent segments.* > Steps to reproduce the issue: > {code:java} > // TODO (christo) > {code}
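The fix proposed for path (2) in KAFKA-15388, continuing the lookup into subsequent segments, can be sketched with plain collections. This is a simplified model, not the remote log reader: the class `NextOffsetLookup`, and the representation of each segment as a sorted list of the offsets that survived compaction, are assumptions made for illustration.

```java
import java.util.List;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical model: segments are keyed by base offset, and each value lists
// the offsets still present in that segment after compaction (ascending).
class NextOffsetLookup {

    // Returns the smallest surviving offset >= target, searching the segment that
    // covers the target first and then every later segment in base-offset order.
    static OptionalLong nextAvailableOffset(TreeMap<Long, List<Long>> segments, long target) {
        if (segments.isEmpty()) {
            return OptionalLong.empty();
        }
        Long startKey = segments.floorKey(target);
        if (startKey == null) {
            // Target is before the first segment: start from the beginning.
            startKey = segments.firstKey();
        }
        for (List<Long> offsets : segments.tailMap(startKey, true).values()) {
            for (long offset : offsets) {
                if (offset >= target) {
                    return OptionalLong.of(offset);
                }
            }
            // Nothing at or after the target in this segment: keep looking in the next one
            // rather than giving up, which is the behaviour the ticket asks for.
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        TreeMap<Long, List<Long>> segments = new TreeMap<>();
        segments.put(0L, List.of(0L, 2L, 5L));    // offsets 6..9 were compacted away
        segments.put(10L, List.of(14L, 18L));
        segments.put(20L, List.of(20L, 21L));
        System.out.println(nextAvailableOffset(segments, 6L));
    }
}
```

In the usage above, offset 6 falls inside the first segment's claimed range but no record at or after it survives there, so the lookup moves on to the segment with base offset 10.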
Re: [PR] KAFKA-16917: Align the returned Map type of KafkaAdminClient [kafka]
frankvicky commented on code in PR #16250: URL: https://github.com/apache/kafka/pull/16250#discussion_r1637320049 ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -1988,6 +1989,15 @@ public void testDescribeClientMetricsConfigs() throws Exception { } } +@Test +public void testCreateDescribeConfigsByDuplicateResources() { +ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, "1"); +ConfigResource duplicateResource = new ConfigResource(ConfigResource.Type.BROKER, "1"); +try (AdminClientUnitTestEnv env = mockClientEnv()) { +assertDoesNotThrow(() -> env.adminClient().describeConfigs(asList(resource, duplicateResource))); Review Comment: Cool, I will do that 👌🏾 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16824: Utils.getHost and Utils.getPort do not catch a lot of invalid host and ports. [kafka]
frankvicky commented on PR #16048: URL: https://github.com/apache/kafka/pull/16048#issuecomment-2164151022 Hi @zhaochun-ma, thanks for the reminder 😄 If you have free cycles, it would be nice if you could open a PR to fix it.
Re: [PR] KAFKA-16824: Utils.getHost and Utils.getPort do not catch a lot of invalid host and ports. [kafka]
zhaochun-ma commented on PR #16048: URL: https://github.com/apache/kafka/pull/16048#issuecomment-2164138119 > @zhaochun-ma , nice find! I've created [KAFKA-16946](https://issues.apache.org/jira/browse/KAFKA-16946) for this issue. Are you interested in opening a PR to fix it? I haven't worked on Kafka before, so it might be better for someone else to pick it up.
Re: [PR] KAFKA-16862: Refactor ConsumerTaskTest to be deterministic and avoid tight loops [kafka]
xiaoqingwanga commented on PR #16303: URL: https://github.com/apache/kafka/pull/16303#issuecomment-2164138080 @gharris1727 Thanks for the tip! I'll look into it and tweak the code a bit.
[jira] [Commented] (KAFKA-14877) refactor InMemoryLeaderEpochCheckpoint
[ https://issues.apache.org/jira/browse/KAFKA-14877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854579#comment-17854579 ] Satish Duggana commented on KAFKA-14877: InMemoryLeaderEpochCheckpoint seems to be deleted with other refactoring. > refactor InMemoryLeaderEpochCheckpoint > -- > > Key: KAFKA-14877 > URL: https://issues.apache.org/jira/browse/KAFKA-14877 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Priority: Minor > Fix For: 3.8.0 > > > follow up with this comment: > https://github.com/apache/kafka/pull/13456#discussion_r1154306477 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14877) refactor InMemoryLeaderEpochCheckpoint
[ https://issues.apache.org/jira/browse/KAFKA-14877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana resolved KAFKA-14877. Resolution: Invalid > refactor InMemoryLeaderEpochCheckpoint > -- > > Key: KAFKA-14877 > URL: https://issues.apache.org/jira/browse/KAFKA-14877 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Priority: Minor > Fix For: 3.8.0 > > > follow up with this comment: > https://github.com/apache/kafka/pull/13456#discussion_r1154306477 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16935) Automatically wait for cluster startup in embedded Connect integration tests
[ https://issues.apache.org/jira/browse/KAFKA-16935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-16935. --- Resolution: Fixed > Automatically wait for cluster startup in embedded Connect integration tests > > > Key: KAFKA-16935 > URL: https://issues.apache.org/jira/browse/KAFKA-16935 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.8.0 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > Fix For: 3.8.0 > > > It's a common idiom in our integration tests to [start an embedded Kafka and > Connect > cluster|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L120-L135] > and then immediately afterwards [wait for each worker in the Connect cluster > to complete > startup|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java#L62-L92]. > Separating these two actions into separate steps makes our tests lengthier > and can even lead to bugs and flakiness if the second step is accidentally > omitted (see [https://github.com/apache/kafka/pull/16286] for one example). > Instead, we should default to automatically awaiting the complete startup of > every worker in an embedded Connect cluster when {{EmbeddedConnect::start}} > is invoked, and require callers to opt out if they do not want to > automatically wait for startup to complete when invoking that method. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (KAFKA-16935) Automatically wait for cluster startup in embedded Connect integration tests
[ https://issues.apache.org/jira/browse/KAFKA-16935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reopened KAFKA-16935: --- > Automatically wait for cluster startup in embedded Connect integration tests > > > Key: KAFKA-16935 > URL: https://issues.apache.org/jira/browse/KAFKA-16935 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.8.0 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > Fix For: 3.8.0 > > > It's a common idiom in our integration tests to [start an embedded Kafka and > Connect > cluster|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L120-L135] > and then immediately afterwards [wait for each worker in the Connect cluster > to complete > startup|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java#L62-L92]. > Separating these two actions into separate steps makes our tests lengthier > and can even lead to bugs and flakiness if the second step is accidentally > omitted (see [https://github.com/apache/kafka/pull/16286] for one example). > Instead, we should default to automatically awaiting the complete startup of > every worker in an embedded Connect cluster when {{EmbeddedConnect::start}} > is invoked, and require callers to opt out if they do not want to > automatically wait for startup to complete when invoking that method. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16935) Automatically wait for cluster startup in embedded Connect integration tests
[ https://issues.apache.org/jira/browse/KAFKA-16935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-16935: -- Fix Version/s: 3.8.0 > Automatically wait for cluster startup in embedded Connect integration tests > > > Key: KAFKA-16935 > URL: https://issues.apache.org/jira/browse/KAFKA-16935 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.8.0 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > Fix For: 3.8.0 > > > It's a common idiom in our integration tests to [start an embedded Kafka and > Connect > cluster|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L120-L135] > and then immediately afterwards [wait for each worker in the Connect cluster > to complete > startup|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java#L62-L92]. > Separating these two actions into separate steps makes our tests lengthier > and can even lead to bugs and flakiness if the second step is accidentally > omitted (see [https://github.com/apache/kafka/pull/16286] for one example). > Instead, we should default to automatically awaiting the complete startup of > every worker in an embedded Connect cluster when {{EmbeddedConnect::start}} > is invoked, and require callers to opt out if they do not want to > automatically wait for startup to complete when invoking that method. -- This message was sent by Atlassian Jira (v8.20.10#820010)
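The "await by default, opt out explicitly" behaviour described in KAFKA-16935 can be sketched as below. This is not the `EmbeddedConnect` implementation: the class `AutoAwaitCluster`, the readiness probe passed in as a `BooleanSupplier`, and the 10 ms poll interval are all assumptions chosen to keep the example self-contained.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for an embedded test cluster; not Kafka's EmbeddedConnect.
class AutoAwaitCluster {
    private final BooleanSupplier workersReady;
    private final Duration timeout;
    private boolean started = false;

    AutoAwaitCluster(BooleanSupplier workersReady, Duration timeout) {
        this.workersReady = workersReady;
        this.timeout = timeout;
    }

    // Default entry point: start and block until every worker reports ready.
    void start() {
        start(true);
    }

    // Callers that really want a fire-and-forget start must say so explicitly.
    void start(boolean awaitStartup) {
        started = true; // stands in for launching the brokers and workers
        if (awaitStartup) {
            awaitWorkers();
        }
    }

    boolean isStarted() {
        return started;
    }

    private void awaitWorkers() {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!workersReady.getAsBoolean()) {
            if (System.nanoTime() - deadline > 0) {
                throw new IllegalStateException("Workers did not complete startup within " + timeout);
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for workers", e);
            }
        }
    }
}
```

With this shape a test that simply calls `start()` cannot forget the second step, which is the source of flakiness the ticket describes; tests that exercise startup failures can still call `start(false)`.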
Re: [PR] KAFKA-16935: Automatically wait for cluster startup in embedded Connect integration tests [kafka]
C0urante merged PR #16288: URL: https://github.com/apache/kafka/pull/16288
[jira] [Updated] (KAFKA-16947) Kafka Tiered Storage V2
[ https://issues.apache.org/jira/browse/KAFKA-16947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-16947: --- Fix Version/s: 3.9.0 (was: 3.8.0) > Kafka Tiered Storage V2 > --- > > Key: KAFKA-16947 > URL: https://issues.apache.org/jira/browse/KAFKA-16947 > Project: Kafka > Issue Type: Improvement >Reporter: Satish Duggana >Assignee: Satish Duggana >Priority: Major > Labels: KIP-405 > Fix For: 3.9.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-13561) Consider deprecating `StreamsBuilder#build(props)` function
[ https://issues.apache.org/jira/browse/KAFKA-13561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-13561. - Resolution: Duplicate > Consider deprecating `StreamsBuilder#build(props)` function > --- > > Key: KAFKA-13561 > URL: https://issues.apache.org/jira/browse/KAFKA-13561 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: needs-kip > > With > https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store > being accepted that introduced the new `StreamsBuilder(TopologyConfig)` > constructor, we can consider deprecating the `StreamsBuilder#build(props)` > function now. There are still a few things we'd need to do: > 1. Copy the `StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG` to TopologyConfig. > 2. Make sure the overloaded `StreamsBuilder()` constructor takes in default > values of TopologyConfig. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16947) Kafka Tiered Storage V2
[ https://issues.apache.org/jira/browse/KAFKA-16947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-16947: --- Affects Version/s: (was: 3.6.0) > Kafka Tiered Storage V2 > --- > > Key: KAFKA-16947 > URL: https://issues.apache.org/jira/browse/KAFKA-16947 > Project: Kafka > Issue Type: Improvement >Reporter: Satish Duggana >Assignee: Satish Duggana >Priority: Major > Labels: KIP-405 > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16947) Kafka Tiered Storage V2
Satish Duggana created KAFKA-16947: -- Summary: Kafka Tiered Storage V2 Key: KAFKA-16947 URL: https://issues.apache.org/jira/browse/KAFKA-16947 Project: Kafka Issue Type: Improvement Affects Versions: 3.6.0 Reporter: Satish Duggana Assignee: Satish Duggana Fix For: 3.8.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16945) Cleanup StreamsBuilder and TopologyConfig
[ https://issues.apache.org/jira/browse/KAFKA-16945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854578#comment-17854578 ] Matthias J. Sax commented on KAFKA-16945: - I did not look... :P Found https://issues.apache.org/jira/browse/KAFKA-13561 but it only mentions `build(Properties)` and nothing about `TopologyConfig`, so I'll go ahead and close it, as this ticket is more generic. > Cleanup StreamsBuilder and TopologyConfig > - > > Key: KAFKA-16945 > URL: https://issues.apache.org/jira/browse/KAFKA-16945 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Minor > Labels: needs-kip > > Historically, Kafka Streams offers two ways to build a topology: either via the PAPI by creating a `new Topology()` explicitly, or via the `StreamsBuilder`, which returns a topology via its `build()` method. > We later added an overload `build(Properties)` to enable topology optimizations for the DSL layer. > Furthermore, we also added the `TopologyConfig` object, which can be passed into `new Topology(TopologyConfig)` as well as `StreamsBuilder(TopologyConfig)`. > We should consider unifying the different approaches to simplify the rather complex API we have right now.
[jira] [Resolved] (KAFKA-16890) Failing to build aux state on broker failover
[ https://issues.apache.org/jira/browse/KAFKA-16890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana resolved KAFKA-16890. Resolution: Fixed > Failing to build aux state on broker failover > - > > Key: KAFKA-16890 > URL: https://issues.apache.org/jira/browse/KAFKA-16890 > Project: Kafka > Issue Type: Bug > Components: Tiered-Storage >Affects Versions: 3.7.0, 3.7.1 >Reporter: Francois Visconte >Assignee: Kamal Chandraprakash >Priority: Major > Fix For: 3.8.0 > > > We have clusters where we replace machines often falling into a state where > we keep having "Error building remote log auxiliary state for > loadtest_topic-22" and the partition being under-replicated until the leader > is manually restarted. > Looking into a specific case, here is what we observed in > __remote_log_metadata topic: > {code:java} > > partition: 29, offset: 183593, value: > RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=ClnIeN0MQsi_d4FAOFKaDA:loadtest_topic-22, > id=GZeRTXLMSNe2BQjRXkg6hQ}, startOffset=10823, endOffset=11536, > brokerId=10013, maxTimestampMs=1715774588597, eventTimestampMs=1715781657604, > segmentLeaderEpochs={125=10823, 126=10968, 128=11047, 130=11048, 131=11324, > 133=11442, 134=11443, 135=11445, 136=11521, 137=11533, 139=11535}, > segmentSizeInBytes=704895, customMetadata=Optional.empty, > state=COPY_SEGMENT_STARTED} > partition: 29, offset: 183594, value: > RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=ClnIeN0MQsi_d4FAOFKaDA:loadtest_topic-22, > id=GZeRTXLMSNe2BQjRXkg6hQ}, customMetadata=Optional.empty, > state=COPY_SEGMENT_FINISHED, eventTimestampMs=1715781658183, brokerId=10013} > partition: 29, offset: 183669, value: > RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=ClnIeN0MQsi_d4FAOFKaDA:loadtest_topic-22, > id=L1TYzx0lQkagRIF86Kp0QQ}, startOffset=10823, endOffset=11544, > brokerId=10008, maxTimestampMs=1715781445270, 
eventTimestampMs=1715782717593, > segmentLeaderEpochs={125=10823, 126=10968, 128=11047, 130=11048, 131=11324, > 133=11442, 134=11443, 135=11445, 136=11521, 137=11533, 139=11535, 140=11537, > 142=11543}, segmentSizeInBytes=713088, customMetadata=Optional.empty, > state=COPY_SEGMENT_STARTED} > partition: 29, offset: 183670, value: > RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=ClnIeN0MQsi_d4FAOFKaDA:loadtest_topic-22, > id=L1TYzx0lQkagRIF86Kp0QQ}, customMetadata=Optional.empty, > state=COPY_SEGMENT_FINISHED, eventTimestampMs=1715782718370, brokerId=10008} > partition: 29, offset: 186215, value: > RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=ClnIeN0MQsi_d4FAOFKaDA:loadtest_topic-22, > id=L1TYzx0lQkagRIF86Kp0QQ}, customMetadata=Optional.empty, > state=DELETE_SEGMENT_STARTED, eventTimestampMs=1715867874617, brokerId=10008} > partition: 29, offset: 186216, value: > RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=ClnIeN0MQsi_d4FAOFKaDA:loadtest_topic-22, > id=L1TYzx0lQkagRIF86Kp0QQ}, customMetadata=Optional.empty, > state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1715867874725, brokerId=10008} > partition: 29, offset: 186217, value: > RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=ClnIeN0MQsi_d4FAOFKaDA:loadtest_topic-22, > id=GZeRTXLMSNe2BQjRXkg6hQ}, customMetadata=Optional.empty, > state=DELETE_SEGMENT_STARTED, eventTimestampMs=1715867874729, brokerId=10008} > partition: 29, offset: 186218, value: > RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=ClnIeN0MQsi_d4FAOFKaDA:loadtest_topic-22, > id=GZeRTXLMSNe2BQjRXkg6hQ}, customMetadata=Optional.empty, > state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1715867874817, brokerId=10008} > {code} > > It seems that at the time the leader is restarted (10013), a second copy of > the same segment is tiered by the new leader (10008). 
Interestingly the > segment doesn't have the same end offset, which is concerning. > Then the follower sees the following error repeatedly until the leader is > restarted: > > {code:java} > [2024-05-17 20:46:42,133] DEBUG [ReplicaFetcher replicaId=10013, > leaderId=10008, fetcherId=0] Handling errors in processFetchRequest for > partitions HashSet(loadtest_topic-22) (kafka.server.ReplicaFetcherThread) > [2024-05-17 20:46:43,174] DEBUG [ReplicaFetcher replicaId=10013, > leaderId=10008, fetcherId=0] Received error OFFSET_MOVED_TO_TIERED_STORAGE, > at fetch offset: 11537, topic-partition: loadtest_topic-22 > (kafka.server.ReplicaFetcherThread) > [2024-05-17 20:46:43,175] ERROR [ReplicaFetcher replicaId=10013, > leaderId=10008, fetcherId=0] Error bu
[jira] [Commented] (KAFKA-16925) stream-table join does not immediately forward expired records on restart
[ https://issues.apache.org/jira/browse/KAFKA-16925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854577#comment-17854577 ] Matthias J. Sax commented on KAFKA-16925: - {quote}My understanding is that the processor here wants to know the maximum observed stream time so far (including the current record), and context.currentStreamTimeMs() is set from the timestamp of input records. For me that's the information this processor is looking for ? {quote} Assume there is an upstream processor with a state store, and the upstream processor puts input records into the state store but does not yet forward them (could be more than one record). Thus, task stream-time would account for the records buffered in the state store. Assume that this advanced stream-time might close some downstream window in the join. Later, the upstream processor takes the records out of the store and finally forwards them into the join. If the join used the task's tracked stream-time, it might now incorrectly drop the records because the window was already closed (via a rebalance and a re-init of stream-time, which would jump forward now), even though there is no actual reason to drop them. Thus, IMHO the task's tracked stream-time should not be used to init the operator's stream-time. Putting the information into the store is usually tricky: we need to find a key for the stream-time that is guaranteed to be unique and does not conflict with any data key from the actual records we process... It would become a very operator-specific (or store-specific) solution, which does not seem generic (especially for PAPI usage it would be nice to have something generic inside the KS runtime that is independent of the processors and/or state stores used). {quote}I am not talking about general case, where I agree that it is better to put the logic in the store itself or any other way that is reusable in processors. 
{quote} Not sure if we would need to put it into the store. IMHO, we could also track stream-time inside the KS runtime on a per-operator basis, but as it might result in some overhead, we might want to make it opt-in and disabled by default. The tracked time is stored via commit offset metadata, and this metadata size can become an issue if too large (thus we should try to keep it as small as possible). This actually raises a more general question anyway: this metadata did grow over time, and we did consider maybe even adding an in-memory metadata store (backed by a topic) to get rid of the commit metadata overhead... I would really like to get to a more holistic solution (we did bolt on too many island solutions over the years, which is totally ok for some time, but I believe we are reaching a state where it's worth building a good generic solution and no longer adding more island solutions...) > stream-table join does not immediately forward expired records on restart > - > > Key: KAFKA-16925 > URL: https://issues.apache.org/jira/browse/KAFKA-16925 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Ayoub Omari >Assignee: Ayoub Omari >Priority: Major > > [KIP-923|https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join] > introduced a grace period for KStreamKTableJoin. This allows joining a stream > with a KTable backed by a Versioned state store. > Upon receiving a record, it is put in a buffer until the grace period has elapsed. > When the grace period elapses, the record is joined with its most recent > match from the versioned state store. > +Late records+ are +not+ put in the buffer and are immediately joined. > > {code:java} > If the grace period is non zero, the record will enter a stream buffer and > will dequeue when the record timestamp is less than or equal to stream time > minus the grace period. Late records, out of the grace period, will be > executed right as they come in. 
(KIP-923){code} > > However, this is not the case today on rebalance or restart. The reason is > that observedStreamTime is taken from the underlying state store, which loses > this information on rebalance/restart: > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java#L164] > > If the task restarts and receives an expired record, the buffer considers > that this record has the maximum stream time observed so far, and puts it in > the buffer instead of immediately joining it. > > {*}Example{*}: > * Grace period = 60s > * KTable contains (key, rightValue) > > +Normal scenario+ > {code:java} > streamInput1 (key, value1) <--- time = T : put in buffer > streamInput2 (key, value2) <--- time = T - 60s : imme
Re: [PR] KAFKA-16824: Utils.getHost and Utils.getPort do not catch a lot of invalid host and ports. [kafka]
showuon commented on PR #16048: URL: https://github.com/apache/kafka/pull/16048#issuecomment-2164106388 @zhaochun-ma , nice find! I've created [KAFKA-16946](https://issues.apache.org/jira/browse/KAFKA-16946) for this issue. Are you interested in opening a PR to fix it?