Re: [PR] KAFKA-15481: Fix concurrency bug in RemoteIndexCache [kafka]

2023-10-08 Thread via GitHub


kamalcph commented on code in PR #14483:
URL: https://github.com/apache/kafka/pull/14483#discussion_r1349878452


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -134,7 +134,7 @@ public RemoteIndexCache(int maxSize, RemoteStorageManager remoteStorageManager,
 .maximumSize(maxSize)
 // removeListener is invoked when either the entry is invalidated (means manual removal by the caller) or

Review Comment:
   Can we update the comment to refer to the `evictionListener` instead of 
`removeListener`? 
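   
   For reference, the two Caffeine callbacks differ: `removalListener` fires 
asynchronously for *any* removal, including explicit `invalidate()` calls, while 
`evictionListener` fires synchronously and only for automatic evictions. A 
minimal, self-contained sketch of the distinction (generic `String` types for 
illustration, not the RemoteIndexCache code):
   
   ```java
   import com.github.benmanes.caffeine.cache.Cache;
   import com.github.benmanes.caffeine.cache.Caffeine;
   import com.github.benmanes.caffeine.cache.RemovalCause;
   
   public class EvictionListenerDemo {
       public static void main(String[] args) {
           Cache<String, String> cache = Caffeine.newBuilder()
                   .maximumSize(2)
                   // Fires only for automatic evictions (e.g. size pressure), never for invalidate().
                   .evictionListener((String key, String value, RemovalCause cause) ->
                           System.out.printf("evicted %s due to %s%n", key, cause))
                   .build();
           cache.put("a", "1");
           cache.put("b", "2");
           cache.put("c", "3");   // exceeding maximumSize may evict an earlier entry
           cache.invalidate("a"); // explicit removal: the evictionListener does NOT fire
       }
   }
   ```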



##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -169,7 +169,16 @@ public Cache internalCache() {
 public void remove(Uuid key) {
 lock.readLock().lock();
 try {
-internalCache.invalidate(key);
+internalCache.asMap().computeIfPresent(key, (k, v) -> {
+try {
+v.markForCleanup();
+v.cleanup();

Review Comment:
   Also, we have to add that entry to `expiredIndexes` so that the cleanup 
thread will be invoked on the removed entry.
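   
   A minimal sketch of how the combined change could look, assuming 
`expiredIndexes` is the blocking queue the cleaner thread polls (illustrative 
only, not the final patch):
   
   ```java
   public void remove(Uuid key) {
       lock.readLock().lock();
       try {
           internalCache.asMap().computeIfPresent(key, (k, v) -> {
               try {
                   v.markForCleanup();    // renames the index files with a ".deleted" suffix
                   expiredIndexes.add(v); // hand the entry to the cleaner thread for deletion
               } catch (IOException e) {
                   throw new KafkaException(e);
               }
               return null;               // returning null removes the mapping from the cache
           });
       } finally {
           lock.readLock().unlock();
       }
   }
   ```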



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]

2023-10-08 Thread via GitHub


iit2009060 commented on PR #14482:
URL: https://github.com/apache/kafka/pull/14482#issuecomment-1752368980

   > @iit2009060 , there are compilation failures, could you fix them? 
   > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14482/10/pipeline/
   
   On it. 





Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]

2023-10-08 Thread via GitHub


iit2009060 commented on code in PR #14482:
URL: https://github.com/apache/kafka/pull/14482#discussion_r1349871792


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -524,23 +527,221 @@ class RemoteIndexCacheTest {
 }
   }
 
-  @Test
-  def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
-// create Corrupt Offset Index File
-createCorruptRemoteIndexCacheOffsetFile()
+  @ParameterizedTest
+  @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION"))
+  def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit = {
+// create Corrupted Index File in remote index cache
+createCorruptedIndexFile(indexType, cache.cacheDir())
 val entry = cache.getIndexEntry(rlsMetadata)
 // Test would fail if it throws corrupt Exception

Review Comment:
   done



##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -554,23 +557,221 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
-  @Test
-  def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
-// create Corrupt Offset Index File
-createCorruptRemoteIndexCacheOffsetFile()
+  @ParameterizedTest
+  @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION"))
+  def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit = {
+// create Corrupted Index File in remote index cache
+createCorruptedIndexFile(indexType, cache.cacheDir())
 val entry = cache.getIndexEntry(rlsMetadata)
 // Test would fail if it throws corrupt Exception
-val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata)
 val offsetIndexFile = entry.offsetIndex.file().toPath
+val txnIndexFile = entry.txnIndex.file().toPath
+val timeIndexFile = entry.timeIndex.file().toPath
+
+val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata)
+val expectedTimeIndexFileName: String = remoteTimeIndexFileName(rlsMetadata)
+val expectedTxnIndexFileName: String = remoteTransactionIndexFileName(rlsMetadata)
 
 assertEquals(expectedOffsetIndexFileName, offsetIndexFile.getFileName.toString)
+assertEquals(expectedTxnIndexFileName, txnIndexFile.getFileName.toString)
+assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName.toString)
+
 // assert that parent directory for the index files is correct
 assertEquals(RemoteIndexCache.DIR_NAME, offsetIndexFile.getParent.getFileName.toString,
-  s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent")
+  s"offsetIndex=$offsetIndexFile is created under incorrect parent")
+assertEquals(RemoteIndexCache.DIR_NAME, txnIndexFile.getParent.getFileName.toString,
+  s"txnIndex=$txnIndexFile is created under incorrect parent")
+assertEquals(RemoteIndexCache.DIR_NAME, timeIndexFile.getParent.getFileName.toString,
+  s"timeIndex=$timeIndexFile is created under incorrect parent")
+
 // file is corrupted it should fetch from remote storage again
 verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testMultipleIndexEntriesExecutionInCorruptException(): Unit = {
+reset(rsm)
+when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+  .thenAnswer(ans => {
+val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+val indexType = ans.getArgument[IndexType](1)
+val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+val txnIdx = createTxIndexForSegmentMetadata(metadata)
+maybeAppendIndexEntries(offsetIdx, timeIdx)
+// Create corrupted index file
+createCorruptTimeIndexOffsetFile(tpDir)
+indexType match {
+  case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+  case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+  case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
+  case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+  case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
+}
+  })
+
+assertThrows(classOf[CorruptIndexException], () => cache.getIndexEntry(rlsMetadata))
+assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()))
+verifyFetchIndexInvocation(1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP))
+verifyFetchIndexInvocation(0, Seq(IndexType.TRANSACTION))
+// Current status
+// (cache is null)
+// RemoteCacheDir contain
+// 1. Offset Index File is fine and not corrupted
+// 2. Time Index File is corrupted
+// What should be the code flow in next execution
+// 1. No rsm call for fetching OffSet Index File.
+// 2. Time index file should be fetched from remote 

Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]

2023-10-08 Thread via GitHub


kamalcph commented on code in PR #14482:
URL: https://github.com/apache/kafka/pull/14482#discussion_r1349871437


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -166,6 +166,11 @@ public Cache internalCache() {
 return internalCache;
 }
 
+// Visible for testing
+public File cacheDir() {

Review Comment:
   You can find more details about the intention to move the source code to 
Java in KAFKA-14524






Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]

2023-10-08 Thread via GitHub


showuon commented on PR #14482:
URL: https://github.com/apache/kafka/pull/14482#issuecomment-1752364597

   @iit2009060 , there are compilation failures, could you fix them? 
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14482/10/pipeline/





Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]

2023-10-08 Thread via GitHub


kamalcph commented on code in PR #14482:
URL: https://github.com/apache/kafka/pull/14482#discussion_r1349857381


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -524,23 +527,221 @@ class RemoteIndexCacheTest {
 }
   }
 
-  @Test
-  def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
-// create Corrupt Offset Index File
-createCorruptRemoteIndexCacheOffsetFile()
+  @ParameterizedTest
+  @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION"))
+  def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit = {
+// create Corrupted Index File in remote index cache
+createCorruptedIndexFile(indexType, cache.cacheDir())
 val entry = cache.getIndexEntry(rlsMetadata)
 // Test would fail if it throws corrupt Exception

Review Comment:
   Can we update this comment to the following?
   
   ```
   Test would fail if it throws an exception other than CorruptIndexException
   ```



##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -554,23 +557,221 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
-  @Test
-  def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
-// create Corrupt Offset Index File
-createCorruptRemoteIndexCacheOffsetFile()
+  @ParameterizedTest
+  @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION"))
+  def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit = {
+// create Corrupted Index File in remote index cache
+createCorruptedIndexFile(indexType, cache.cacheDir())
 val entry = cache.getIndexEntry(rlsMetadata)
 // Test would fail if it throws corrupt Exception
-val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata)
 val offsetIndexFile = entry.offsetIndex.file().toPath
+val txnIndexFile = entry.txnIndex.file().toPath
+val timeIndexFile = entry.timeIndex.file().toPath
+
+val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata)
+val expectedTimeIndexFileName: String = remoteTimeIndexFileName(rlsMetadata)
+val expectedTxnIndexFileName: String = remoteTransactionIndexFileName(rlsMetadata)
 
 assertEquals(expectedOffsetIndexFileName, offsetIndexFile.getFileName.toString)
+assertEquals(expectedTxnIndexFileName, txnIndexFile.getFileName.toString)
+assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName.toString)
+
 // assert that parent directory for the index files is correct
 assertEquals(RemoteIndexCache.DIR_NAME, offsetIndexFile.getParent.getFileName.toString,
-  s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent")
+  s"offsetIndex=$offsetIndexFile is created under incorrect parent")
+assertEquals(RemoteIndexCache.DIR_NAME, txnIndexFile.getParent.getFileName.toString,
+  s"txnIndex=$txnIndexFile is created under incorrect parent")
+assertEquals(RemoteIndexCache.DIR_NAME, timeIndexFile.getParent.getFileName.toString,
+  s"timeIndex=$timeIndexFile is created under incorrect parent")
+
 // file is corrupted it should fetch from remote storage again
 verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testMultipleIndexEntriesExecutionInCorruptException(): Unit = {
+reset(rsm)
+when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+  .thenAnswer(ans => {
+val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+val indexType = ans.getArgument[IndexType](1)
+val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+val txnIdx = createTxIndexForSegmentMetadata(metadata)
+maybeAppendIndexEntries(offsetIdx, timeIdx)
+// Create corrupted index file
+createCorruptTimeIndexOffsetFile(tpDir)
+indexType match {
+  case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+  case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+  case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
+  case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+  case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
+}
+  })
+
+assertThrows(classOf[CorruptIndexException], () => cache.getIndexEntry(rlsMetadata))
+assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()))
+verifyFetchIndexInvocation(1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP))
+verifyFetchIndexInvocation(0, Seq(IndexType.TRANSACTION))
+// Current status
+// (cache is null)
+// RemoteCacheDir contain
+// 1. Offset Index File is fine and not corrupted
+// 2. Time Index File is corrupted
+// What should be the code flow in next 

Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]

2023-10-08 Thread via GitHub


iit2009060 commented on PR #14482:
URL: https://github.com/apache/kafka/pull/14482#issuecomment-1752350204

   @showuon @kamalcph @divijvaidya I have rebased the changes and resolved the 
conflicts with the changes merged by PR https://github.com/apache/kafka/pull/14381. 
   Could you please review and merge it?





Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]

2023-10-08 Thread via GitHub


iit2009060 commented on PR #14482:
URL: https://github.com/apache/kafka/pull/14482#issuecomment-1752348404

   > Since we are adding new tests, can we create a new RemoteIndexCacheTest 
under `storage` module and start writing the tests in `java`?
   > 
   > The source code is in Java but the test is in scala. We can move the 
existing tests in a separate PR.
   
   @kamalcph Can I create a ticket for the same and work on it after this PR is 
merged? This has been pending since last week and is already approved. 
   
   





Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]

2023-10-08 Thread via GitHub


iit2009060 commented on code in PR #14482:
URL: https://github.com/apache/kafka/pull/14482#discussion_r1349861220


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -166,6 +166,11 @@ public Cache internalCache() {
 return internalCache;
 }
 
+// Visible for testing
+public File cacheDir() {

Review Comment:
   @kamalcph I have just started building my understanding of the Kafka 
ecosystem. Can you help me understand the rationale behind it? Then I can create 
a ticket with the details. 






Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]

2023-10-08 Thread via GitHub


kamalcph commented on code in PR #14482:
URL: https://github.com/apache/kafka/pull/14482#discussion_r1349854436


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -166,6 +166,11 @@ public Cache internalCache() {
 return internalCache;
 }
 
+// Visible for testing
+public File cacheDir() {

Review Comment:
   Can we file a ticket to move the RemoteIndexCacheTest to the `storage` module 
under the same package? 






[jira] [Commented] (KAFKA-15565) KafkaAdminClient does not honor request timeout ms

2023-10-08 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773096#comment-17773096
 ] 

Philip Nee commented on KAFKA-15565:


Hey [~sankalpbhatia], thanks for reporting the issue, but I'm not sure this is 
actually a bug. [~cmccabe] - it seems like you put this config in back in '17; 
could you chime in here? I wonder if the intent was to prevent metadata updates.

> KafkaAdminClient does not honor request timeout ms 
> ---
>
> Key: KAFKA-15565
> URL: https://issues.apache.org/jira/browse/KAFKA-15565
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sankalp Bhatia
>Assignee: Sankalp Bhatia
>Priority: Minor
>
> It seems to me there is a bug in this line in the KafkaAdminClient. For the 
> constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a 
> hardcoded value of 1 hour.  Ideally, this should be derived from the client 
> config     "request.timeout.ms"  from the AdminClientConfig[2]. 
> References
> [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521]
> [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98]
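
A sketch of the suggested direction (names taken from the linked constructor; the
exact wiring is an assumption, not a patch):

```java
// Today (reference [1]) the NetworkClient is built with a hardcoded hour:
int defaultRequestTimeoutMs = (int) TimeUnit.HOURS.toMillis(1);

// The report suggests deriving it from the configured client timeout instead:
int requestTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
```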



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]

2023-10-08 Thread via GitHub


philipnee commented on PR #14364:
URL: https://github.com/apache/kafka/pull/14364#issuecomment-1752310508

   @dajac - Not entirely sure what the best way to fix the JDK 11 build is. The 
rest of the builds seem to be fine, with only the following failures:
   
   ```
   Build / JDK 21 and Scala 2.13 / 
randomClusterPerturbationsShouldConverge[enableRackAwareTaskAssignor=true] – 
org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest
   5s
   Build / JDK 21 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – 
org.apache.kafka.trogdor.coordinator.CoordinatorTest
   2m 0s
   Build / JDK 17 and Scala 2.13 / shouldHaveSamePositionBoundActiveAndStandBy 
– org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest
   12s
   Build / JDK 17 and Scala 2.13 / shouldHaveSamePositionBoundActiveAndStandBy 
– org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest
   8s
   Build / JDK 17 and Scala 2.13 / 
shouldHonorEOSWhenUsingCachingAndStandbyReplicas – 
org.apache.kafka.streams.integration.StandbyTaskEOSMultiRebalanceIntegrationTest
   2m 7s
   Build / JDK 8 and Scala 2.12 / testMultiWorkerRestartOnlyConnector – 
org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest
   2m 24s
   Build / JDK 8 and Scala 2.12 / 
testAbortTransactionTimeout(String).quorum=kraft – 
org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest
   ```
   
   However, I did open a draft PR from this branch and the JDK 11 build was able 
to complete. 





Re: [PR] This adds a default implementation to the KafkaProducer interface to … [kafka]

2023-10-08 Thread via GitHub


github-actions[bot] commented on PR #13980:
URL: https://github.com/apache/kafka/pull/13980#issuecomment-1752300320

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito [kafka]

2023-10-08 Thread via GitHub


github-actions[bot] commented on PR #13873:
URL: https://github.com/apache/kafka/pull/13873#issuecomment-1752300357

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





[jira] [Updated] (KAFKA-15565) KafkaAdminClient does not honor request timeout ms

2023-10-08 Thread Sankalp Bhatia (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sankalp Bhatia updated KAFKA-15565:
---
Description: 
It seems to me there is a bug in this line in the KafkaAdminClient. For the 
constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a 
hardcoded value of 1 hour.  Ideally, this should be derived from the client 
config     "request.timeout.ms"  from the AdminClientConfig[2]. 

References

[1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521]

[2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98]

  was:
It seems to me there is a bug in this line in the KafkaAdminClient. For the 
constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a 
hardcoded value of 1 hour.  Ideally, this should be derived from the client 
config     ``request.timeout.ms``  from the AdminClientConfig[2]. 

References

[1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521]

[2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98]


> KafkaAdminClient does not honor request timeout ms 
> ---
>
> Key: KAFKA-15565
> URL: https://issues.apache.org/jira/browse/KAFKA-15565
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sankalp Bhatia
>Assignee: Sankalp Bhatia
>Priority: Minor
>
> It seems to me there is a bug in this line in the KafkaAdminClient. For the 
> constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a 
> hardcoded value of 1 hour.  Ideally, this should be derived from the client 
> config     "request.timeout.ms"  from the AdminClientConfig[2]. 
> References
> [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521]
> [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98]





[jira] [Updated] (KAFKA-15565) KafkaAdminClient does not honor request timeout ms

2023-10-08 Thread Sankalp Bhatia (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sankalp Bhatia updated KAFKA-15565:
---
Description: 
It seems to me there is a bug in this line in the KafkaAdminClient. For the 
constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a 
hardcoded value of 1 hour.  Ideally, this should be derived from the client 
config     request.timeout.ms  from the AdminClientConfig[2]. 

References

[1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521]

[2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98]

  was:
It seems to me there is a bug in this line in the KafkaAdminClient. For the 
constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a 
hardcoded value of 1 hour.  Ideally, this should be derived from the client 
config     /** request.timeout.ms */ from the 
AdminClientConfig[2]. 

References

[1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521]

[2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98]


> KafkaAdminClient does not honor request timeout ms 
> ---
>
> Key: KAFKA-15565
> URL: https://issues.apache.org/jira/browse/KAFKA-15565
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sankalp Bhatia
>Assignee: Sankalp Bhatia
>Priority: Minor
>
> It seems to me there is a bug in this line in the KafkaAdminClient. For the 
> constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a 
> hardcoded value of 1 hour.  Ideally, this should be derived from the client 
> config     request.timeout.ms  from the AdminClientConfig[2]. 
> References
> [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521]
> [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98]





[jira] [Updated] (KAFKA-15565) KafkaAdminClient does not honor request timeout ms

2023-10-08 Thread Sankalp Bhatia (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sankalp Bhatia updated KAFKA-15565:
---
Description: 
It seems to me there is a bug in this line in the KafkaAdminClient. For the 
constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a 
hardcoded value of 1 hour.  Ideally, this should be derived from the client 
config     ``request.timeout.ms``  from the AdminClientConfig[2]. 

References

[1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521]

[2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98]

  was:
It seems to me there is a bug in this line in the KafkaAdminClient. For the 
constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a 
hardcoded value of 1 hour.  Ideally, this should be derived from the client 
config     request.timeout.ms  from the AdminClientConfig[2]. 

References

[1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521]

[2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98]


> KafkaAdminClient does not honor request timeout ms 
> ---
>
> Key: KAFKA-15565
> URL: https://issues.apache.org/jira/browse/KAFKA-15565
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sankalp Bhatia
>Assignee: Sankalp Bhatia
>Priority: Minor
>
> It seems to me there is a bug in this line in the KafkaAdminClient. For the 
> constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a 
> hardcoded value of 1 hour.  Ideally, this should be derived from the client 
> config     ``request.timeout.ms``  from the AdminClientConfig[2]. 
> References
> [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521]
> [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98]





[jira] [Updated] (KAFKA-15565) KafkaAdminClient does not honor request timeout ms

2023-10-08 Thread Sankalp Bhatia (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sankalp Bhatia updated KAFKA-15565:
---
Description: 
It seems to me there is a bug in this line in the KafkaAdminClient. For the 
constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a 
hardcoded value of 1 hour.  Ideally, this should be derived from the client 
config     /** request.timeout.ms */ from the 
AdminClientConfig[2]. 

References

[1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521]

[2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98]

  was:
It seems to me there is a bug in this line in the KafkaAdminClient. For the 
constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a 
hardcoded value of 1 hour.  Ideally, this should be derived from the client 
config     /** request.timeout.ms */ from the 
AdminClientConfig[2]. 

References

[1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521]

[2]https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98


> KafkaAdminClient does not honor request timeout ms 
> ---
>
> Key: KAFKA-15565
> URL: https://issues.apache.org/jira/browse/KAFKA-15565
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sankalp Bhatia
>Assignee: Sankalp Bhatia
>Priority: Minor
>
> It seems to me there is a bug in this line in the KafkaAdminClient. For the 
> constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a 
> hardcoded value of 1 hour.  Ideally, this should be derived from the client 
> config     /** request.timeout.ms */ from the 
> AdminClientConfig[2]. 
> References
> [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521]
> [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98]





[jira] [Created] (KAFKA-15565) KafkaAdminClient does not honor request timeout ms

2023-10-08 Thread Sankalp Bhatia (Jira)
Sankalp Bhatia created KAFKA-15565:
--

 Summary: KafkaAdminClient does not honor request timeout ms 
 Key: KAFKA-15565
 URL: https://issues.apache.org/jira/browse/KAFKA-15565
 Project: Kafka
  Issue Type: Bug
Reporter: Sankalp Bhatia
Assignee: Sankalp Bhatia


It seems to me there is a bug in this line in the KafkaAdminClient. For the 
constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a 
hardcoded value of 1 hour.  Ideally, this should be derived from the client 
config     /** request.timeout.ms */ from the 
AdminClientConfig[2]. 


References

[1]https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521

[2]https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98





[jira] [Updated] (KAFKA-15565) KafkaAdminClient does not honor request timeout ms

2023-10-08 Thread Sankalp Bhatia (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sankalp Bhatia updated KAFKA-15565:
---
Description: 
It seems to me there is a bug in this line in the KafkaAdminClient. For the 
constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a 
hardcoded value of 1 hour.  Ideally, this should be derived from the client 
config     /** request.timeout.ms */ from the 
AdminClientConfig[2]. 

References

[1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521]

[2]https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98

  was:
It seems to me there is a bug in this line in the KafkaAdminClient. For the 
constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a 
hardcoded value of 1 hour.  Ideally, this should be derived from the client 
config     /** request.timeout.ms */ from the 
AdminClientConfig[2]. 


References

[1]https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521

[2]https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98


> KafkaAdminClient does not honor request timeout ms 
> ---
>
> Key: KAFKA-15565
> URL: https://issues.apache.org/jira/browse/KAFKA-15565
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sankalp Bhatia
>Assignee: Sankalp Bhatia
>Priority: Minor
>
> It seems to me there is a bug in this line in the KafkaAdminClient. For the 
> constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a 
> hardcoded value of 1 hour.  Ideally, this should be derived from the client 
> config     /** request.timeout.ms */ from the 
> AdminClientConfig[2]. 
> References
> [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521]
> [2]https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98





Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-08 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1349811549


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -106,15 +107,29 @@ void validateOffsetFetch(
 /**
  * Returns true if the group is actively subscribed to the topic.
  *
- * @param topic The topic name.
+ * @param topic                            The topic name.
+ * @param isSubscribedIfEmptySubscriptions Whether to consider an empty topic subscriptions subscribed or not.
+ *
  * @return Whether the group is subscribed to the topic.
  */
-boolean isSubscribedToTopic(String topic);
+boolean isSubscribedToTopic(String topic, boolean isSubscribedIfEmptySubscriptions);

Review Comment:
   responded in the thread






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-08 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1349811501


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit tombstone records.
+ * @param offsetsRetentionMs The offset retention in milliseconds.
+ *
+ * @return The group id if the group no longer has any offsets remaining, empty otherwise.
+ */
+public Optional<String> cleanupExpiredOffsets(String groupId, List<Record> records, long offsetsRetentionMs) {
+TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return Optional.of(groupId);
+}
+try {
+Group group = groupMetadataManager.group(groupId);
+ExpirationCondition expirationCondition = group.expirationCondition();
+Set<TopicPartition> expiredPartitions = new HashSet<>();
+long currentTimestamp = time.milliseconds();
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+offsetsByTopic.forEach((topic, partitions) -> {
+if (!expirationCondition.subscribedTopics.contains(topic)) {

Review Comment:
   I think the main issue is that the existing behavior in 
GroupMetadata#removeExpiredOffsets only considers what topics a group is 
subscribed to if the group is using the consumer group protocol AND is Stable. 
If a group is in any other state, it acts as if the group is not subscribed to 
any topic when expiring offsets.
   
   here's my concern with the above suggestion:
   
   let's say we have an empty group that uses the consumer group protocol. 
subscribedTopics will be empty as there are no members (set in 
`computeSubscribedTopics`). This will return `true` from `isSubscribedToTopic`. 
This is not aligned with the existing behavior which says if a group is empty 
and has a protocol type, we return an empty collection so that the group is 
considered not subscribed to any topics during offset expiration.
   ```
 case Some(_) if is(Empty) =>
   // no consumer exists in the group =>
   ...
   getExpiredOffsets(
 commitRecordMetadataAndOffset => currentStateTimestamp
   .getOrElse(commitRecordMetadataAndOffset.offsetAndMetadata.commitTimestamp), 

   )
   ```
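   
   To make the flag concrete, a hedged sketch of an implementation that would 
preserve the old behavior (`subscribedTopics` as a `Set<String>` is an 
assumption, not the PR's actual field):
   
   ```java
   @Override
   public boolean isSubscribedToTopic(String topic, boolean isSubscribedIfEmptySubscriptions) {
       // Offset expiration passes false here, so an empty group is treated as
       // subscribed to nothing and its offsets become eligible for expiration.
       if (subscribedTopics.isEmpty()) {
           return isSubscribedIfEmptySubscriptions;
       }
       return subscribedTopics.contains(topic);
   }
   ```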
   ```






Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]

2023-10-08 Thread via GitHub


ashmeet13 commented on PR #12988:
URL: https://github.com/apache/kafka/pull/12988#issuecomment-1752128811

   Got it @mjsax - 
   Sharing the code that seems to be causing this bypass. Currently, to fetch 
any consumer config (i.e. `main`, `restore`, or `global`) we use a common 
function, `getCommonConsumerConfigs`.
   
   
   It's within the `getCommonConsumerConfigs` function that we check and 
override the configs controlled by Streams - 
   ```java
   private Map<String, Object> getCommonConsumerConfigs() {
       // Fetch all consumer props starting with "consumer."
       final Map<String, Object> clientProvidedProps =
           getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());

       // Clean out any properties that were set but need to be controlled by Streams
       checkIfUnexpectedUserSpecifiedConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
       checkIfUnexpectedUserSpecifiedConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);

       // Create a config map of the preferred props and merge it with the cleaned props from above
       final Map<String, Object> consumerProps =
           new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
       consumerProps.putAll(clientProvidedProps);
       return consumerProps;
   }
   ``` 
   
   And the logic within `getMainConsumerConfigs` is - 
   ```java
   public Map<String, Object> getMainConsumerConfigs(...) {
       // Fetch the props starting with "consumer." after cleaning
       // any props that needed to be overwritten
       final Map<String, Object> consumerProps = getCommonConsumerConfigs();

       // Get the main consumer override props, i.e. the ones
       // starting with "main.consumer.", and merge the two maps.
       final Map<String, Object> mainConsumerProps = originalsWithPrefix(MAIN_CONSUMER_PREFIX);
       for (final Map.Entry<String, Object> entry : mainConsumerProps.entrySet()) {
           consumerProps.put(entry.getKey(), entry.getValue());
       }

       // Continue processing and filling in other required configs
       ...
   }
   ```

   Do you think I've understood this piece correctly?
   If so, should a fix for this go into this PR itself?
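   
   If the reading above is right, a hypothetical repro would be (the prefixes are 
real Streams config prefixes; the property choice is illustrative):
   
   ```java
   import java.util.Properties;
   
   Properties props = new Properties();
   // Stripped by checkIfUnexpectedUserSpecifiedConfig() in getCommonConsumerConfigs():
   props.put("consumer.enable.auto.commit", "true");
   // Copied over *after* that check in getMainConsumerConfigs(), so it silently
   // overrides the value Streams wants to control:
   props.put("main.consumer.enable.auto.commit", "true");
   ```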
   





[jira] [Commented] (KAFKA-15535) Add documentation of "remote.log.index.file.cache.total.size.bytes" configuration property.

2023-10-08 Thread hudeqi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772948#comment-17772948
 ] 

hudeqi commented on KAFKA-15535:


Can this be marked as resolved? The doc for 
"remote.log.index.file.cache.total.size.bytes" is already available, the other 
tiered-storage configurations have also been checked, and nothing seems to be 
missing. [~satish.duggana] 

> Add documentation of "remote.log.index.file.cache.total.size.bytes" 
> configuration property. 
> 
>
> Key: KAFKA-15535
> URL: https://issues.apache.org/jira/browse/KAFKA-15535
> Project: Kafka
>  Issue Type: Task
>  Components: documentation
>Reporter: Satish Duggana
>Assignee: hudeqi
>Priority: Major
>  Labels: tiered-storage
> Fix For: 3.7.0
>
>
> Add documentation of "remote.log.index.file.cache.total.size.bytes" 
> configuration property. 
> Please double check all the existing public tiered storage configurations. 





[PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-08 Thread via GitHub


hudeqi opened a new pull request, #14511:
URL: https://github.com/apache/kafka/pull/14511

   As described in: 
https://github.com/apache/kafka/pull/14243#discussion_r1320630057
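   
   For context, Caffeine already exposes the knob this needs; a minimal sketch of 
resizing a live cache's bound (illustrative, not this PR's code):
   
   ```java
   import com.github.benmanes.caffeine.cache.Cache;
   import com.github.benmanes.caffeine.cache.Caffeine;
   
   Cache<String, byte[]> cache = Caffeine.newBuilder().maximumSize(1024).build();
   
   // When the dynamic config changes, adjust the bound in place; Caffeine
   // evicts entries asynchronously if the new maximum is smaller.
   cache.policy().eviction().ifPresent(eviction -> eviction.setMaximum(512));
   ```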





Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-08 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1349654092


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -106,15 +107,29 @@ void validateOffsetFetch(
 /**
  * Returns true if the group is actively subscribed to the topic.
  *
- * @param topic The topic name.
+ * @param topicThe topic name.
+ * @param isSubscribedIfEmptySubscriptions Whether to consider an empty 
topic subscriptions subscribed or not.
+ *
  * @return Whether the group is subscribed to the topic.
  */
-boolean isSubscribedToTopic(String topic);
+boolean isSubscribedToTopic(String topic, boolean 
isSubscribedIfEmptySubscriptions);

Review Comment:
   I made a comment about this boolean 
[here](https://github.com/apache/kafka/pull/14467#discussion_r1348539504). 
Could you take a look?






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-08 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1349653948


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +579,100 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit tombstone records.
+ *
+ * @return True if no offsets exist or if all offsets expired, false otherwise.
+ */
+public boolean cleanupExpiredOffsets(String groupId, List<Record> records) {
+TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return true;
+}
+
+// We expect the group to exist.
+Group group = groupMetadataManager.group(groupId);
+Set<TopicPartition> expiredPartitions = new HashSet<>();
+long currentTimestamp = time.milliseconds();
+Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition();
+
+if (!offsetExpirationCondition.isPresent()) {
+return false;
+}
+
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+OffsetExpirationCondition condition = offsetExpirationCondition.get();
+
+offsetsByTopic.forEach((topic, partitions) -> {
+if (!group.isSubscribedToTopic(topic, false)) {
+partitions.forEach((partition, offsetAndMetadata) -> {
+if (condition.isOffsetExpired(offsetAndMetadata, currentTimestamp, config.offsetsRetentionMs)) {
+expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, records));
+} else {
+hasAllOffsetsExpired.set(false);
+}
+});
+} else {
+hasAllOffsetsExpired.set(false);
+}
+});
+
+log.debug("[GroupId {}] Expiring offsets: {}", groupId, expiredPartitions);

Review Comment:
   I meant that we need to format the list into a comma-separated string. Could 
we also log the boolean indicating whether all the offsets were expired or not?
   
   Should we also say `Expiring offsets of partitions: `?
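   
   A small sketch of the suggested formatting (assuming `expiredPartitions` holds 
`TopicPartition`s and a boolean `allOffsetsExpired` is in scope):
   
   ```java
   log.debug("[GroupId {}] Expiring offsets of partitions: {}. allOffsetsExpired={}.",
       groupId,
       expiredPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")),
       allOffsetsExpired);
   ```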
   
   






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-08 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1349653647


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +550,76 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit tombstone records.
+ *
+ * @return True if no offsets exist or if all offsets expired, false otherwise.
+ */
+public boolean cleanupExpiredOffsets(String groupId, List<Record> records) {
+TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return true;
+}
+
+// We expect the group to exist.
+Group group = groupMetadataManager.group(groupId);
+Set<TopicPartition> expiredPartitions = new HashSet<>();
+long currentTimestamp = time.milliseconds();

Review Comment:
   nit: currentTimestampMs?






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-08 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1349653472


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationCondition.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+/**
+ * An offset is considered expired based on different factors, such as the state of the group
+ * and/or the GroupMetadata record version (for generic groups). This class is used to check
+ * how offsets for the group should be expired.
+ */
+public interface OffsetExpirationCondition {
+
+/**
+ * Given an offset metadata and offsets retention, return whether the offset is expired or not.
+ *
+ * @param offset   The offset metadata.
+ * @param currentTimestamp The current timestamp.
+ * @param offsetsRetentionMs   The offset retention.
+ *
+ * @return Whether the offset is considered expired or not.
+ */
+boolean isOffsetExpired(OffsetAndMetadata offset, long currentTimestamp, long offsetsRetentionMs);
+}

Review Comment:
   nit: Let's add a new line.
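   
   Since the interface has a single abstract method, a condition can be written 
as a lambda; an illustrative example (the `commitTimestampMs` accessor is an 
assumption, not necessarily the PR's field name):
   
   ```java
   // Expire an offset once it has been retained longer than offsetsRetentionMs.
   OffsetExpirationCondition byCommitTimestamp =
       (offset, currentTimestamp, offsetsRetentionMs) ->
           currentTimestamp - offset.commitTimestampMs >= offsetsRetentionMs;
   ```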






Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-08 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1349653385


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1763,6 +1803,120 @@ public void testDeleteGroupAllOffsets(Group.GroupType groupType) {
 assertEquals(3, numDeleteOffsets);
 }
 
+@Test
+public void testIsExpiredOffset() {
+long currentTimestamp = 1000L;
+long baseTimestamp = 500L;
+OptionalLong expireTimestampMs = OptionalLong.of(1500);
+long offsetsRetentionMs = 500L;
+
+// Current timestamp >= expire timestamp => should expire
+assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+// Current timestamp < expire timestamp => should not expire
+currentTimestamp = 499;
+assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+// Expire timestamp does not exist (current version with no per partition retention)
+// Current timestamp - base timestamp >= offsets retention => should expire
+expireTimestampMs = OptionalLong.empty();
+currentTimestamp = 1000L;
+assertTrue(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+// Current timestamp - base timestamp < offsets retention => should not expire
+currentTimestamp = 999L;
+assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+}
+
+@Test
+public void testCleanupExpiredOffsetsGroupDoesNotExist() {
+OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
+.build();
+
+List<Record> records = new ArrayList<>();
+assertTrue(context.cleanupExpiredOffsets("unknown-group-id", records));

Review Comment:
   I agree in this case. However, if the group does not exist, 
https://github.com/apache/kafka/pull/14467/files#diff-a4ad0e0a77c78dde2a841d055dd64e15f7da00b8a2a9e3279d74b718d5c612bbR568
 should throw an exception. Should we add a test for this case and rename 
`testCleanupExpiredOffsetsGroupDoesNotExist` to something like 
`testCleanupExpiredOffsetsGroupHasNoOffsets`?






[jira] [Updated] (KAFKA-15536) dynamically resize remoteIndexCache

2023-10-08 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi updated KAFKA-15536:
---
Affects Version/s: 3.7.0
   (was: 3.6.0)

> dynamically resize remoteIndexCache
> ---
>
> Key: KAFKA-15536
> URL: https://issues.apache.org/jira/browse/KAFKA-15536
> Project: Kafka
>  Issue Type: Improvement
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Assignee: hudeqi
>Priority: Major
>
> context:
> https://github.com/apache/kafka/pull/14243#discussion_r1320630057





Re: [PR] KAFKA-14912:Add a dynamic config for remote index cache size [kafka]

2023-10-08 Thread via GitHub


satishd merged PR #14381:
URL: https://github.com/apache/kafka/pull/14381





Re: [PR] KAFKA-14912:Add a dynamic config for remote index cache size [kafka]

2023-10-08 Thread via GitHub


satishd commented on PR #14381:
URL: https://github.com/apache/kafka/pull/14381#issuecomment-1751951542

   There are a few unrelated test failures, merging it to trunk.





Re: [PR] KAFKA-14912:Add a dynamic config for remote index cache size [kafka]

2023-10-08 Thread via GitHub


hudeqi commented on PR #14381:
URL: https://github.com/apache/kafka/pull/14381#issuecomment-1751943708

   > Thanks @hudeqi for addressing the review comments. LGTM.
   
   Thanks for your review, @satishd, please merge it. I have resolved several 
conflicts. I see that there seems to be another PR that conflicts with this one :) 





[jira] [Updated] (KAFKA-15502) Handle large keystores in SslEngineValidator

2023-10-08 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-15502:
--
Affects Version/s: 3.5.1
   3.4.1

> Handle large keystores in SslEngineValidator
> 
>
> Key: KAFKA-15502
> URL: https://issues.apache.org/jira/browse/KAFKA-15502
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.1, 3.6.0, 3.5.1
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Major
> Fix For: 3.4.2, 3.5.2, 3.7.0, 3.6.1
>
>
> We have observed an issue where the inter-broker SSL listener does not come up 
> for large keystores (size >16K).
> 1. Currently the validator code doesn't work well with large stores. Right now, 
> WRAP returns if there is already data in the buffer. But if we need more data 
> to be wrapped for UNWRAP to succeed, we end up looping forever.
> 2. Observed that large TLSv1.3 post-handshake messages are not getting read, 
> causing the validator code to loop forever. This is observed with JDK 17+.
>  
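
A schematic of the livelock described in point 1 (illustrative pseudo-flow, not
the actual SslEngineValidator code):

```java
// If wrap() is skipped whenever the network buffer already holds data, but the
// peer needs *more* wrapped bytes before its unwrap() can progress (as with
// >16K keystores spanning multiple TLS records), neither side advances.
while (engine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_WRAP) {
    if (netBuffer.position() > 0) {
        break; // returns early without wrapping the remaining handshake data
    }
    engine.wrap(appBuffer, netBuffer);
}
```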





[jira] [Resolved] (KAFKA-15502) Handle large keystores in SslEngineValidator

2023-10-08 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-15502.
---
Fix Version/s: 3.4.2
   3.5.2
   3.7.0
   3.6.1
   Resolution: Fixed

> Handle large keystores in SslEngineValidator
> 
>
> Key: KAFKA-15502
> URL: https://issues.apache.org/jira/browse/KAFKA-15502
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Major
> Fix For: 3.4.2, 3.5.2, 3.7.0, 3.6.1
>
>
> We have observed an issue where the inter-broker SSL listener does not come up 
> for large keystores (size >16K).
> 1. Currently the validator code doesn't work well with large stores. Right now, 
> WRAP returns if there is already data in the buffer. But if we need more data 
> to be wrapped for UNWRAP to succeed, we end up looping forever.
> 2. Observed that large TLSv1.3 post-handshake messages are not getting read, 
> causing the validator code to loop forever. This is observed with JDK 17+.
>  


