Re: [PR] KAFKA-15481: Fix concurrency bug in RemoteIndexCache [kafka]
kamalcph commented on code in PR #14483: URL: https://github.com/apache/kafka/pull/14483#discussion_r1349878452 ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -134,7 +134,7 @@ public RemoteIndexCache(int maxSize, RemoteStorageManager remoteStorageManager, .maximumSize(maxSize) // removeListener is invoked when either the entry is invalidated (means manual removal by the caller) or Review Comment: Can we update the comment to refer to the `evictionListener` instead of `removeListener`? ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -169,7 +169,16 @@ public Cache internalCache() { public void remove(Uuid key) { lock.readLock().lock(); try { -internalCache.invalidate(key); +internalCache.asMap().computeIfPresent(key, (k, v) -> { +try { +v.markForCleanup(); +v.cleanup(); Review Comment: Also, we have to add that entry to the `expiredIndexes` so that the cleanup thread will be invoked on the removed entry. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]
iit2009060 commented on PR #14482: URL: https://github.com/apache/kafka/pull/14482#issuecomment-1752368980 > @iit2009060 , there are compilation failure, could you fix it? https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14482/10/pipeline/ on it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]
iit2009060 commented on code in PR #14482: URL: https://github.com/apache/kafka/pull/14482#discussion_r1349871792 ## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ## @@ -524,23 +527,221 @@ class RemoteIndexCacheTest { } } - @Test - def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = { -// create Corrupt Offset Index File -createCorruptRemoteIndexCacheOffsetFile() + @ParameterizedTest + @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION")) + def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit = { +// create Corrupted Index File in remote index cache +createCorruptedIndexFile(indexType, cache.cacheDir()) val entry = cache.getIndexEntry(rlsMetadata) // Test would fail if it throws corrupt Exception Review Comment: done ## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ## @@ -554,23 +557,221 @@ class RemoteIndexCacheTest { assertTrue(cache.internalCache().estimatedSize() == 0) } - @Test - def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = { -// create Corrupt Offset Index File -createCorruptRemoteIndexCacheOffsetFile() + @ParameterizedTest + @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION")) + def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit = { +// create Corrupted Index File in remote index cache +createCorruptedIndexFile(indexType, cache.cacheDir()) val entry = cache.getIndexEntry(rlsMetadata) // Test would fail if it throws corrupt Exception -val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata) val offsetIndexFile = entry.offsetIndex.file().toPath +val txnIndexFile = entry.txnIndex.file().toPath +val timeIndexFile = entry.timeIndex.file().toPath + +val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata) +val expectedTimeIndexFileName: String = remoteTimeIndexFileName(rlsMetadata) +val expectedTxnIndexFileName: String = 
remoteTransactionIndexFileName(rlsMetadata) assertEquals(expectedOffsetIndexFileName, offsetIndexFile.getFileName.toString) +assertEquals(expectedTxnIndexFileName, txnIndexFile.getFileName.toString) +assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName.toString) + // assert that parent directory for the index files is correct assertEquals(RemoteIndexCache.DIR_NAME, offsetIndexFile.getParent.getFileName.toString, - s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent") + s"offsetIndex=$offsetIndexFile is created under incorrect parent") +assertEquals(RemoteIndexCache.DIR_NAME, txnIndexFile.getParent.getFileName.toString, + s"txnIndex=$txnIndexFile is created under incorrect parent") +assertEquals(RemoteIndexCache.DIR_NAME, timeIndexFile.getParent.getFileName.toString, + s"timeIndex=$timeIndexFile is created under incorrect parent") + // file is corrupted it should fetch from remote storage again verifyFetchIndexInvocation(count = 1) } + @Test + def testMultipleIndexEntriesExecutionInCorruptException(): Unit = { +reset(rsm) +when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType]))) + .thenAnswer(ans => { +val metadata = ans.getArgument[RemoteLogSegmentMetadata](0) +val indexType = ans.getArgument[IndexType](1) +val offsetIdx = createOffsetIndexForSegmentMetadata(metadata) +val timeIdx = createTimeIndexForSegmentMetadata(metadata) +val txnIdx = createTxIndexForSegmentMetadata(metadata) +maybeAppendIndexEntries(offsetIdx, timeIdx) +// Create corrupted index file +createCorruptTimeIndexOffsetFile(tpDir) +indexType match { + case IndexType.OFFSET => new FileInputStream(offsetIdx.file) + case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file) + case IndexType.TRANSACTION => new FileInputStream(txnIdx.file) + case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed. + case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed. 
+} + }) + +assertThrows(classOf[CorruptIndexException], () => cache.getIndexEntry(rlsMetadata)) + assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id())) +verifyFetchIndexInvocation(1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP)) +verifyFetchIndexInvocation(0, Seq(IndexType.TRANSACTION)) +// Current status +// (cache is null) +// RemoteCacheDir contain +// 1. Offset Index File is fine and not corrupted +// 2. Time Index File is corrupted +// What should be the code flow in next execution +// 1. No rsm call for fetching OffSet Index File. +// 2. Time index file should be fetched from remote
Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]
kamalcph commented on code in PR #14482: URL: https://github.com/apache/kafka/pull/14482#discussion_r1349871437 ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -166,6 +166,11 @@ public Cache internalCache() { return internalCache; } +// Visible for testing +public File cacheDir() { Review Comment: You can find more details about the intention to move the source code to Java in KAFKA-14524 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]
showuon commented on PR #14482: URL: https://github.com/apache/kafka/pull/14482#issuecomment-1752364597 @iit2009060, there are compilation failures; could you fix them? https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14482/10/pipeline/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]
kamalcph commented on code in PR #14482: URL: https://github.com/apache/kafka/pull/14482#discussion_r1349857381 ## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ## @@ -524,23 +527,221 @@ class RemoteIndexCacheTest { } } - @Test - def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = { -// create Corrupt Offset Index File -createCorruptRemoteIndexCacheOffsetFile() + @ParameterizedTest + @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION")) + def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit = { +// create Corrupted Index File in remote index cache +createCorruptedIndexFile(indexType, cache.cacheDir()) val entry = cache.getIndexEntry(rlsMetadata) // Test would fail if it throws corrupt Exception Review Comment: Can we update this comment to the following? ``` Test would fail if it throws an exception other than CorruptIndexException ``` ## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ## @@ -554,23 +557,221 @@ class RemoteIndexCacheTest { assertTrue(cache.internalCache().estimatedSize() == 0) } - @Test - def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = { -// create Corrupt Offset Index File -createCorruptRemoteIndexCacheOffsetFile() + @ParameterizedTest + @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION")) + def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit = { +// create Corrupted Index File in remote index cache +createCorruptedIndexFile(indexType, cache.cacheDir()) val entry = cache.getIndexEntry(rlsMetadata) // Test would fail if it throws corrupt Exception -val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata) val offsetIndexFile = entry.offsetIndex.file().toPath +val txnIndexFile = entry.txnIndex.file().toPath +val timeIndexFile = entry.timeIndex.file().toPath + +val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata) +val
expectedTimeIndexFileName: String = remoteTimeIndexFileName(rlsMetadata) +val expectedTxnIndexFileName: String = remoteTransactionIndexFileName(rlsMetadata) assertEquals(expectedOffsetIndexFileName, offsetIndexFile.getFileName.toString) +assertEquals(expectedTxnIndexFileName, txnIndexFile.getFileName.toString) +assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName.toString) + // assert that parent directory for the index files is correct assertEquals(RemoteIndexCache.DIR_NAME, offsetIndexFile.getParent.getFileName.toString, - s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent") + s"offsetIndex=$offsetIndexFile is created under incorrect parent") +assertEquals(RemoteIndexCache.DIR_NAME, txnIndexFile.getParent.getFileName.toString, + s"txnIndex=$txnIndexFile is created under incorrect parent") +assertEquals(RemoteIndexCache.DIR_NAME, timeIndexFile.getParent.getFileName.toString, + s"timeIndex=$timeIndexFile is created under incorrect parent") + // file is corrupted it should fetch from remote storage again verifyFetchIndexInvocation(count = 1) } + @Test + def testMultipleIndexEntriesExecutionInCorruptException(): Unit = { +reset(rsm) +when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType]))) + .thenAnswer(ans => { +val metadata = ans.getArgument[RemoteLogSegmentMetadata](0) +val indexType = ans.getArgument[IndexType](1) +val offsetIdx = createOffsetIndexForSegmentMetadata(metadata) +val timeIdx = createTimeIndexForSegmentMetadata(metadata) +val txnIdx = createTxIndexForSegmentMetadata(metadata) +maybeAppendIndexEntries(offsetIdx, timeIdx) +// Create corrupted index file +createCorruptTimeIndexOffsetFile(tpDir) +indexType match { + case IndexType.OFFSET => new FileInputStream(offsetIdx.file) + case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file) + case IndexType.TRANSACTION => new FileInputStream(txnIdx.file) + case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed. 
+ case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed. +} + }) + +assertThrows(classOf[CorruptIndexException], () => cache.getIndexEntry(rlsMetadata)) + assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id())) +verifyFetchIndexInvocation(1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP)) +verifyFetchIndexInvocation(0, Seq(IndexType.TRANSACTION)) +// Current status +// (cache is null) +// RemoteCacheDir contain +// 1. Offset Index File is fine and not corrupted +// 2. Time Index File is corrupted +// What should be the code flow in next
Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]
iit2009060 commented on PR #14482: URL: https://github.com/apache/kafka/pull/14482#issuecomment-1752350204 @showuon @kamalcph @divijvaidya I have rebased the changes and resolved the conflicts with the changes merged in PR https://github.com/apache/kafka/pull/14381. Can you please review and merge it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]
iit2009060 commented on PR #14482: URL: https://github.com/apache/kafka/pull/14482#issuecomment-1752348404 > Since we are adding new tests, can we create a new RemoteIndexCacheTest under `storage` module and start writing the tests in `java`? > > The source code is in Java but the test is in scala. We can move the existing tests in a separate PR. @kamalcph Can I create a ticket for this and work on it after the PR is merged? This PR has been pending since last week and is already approved. > Since we are adding new tests, can we create a new RemoteIndexCacheTest under `storage` module and start writing the tests in `java`? > > The source code is in Java but the test is in scala. We can move the existing tests in a separate PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]
iit2009060 commented on code in PR #14482: URL: https://github.com/apache/kafka/pull/14482#discussion_r1349861220 ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -166,6 +166,11 @@ public Cache internalCache() { return internalCache; } +// Visible for testing +public File cacheDir() { Review Comment: @kamalcph I have only just started learning the Kafka ecosystem. Can you help me understand the rationale behind it? Then I can create a ticket with the details. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]
kamalcph commented on code in PR #14482: URL: https://github.com/apache/kafka/pull/14482#discussion_r1349854436 ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -166,6 +166,11 @@ public Cache internalCache() { return internalCache; } +// Visible for testing +public File cacheDir() { Review Comment: Can we file a ticket to move the RemoteIndexCacheTest to the `storage` module, under the same package? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15565) KafkaAdminClient does not honor request timeout ms
[ https://issues.apache.org/jira/browse/KAFKA-15565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773096#comment-17773096 ] Philip Nee commented on KAFKA-15565: Hey [~sankalpbhatia], thanks for reporting the issue, but I'm not sure this is actually a bug. [~cmccabe] - it seems like you added this config in '17; could you chime in here? I wonder if the intent was to prevent metadata updates. > KafkaAdminClient does not honor request timeout ms > --- > > Key: KAFKA-15565 > URL: https://issues.apache.org/jira/browse/KAFKA-15565 > Project: Kafka > Issue Type: Bug >Reporter: Sankalp Bhatia >Assignee: Sankalp Bhatia >Priority: Minor > > It seems to me there is a bug in this line in the KafkaAdminClient. For the > constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a > hardcoded value of 1 hour. Ideally, this should be derived from the client > config "request.timeout.ms" from the AdminClientConfig[2]. > References > [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521] > [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]
philipnee commented on PR #14364: URL: https://github.com/apache/kafka/pull/14364#issuecomment-1752310508 @dajac - Not entirely sure what is the best way to fix the jdk11 build. The rest of the builds seem to be fine with the following failures: ``` Build / JDK 21 and Scala 2.13 / randomClusterPerturbationsShouldConverge[enableRackAwareTaskAssignor=true] – org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest 5s Build / JDK 21 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest 2m 0s Build / JDK 17 and Scala 2.13 / shouldHaveSamePositionBoundActiveAndStandBy – org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest 12s Build / JDK 17 and Scala 2.13 / shouldHaveSamePositionBoundActiveAndStandBy – org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest 8s Build / JDK 17 and Scala 2.13 / shouldHonorEOSWhenUsingCachingAndStandbyReplicas – org.apache.kafka.streams.integration.StandbyTaskEOSMultiRebalanceIntegrationTest 2m 7s Build / JDK 8 and Scala 2.12 / testMultiWorkerRestartOnlyConnector – org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest 2m 24s Build / JDK 8 and Scala 2.12 / testAbortTransactionTimeout(String).quorum=kraft – org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest ``` However, I did open a draft PR from this branch and jdk11 was able to complete. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] This adds a default implementation to the KafkaProducer interface to … [kafka]
github-actions[bot] commented on PR #13980: URL: https://github.com/apache/kafka/pull/13980#issuecomment-1752300320 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito [kafka]
github-actions[bot] commented on PR #13873: URL: https://github.com/apache/kafka/pull/13873#issuecomment-1752300357 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15565) KafkaAdminClient does not honor request timeout ms
[ https://issues.apache.org/jira/browse/KAFKA-15565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sankalp Bhatia updated KAFKA-15565: --- Description: It seems to me there is a bug in this line in the KafkaAdminClient. For the constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a hardcoded value of 1 hour. Ideally, this should be derived from the client config "request.timeout.ms" from the AdminClientConfig[2]. References [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521] [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98] was: It seems to me there is a bug in this line in the KafkaAdminClient. For the constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a hardcoded value of 1 hour. Ideally, this should be derived from the client config ``request.timeout.ms`` from the AdminClientConfig[2]. References [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521] [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98] > KafkaAdminClient does not honor request timeout ms > --- > > Key: KAFKA-15565 > URL: https://issues.apache.org/jira/browse/KAFKA-15565 > Project: Kafka > Issue Type: Bug >Reporter: Sankalp Bhatia >Assignee: Sankalp Bhatia >Priority: Minor > > It seems to me there is a bug in this line in the KafkaAdminClient. For the > constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a > hardcoded value of 1 hour. Ideally, this should be derived from the client > config "request.timeout.ms" from the AdminClientConfig[2]. 
> References > [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521] > [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15565) KafkaAdminClient does not honor request timeout ms
[ https://issues.apache.org/jira/browse/KAFKA-15565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sankalp Bhatia updated KAFKA-15565: --- Description: It seems to me there is a bug in this line in the KafkaAdminClient. For the constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a hardcoded value of 1 hour. Ideally, this should be derived from the client config request.timeout.ms from the AdminClientConfig[2]. References [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521] [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98] was: It seems to me there is a bug in this line in the KafkaAdminClient. For the constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a hardcoded value of 1 hour. Ideally, this should be derived from the client config /** request.timeout.ms */ from the AdminClientConfig[2]. References [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521] [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98] > KafkaAdminClient does not honor request timeout ms > --- > > Key: KAFKA-15565 > URL: https://issues.apache.org/jira/browse/KAFKA-15565 > Project: Kafka > Issue Type: Bug >Reporter: Sankalp Bhatia >Assignee: Sankalp Bhatia >Priority: Minor > > It seems to me there is a bug in this line in the KafkaAdminClient. For the > constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a > hardcoded value of 1 hour. Ideally, this should be derived from the client > config request.timeout.ms from the AdminClientConfig[2]. 
> References > [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521] > [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15565) KafkaAdminClient does not honor request timeout ms
[ https://issues.apache.org/jira/browse/KAFKA-15565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sankalp Bhatia updated KAFKA-15565: --- Description: It seems to me there is a bug in this line in the KafkaAdminClient. For the constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a hardcoded value of 1 hour. Ideally, this should be derived from the client config ``request.timeout.ms`` from the AdminClientConfig[2]. References [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521] [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98] was: It seems to me there is a bug in this line in the KafkaAdminClient. For the constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a hardcoded value of 1 hour. Ideally, this should be derived from the client config request.timeout.ms from the AdminClientConfig[2]. References [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521] [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98] > KafkaAdminClient does not honor request timeout ms > --- > > Key: KAFKA-15565 > URL: https://issues.apache.org/jira/browse/KAFKA-15565 > Project: Kafka > Issue Type: Bug >Reporter: Sankalp Bhatia >Assignee: Sankalp Bhatia >Priority: Minor > > It seems to me there is a bug in this line in the KafkaAdminClient. For the > constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a > hardcoded value of 1 hour. Ideally, this should be derived from the client > config ``request.timeout.ms`` from the AdminClientConfig[2]. 
> References > [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521] > [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15565) KafkaAdminClient does not honor request timeout ms
[ https://issues.apache.org/jira/browse/KAFKA-15565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sankalp Bhatia updated KAFKA-15565: --- Description: It seems to me there is a bug in this line in the KafkaAdminClient. For the constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a hardcoded value of 1 hour. Ideally, this should be derived from the client config /** request.timeout.ms */ from the AdminClientConfig[2]. References [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521] [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98] was: It seems to me there is a bug in this line in the KafkaAdminClient. For the constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a hardcoded value of 1 hour. Ideally, this should be derived from the client config /** request.timeout.ms */ from the AdminClientConfig[2]. References [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521] [2]https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98 > KafkaAdminClient does not honor request timeout ms > --- > > Key: KAFKA-15565 > URL: https://issues.apache.org/jira/browse/KAFKA-15565 > Project: Kafka > Issue Type: Bug >Reporter: Sankalp Bhatia >Assignee: Sankalp Bhatia >Priority: Minor > > It seems to me there is a bug in this line in the KafkaAdminClient. For the > constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a > hardcoded value of 1 hour. Ideally, this should be derived from the client > config /** request.timeout.ms */ from the > AdminClientConfig[2]. 
> References > [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521] > [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15565) KafkaAdminClient does not honor request timeout ms
Sankalp Bhatia created KAFKA-15565: -- Summary: KafkaAdminClient does not honor request timeout ms Key: KAFKA-15565 URL: https://issues.apache.org/jira/browse/KAFKA-15565 Project: Kafka Issue Type: Bug Reporter: Sankalp Bhatia Assignee: Sankalp Bhatia It seems to me there is a bug in this line in the KafkaAdminClient. For the constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a hardcoded value of 1 hour. Ideally, this should be derived from the client config /** request.timeout.ms */ from the AdminClientConfig[2]. References [1]https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521 [2]https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15565) KafkaAdminClient does not honor request timeout ms
[ https://issues.apache.org/jira/browse/KAFKA-15565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sankalp Bhatia updated KAFKA-15565: --- Description: It seems to me there is a bug in this line in the KafkaAdminClient. For the constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a hardcoded value of 1 hour. Ideally, this should be derived from the client config /** request.timeout.ms */ from the AdminClientConfig[2]. References [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521] [2]https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98 was: It seems to me there is a bug in this line in the KafkaAdminClient. For the constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a hardcoded value of 1 hour. Ideally, this should be derived from the client config /** request.timeout.ms */ from the AdminClientConfig[2]. References [1]https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521 [2]https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98 > KafkaAdminClient does not honor request timeout ms > --- > > Key: KAFKA-15565 > URL: https://issues.apache.org/jira/browse/KAFKA-15565 > Project: Kafka > Issue Type: Bug >Reporter: Sankalp Bhatia >Assignee: Sankalp Bhatia >Priority: Minor > > It seems to me there is a bug in this line in the KafkaAdminClient. For the > constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a > hardcoded value of 1 hour. Ideally, this should be derived from the client > config /** request.timeout.ms */ from the > AdminClientConfig[2]. 
> References > [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521] > [2]https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
jeffkbkim commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1349811549 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -106,15 +107,29 @@ void validateOffsetFetch( /** * Returns true if the group is actively subscribed to the topic. * - * @param topic The topic name. + * @param topicThe topic name. + * @param isSubscribedIfEmptySubscriptions Whether to consider an empty topic subscriptions subscribed or not. + * * @return Whether the group is subscribed to the topic. */ -boolean isSubscribedToTopic(String topic); +boolean isSubscribedToTopic(String topic, boolean isSubscribedIfEmptySubscriptions); Review Comment: responded in the thread -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
jeffkbkim commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1349811501 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( .setTopics(topicResponses); } +/** + * Remove expired offsets for group. + * + * @param groupId The group id. + * @param records The list of records to populate with offset commit tombstone records. + * @param offsetsRetentionMs The offset retention in milliseconds. + * + * @return The group id if the group no longer has any offsets remaining, empty otherwise. + */ +public Optional cleanupExpiredOffsets(String groupId, List records, long offsetsRetentionMs) { +TimelineHashMap> offsetsByTopic = offsetsByGroup.get(groupId); +if (offsetsByTopic == null) { +return Optional.of(groupId); +} +try { +Group group = groupMetadataManager.group(groupId); +ExpirationCondition expirationCondition = group.expirationCondition(); +Set expiredPartitions = new HashSet<>(); +long currentTimestamp = time.milliseconds(); +AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true); +offsetsByTopic.forEach((topic, partitions) -> { +if (!expirationCondition.subscribedTopics.contains(topic)) { Review Comment: I think the main issue is that the existing behavior in GroupMetadata#removeExpiredOffsets only considers what topics a group is subscribed to if the group is using the consumer group protocol AND is Stable. If a group is in any other state, it acts as if the group is not subscribed to any topic when expiring offsets. here's my concern with the above suggestion: let's say we have an empty group that uses the consumer group protocol. subscribedTopics will be empty as there are no members (set in `computeSubscribedTopics`). This will return `true` from `isSubscribedToTopic`. 
This is not aligned with the existing behavior which says if a group is empty and has a protocol type, we return an empty collection so that the group is considered not subscribed to any topics during offset expiration. ``` case Some(_) if is(Empty) => // no consumer exists in the group => ... getExpiredOffsets( commitRecordMetadataAndOffset => currentStateTimestamp .getOrElse(commitRecordMetadataAndOffset.offsetAndMetadata.commitTimestamp), ) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
ashmeet13 commented on PR #12988: URL: https://github.com/apache/kafka/pull/12988#issuecomment-1752128811 Got it @mjsax - Sharing the code that seems to be causing this bypass. Currently, to fetch any consumer config (i.e. `main`, `restore` or `global`), we use a common function, `getCommonConsumerConfigs`. It's within the `getCommonConsumerConfigs` function where we check and override the configs preferred by streams - ```java private Map getCommonConsumerConfigs() { // Fetch all consumer props starting with "consumer." clientProvidedProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames()); // Clean out any properties that were set but need to be controlled by streams checkIfUnexpectedUserSpecifiedConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS); checkIfUnexpectedUserSpecifiedConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS); // Create a config map of the preferred props and merge it with the cleaned props from above final consumerProps =new (eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES); consumerProps.putAll(clientProvidedProps); } ``` And the logic within `getMainConsumerConfigs` is - ```java public Map getMainConsumerConfigs(...) { // Fetch the props starting with "consumer." after cleaning // any props that needed to be overwritten final consumerProps = getCommonConsumerConfigs(); // Get main consumer override props i.e. the ones // starting with "main.consumer." and merge the two maps. final mainConsumerProps = originalsWithPrefix(MAIN_CONSUMER_PREFIX); for (final entry: mainConsumerProps.entrySet()) { consumerProps.put(entry.getKey(), entry.getValue()); // Continue processing and filling in other required configs } ``` Because the `main.consumer.`-prefixed props are merged in after `checkIfUnexpectedUserSpecifiedConfig` has already run, a user can override a Streams-controlled config through that prefix without the check ever seeing it. Do you think I've understood this piece correctly? If so, should the fix go into this PR itself? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15535) Add documentation of "remote.log.index.file.cache.total.size.bytes" configuration property.
[ https://issues.apache.org/jira/browse/KAFKA-15535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772948#comment-17772948 ] hudeqi commented on KAFKA-15535: Can this be marked as resolved? The doc of "remote.log.index.file.cache.total.size.bytes" is already available, and I have also checked the other tiered storage configurations; nothing seems to be missing. [~satish.duggana] > Add documentation of "remote.log.index.file.cache.total.size.bytes" > configuration property. > > > Key: KAFKA-15535 > URL: https://issues.apache.org/jira/browse/KAFKA-15535 > Project: Kafka > Issue Type: Task > Components: documentation >Reporter: Satish Duggana >Assignee: hudeqi >Priority: Major > Labels: tiered-storage > Fix For: 3.7.0 > > > Add documentation of "remote.log.index.file.cache.total.size.bytes" > configuration property. > Please double check all the existing public tiered storage configurations. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
hudeqi opened a new pull request, #14511: URL: https://github.com/apache/kafka/pull/14511 As described from: https://github.com/apache/kafka/pull/14243#discussion_r1320630057 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
dajac commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1349654092 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -106,15 +107,29 @@ void validateOffsetFetch( /** * Returns true if the group is actively subscribed to the topic. * - * @param topic The topic name. + * @param topicThe topic name. + * @param isSubscribedIfEmptySubscriptions Whether to consider an empty topic subscriptions subscribed or not. + * * @return Whether the group is subscribed to the topic. */ -boolean isSubscribedToTopic(String topic); +boolean isSubscribedToTopic(String topic, boolean isSubscribedIfEmptySubscriptions); Review Comment: I made a comment about this boolean [here](https://github.com/apache/kafka/pull/14467#discussion_r1348539504). Could you take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
dajac commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1349653948 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -544,6 +579,100 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( .setTopics(topicResponses); } +/** + * Remove expired offsets for group. + * + * @param groupId The group id. + * @param records The list of records to populate with offset commit tombstone records. + * + * @return True if no offsets exist or if all offsets expired, false otherwise. + */ +public boolean cleanupExpiredOffsets(String groupId, List records) { +TimelineHashMap> offsetsByTopic = offsetsByGroup.get(groupId); +if (offsetsByTopic == null) { +return true; +} + +// We expect the group to exist. +Group group = groupMetadataManager.group(groupId); +Set expiredPartitions = new HashSet<>(); +long currentTimestamp = time.milliseconds(); +Optional offsetExpirationCondition = group.offsetExpirationCondition(); + +if (!offsetExpirationCondition.isPresent()) { +return false; +} + +AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true); +OffsetExpirationCondition condition = offsetExpirationCondition.get(); + +offsetsByTopic.forEach((topic, partitions) -> { +if (!group.isSubscribedToTopic(topic, false)) { +partitions.forEach((partition, offsetAndMetadata) -> { +if (condition.isOffsetExpired(offsetAndMetadata, currentTimestamp, config.offsetsRetentionMs)) { + expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, records)); +} else { +hasAllOffsetsExpired.set(false); +} +}); +} else { +hasAllOffsetsExpired.set(false); +} +}); + +log.debug("[GroupId {}] Expiring offsets: {}", groupId, expiredPartitions); Review Comment: I meant that we need to format the list to a comma separated string. Could we also log the boolean indicating whether all the offsets get expired or not. Should we also say `Expiring offsets of partitions: `? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
dajac commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1349653647 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -544,6 +550,76 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( .setTopics(topicResponses); } +/** + * Remove expired offsets for group. + * + * @param groupId The group id. + * @param records The list of records to populate with offset commit tombstone records. + * + * @return True if no offsets exist or if all offsets expired, false otherwise. + */ +public boolean cleanupExpiredOffsets(String groupId, List records) { +TimelineHashMap> offsetsByTopic = offsetsByGroup.get(groupId); +if (offsetsByTopic == null) { +return true; +} + +// We expect the group to exist. +Group group = groupMetadataManager.group(groupId); +Set expiredPartitions = new HashSet<>(); +long currentTimestamp = time.milliseconds(); Review Comment: nit: currentTimestampMs? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
dajac commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1349653472 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationCondition.java: ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +/** + * An offset is considered expired based on different factors, such as the state of the group + * and/or the GroupMetadata record version (for generic groups). This class is used to check + * how offsets for the group should be expired. + */ +public interface OffsetExpirationCondition { + +/** + * Given an offset metadata and offsets retention, return whether the offset is expired or not. + * + * @param offset The offset metadata. + * @param currentTimestamp The current timestamp. + * @param offsetsRetentionMs The offset retention. + * + * @return Whether the offset is considered expired or not. + */ +boolean isOffsetExpired(OffsetAndMetadata offset, long currentTimestamp, long offsetsRetentionMs); +} Review Comment: nit: Let's add a new line. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
dajac commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1349653385 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ## @@ -1763,6 +1803,120 @@ public void testDeleteGroupAllOffsets(Group.GroupType groupType) { assertEquals(3, numDeleteOffsets); } +@Test +public void testIsExpiredOffset() { +long currentTimestamp = 1000L; +long baseTimestamp = 500L; +OptionalLong expireTimestampMs = OptionalLong.of(1500); +long offsetsRetentionMs = 500L; + +// Current timestamp >= expire timestamp => should expire +assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs)); + +// Current timestamp < expire timestamp => should not expire +currentTimestamp = 499; +assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs)); + +// Expire timestamp does not exist (current version with no per partition retention) +// Current timestamp - base timestamp >= offsets retention => should expire +expireTimestampMs = OptionalLong.empty(); +currentTimestamp = 1000L; +assertTrue(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs)); + +// Current timestamp - base timestamp < offsets retention => should not expire +currentTimestamp = 999L; +assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs)); +} + +@Test +public void testCleanupExpiredOffsetsGroupDoesNotExist() { +OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() +.build(); + +List records = new ArrayList<>(); +assertTrue(context.cleanupExpiredOffsets("unknown-group-id", records)); Review Comment: I agree in this case. 
However, if the group does not exist, https://github.com/apache/kafka/pull/14467/files#diff-a4ad0e0a77c78dde2a841d055dd64e15f7da00b8a2a9e3279d74b718d5c612bbR568 should throw an exception. Should we add a test for this case and rename `testCleanupExpiredOffsetsGroupDoesNotExist` to something like `testCleanupExpiredOffsetsGroupHasNoOffsets`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15536) dynamically resize remoteIndexCache
[ https://issues.apache.org/jira/browse/KAFKA-15536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hudeqi updated KAFKA-15536: --- Affects Version/s: 3.7.0 (was: 3.6.0) > dynamically resize remoteIndexCache > --- > > Key: KAFKA-15536 > URL: https://issues.apache.org/jira/browse/KAFKA-15536 > Project: Kafka > Issue Type: Improvement > Components: Tiered-Storage >Affects Versions: 3.7.0 >Reporter: Luke Chen >Assignee: hudeqi >Priority: Major > > context: > https://github.com/apache/kafka/pull/14243#discussion_r1320630057 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14912:Add a dynamic config for remote index cache size [kafka]
satishd merged PR #14381: URL: https://github.com/apache/kafka/pull/14381 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14912:Add a dynamic config for remote index cache size [kafka]
satishd commented on PR #14381: URL: https://github.com/apache/kafka/pull/14381#issuecomment-1751951542 There are a few unrelated test failures, merging it to trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14912:Add a dynamic config for remote index cache size [kafka]
hudeqi commented on PR #14381: URL: https://github.com/apache/kafka/pull/14381#issuecomment-1751943708 > Thanks @hudeqi for addressing the review comments. LGTM. Thanks for your review, @satishd. Please merge it. I have resolved several conflicts. It seems there is another PR that conflicts with this one :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15502) Handle large keystores in SslEngineValidator
[ https://issues.apache.org/jira/browse/KAFKA-15502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar updated KAFKA-15502: -- Affects Version/s: 3.5.1 3.4.1 > Handle large keystores in SslEngineValidator > > > Key: KAFKA-15502 > URL: https://issues.apache.org/jira/browse/KAFKA-15502 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.4.1, 3.6.0, 3.5.1 >Reporter: Manikumar >Assignee: Manikumar >Priority: Major > Fix For: 3.4.2, 3.5.2, 3.7.0, 3.6.1 > > > We have observed an issue where the inter-broker SSL listener is not coming up > for large keystores (size >16K). > 1. Currently, the validator code doesn't work well with large stores. Right now, > WRAP returns if there is already data in the buffer. But if we need more data > to be wrapped for UNWRAP to succeed, we end up looping forever. > 2. We observed that large TLSv1.3 post-handshake messages are not getting read, > causing the validator code to loop forever. This is observed with JDK17+ > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15502) Handle large keystores in SslEngineValidator
[ https://issues.apache.org/jira/browse/KAFKA-15502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar resolved KAFKA-15502. --- Fix Version/s: 3.4.2 3.5.2 3.7.0 3.6.1 Resolution: Fixed > Handle large keystores in SslEngineValidator > > > Key: KAFKA-15502 > URL: https://issues.apache.org/jira/browse/KAFKA-15502 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0 >Reporter: Manikumar >Assignee: Manikumar >Priority: Major > Fix For: 3.4.2, 3.5.2, 3.7.0, 3.6.1 > > > We have observed an issue where the inter-broker SSL listener is not coming up > for large keystores (size >16K). > 1. Currently, the validator code doesn't work well with large stores. Right now, > WRAP returns if there is already data in the buffer. But if we need more data > to be wrapped for UNWRAP to succeed, we end up looping forever. > 2. We observed that large TLSv1.3 post-handshake messages are not getting read, > causing the validator code to loop forever. This is observed with JDK17+ > -- This message was sent by Atlassian Jira (v8.20.10#820010)