Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-27 Thread via GitHub


dajac commented on PR #16072:
URL: https://github.com/apache/kafka/pull/16072#issuecomment-2133684583

   Merged to trunk and 3.7.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16371: fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-27 Thread via GitHub


dajac commented on PR #15536:
URL: https://github.com/apache/kafka/pull/15536#issuecomment-2133676133

   Addressed by https://github.com/apache/kafka/pull/16072.





Re: [PR] KAFKA-16371: fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-27 Thread via GitHub


dajac closed pull request #15536: KAFKA-16371: fix lingering pending commit 
when handling OFFSET_METADATA_TOO_LARGE
URL: https://github.com/apache/kafka/pull/15536





Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-27 Thread via GitHub


dajac merged PR #16072:
URL: https://github.com/apache/kafka/pull/16072





Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-27 Thread via GitHub


dajac commented on PR #16072:
URL: https://github.com/apache/kafka/pull/16072#issuecomment-213709

   Opened: https://issues.apache.org/jira/browse/KAFKA-16846.





Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-26 Thread via GitHub


jolshan commented on PR #16072:
URL: https://github.com/apache/kafka/pull/16072#issuecomment-2132557435

   @chia7712 He said he would file it on Monday -- so soon :) 





Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-26 Thread via GitHub


chia7712 commented on PR #16072:
URL: https://github.com/apache/kafka/pull/16072#issuecomment-2132395309

   > Discussed a bit with David offline. Given that this error should be fatal 
for the producer (we can do a followup to make the error clearer there), this 
handling makes sense for now. In the future we may want to consider 
implications for allowing the server to commit such offsets, but leave the 
decision to the client for now.
   
   out of curiosity, is there a ticket? 
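   For context, the limit being exceeded here is the broker-side `offset.metadata.max.bytes` setting (4096 bytes by default). Since the quoted comment leaves the decision to the client for now, a minimal client-side guard might look like the sketch below. It is illustrative only: the `CommitWithMetadata` object and its helper are made up for the example, and it simply avoids attaching metadata that the broker would reject with OFFSET_METADATA_TOO_LARGE.

   ```
import java.util.Collections
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

object CommitWithMetadata {
  // Default value of the broker's offset.metadata.max.bytes; check the cluster config.
  val MaxMetadataBytes = 4096

  // Illustrative sketch: drop oversized metadata instead of letting the commit
  // fail with OFFSET_METADATA_TOO_LARGE for that partition.
  def commit(consumer: KafkaConsumer[String, String],
             partition: TopicPartition,
             offset: Long,
             metadata: String): Unit = {
    val safeMetadata =
      if (metadata.getBytes("UTF-8").length <= MaxMetadataBytes) metadata
      else "" // or truncate, depending on what the application needs
    consumer.commitSync(
      Collections.singletonMap(partition, new OffsetAndMetadata(offset, safeMetadata)))
  }
}
   ```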





Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-25 Thread via GitHub


dajac commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1614757426


##
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##
@@ -1664,6 +1664,134 @@ class GroupMetadataManagerTest {
     assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
   }
 
+  @Test
+  def testOffsetMetadataTooLargePartialFailure(): Unit = {
+    val memberId = ""
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+    val validTopicIdPartition = new TopicIdPartition(topicIdPartition.topicId, 1, "foo")
+    val offset = 37
+    val requireStable = true;
+
+    groupMetadataManager.addOwnedPartition(groupPartitionId)
+    val group = new GroupMetadata(groupId, Empty, time)
+    groupMetadataManager.addGroup(group)
+
+    val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
+    val offsets = immutable.Map(
+      topicIdPartition -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1), time.milliseconds()),
+      validTopicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds())
+    )
+
+    expectAppendMessage(Errors.NONE)
+
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
+      commitErrors = Some(errors)
+    }
+
+    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+    groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None)
+    assertTrue(group.hasOffsets)
+
+    assertEquals(Some(Map(
+      topicIdPartition -> Errors.OFFSET_METADATA_TOO_LARGE,
+      validTopicIdPartition -> Errors.NONE)
+    ), commitErrors)
+
+    val cachedOffsets = groupMetadataManager.getOffsets(
+      groupId,
+      requireStable,
+      Some(Seq(topicIdPartition.topicPartition, validTopicIdPartition.topicPartition))
+    )
+
+    assertEquals(
+      Some(OffsetFetchResponse.INVALID_OFFSET),
+      cachedOffsets.get(topicIdPartition.topicPartition).map(_.offset)
+    )
+    assertEquals(
+      Some(Errors.NONE),
+      cachedOffsets.get(topicIdPartition.topicPartition).map(_.error)
+    )
+    assertEquals(
+      Some(offset),
+      cachedOffsets.get(validTopicIdPartition.topicPartition).map(_.offset)
+    )
+
+    assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+  }
+
+  @Test
+  def testTransactionalCommitOffsetWithOffsetMetadataTooLargePartialFailure(): Unit = {
+    val memberId = ""
+    val foo0 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+    val foo1 = new TopicIdPartition(Uuid.randomUuid(), 1, "foo")
+    val producerId = 232L
+    val producerEpoch = 0.toShort
+
+    groupMetadataManager.addOwnedPartition(groupPartitionId)
+
+    val group = new GroupMetadata(groupId, Empty, time)
+    groupMetadataManager.addGroup(group)
+
+    val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
+    val offsets = immutable.Map(
+      foo0 -> OffsetAndMetadata(37, "", time.milliseconds()),
+      foo1 -> OffsetAndMetadata(38, "s" * (offsetConfig.maxMetadataSize + 1), time.milliseconds())
+    )
+
+    val capturedResponseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
+    when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
+      commitErrors = Some(errors)
+    }
+
+    val verificationGuard = new VerificationGuard()
+
+    groupMetadataManager.storeOffsets(
+      group,
+      memberId,
+      offsetTopicPartition,
+      offsets,
+      callback,
+      producerId,
+      producerEpoch,
+      verificationGuard = Some(verificationGuard)
+    )
+    assertTrue(group.hasOffsets)
+    assertTrue(group.allOffsets.isEmpty)
+
+    verify(replicaManager).appendRecords(anyLong(),
+      anyShort(),
+      any(),
+      any(),
+      any[Map[TopicPartition, MemoryRecords]],
+      capturedResponseCallback.capture(),
+      any[Option[ReentrantLock]],
+      any(),
+      any(),
+      any(),
+      ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)))
+    verify(replicaManager).getMagic(any())
+    capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
+      new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
+
+    assertEquals(Some(Map(
+      foo0 -> Errors.NONE,
+      foo1 -> Errors.OFFSET_METADATA_TOO_LARGE
+    )), commitErrors)
+
+    assertTrue(group.hasOffsets)
+    assertTrue(group.allOffsets.isEmpty)
+
+    group.completePendingTxnOffsetCommit(producerId, isCommit = true)

Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-25 Thread via GitHub


jolshan commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1614757904


##
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##
@@ -1664,6 +1664,134 @@ class GroupMetadataManagerTest {

Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-25 Thread via GitHub


jolshan commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1614738949


##
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##
@@ -1664,6 +1664,134 @@ class GroupMetadataManagerTest {

Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-25 Thread via GitHub


dajac commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1614441098


##
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##
@@ -1664,6 +1664,134 @@ class GroupMetadataManagerTest {

Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-24 Thread via GitHub


jolshan commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1614071322


##
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##
@@ -1664,6 +1664,134 @@ class GroupMetadataManagerTest {

Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-24 Thread via GitHub


jolshan commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1614055528


##
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##
@@ -1664,6 +1664,134 @@ class GroupMetadataManagerTest {

Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-24 Thread via GitHub


dajac commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1614001355


##
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##
@@ -1664,6 +1664,134 @@ class GroupMetadataManagerTest {

Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-24 Thread via GitHub


jolshan commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1613818130


##
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##
@@ -1664,6 +1664,134 @@ class GroupMetadataManagerTest {

Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-24 Thread via GitHub


chia7712 commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1613803124


##
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##
@@ -1664,6 +1664,134 @@ class GroupMetadataManagerTest {
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None

Review Comment:
   > It helps to differentiate whether the callback was called or not. If we 
use a Map, we don't really know if we received an empty map or if it was not 
called. 
   
   Fair enough if that can help us to debug in the future.






Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-24 Thread via GitHub


jolshan commented on PR #16072:
URL: https://github.com/apache/kafka/pull/16072#issuecomment-2130038593

   > Hum... I am not sure about this one. @jolshan Would you know?
   
   There was a lot of refactoring in this path so we may have just missed this. 
It was originally intended for any verification errors we pass through, but I 
think we return before we hit this callback if the verification fails? 
https://github.com/apache/kafka/blob/4f55786a8a86fe228a0b10a2f28529f5128e5d6f/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L927
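   To make the pattern under discussion concrete, here is a simplified, illustrative sketch (not the actual GroupMetadataManager code) of how errors detected before the append, such as OFFSET_METADATA_TOO_LARGE, can be merged with the append result inside the commit callback. The names `OffsetCommitSketch`, `storeOffsetsSketch` and `appendValid` are made up for the example.

   ```
import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.protocol.Errors

object OffsetCommitSketch {
  // Illustrative only: split the request into partitions that fail validation
  // up front and partitions that get appended, then merge both error maps so
  // the caller sees one response covering every requested partition.
  def storeOffsetsSketch(
      offsets: Map[TopicIdPartition, String], // partition -> offset metadata
      maxMetadataSize: Int,
      appendValid: (Map[TopicIdPartition, String], Map[TopicIdPartition, Errors] => Unit) => Unit,
      responseCallback: Map[TopicIdPartition, Errors] => Unit): Unit = {

    val (tooLarge, valid) = offsets.partition { case (_, metadata) =>
      metadata.length > maxMetadataSize
    }
    val preAppendErrors = tooLarge.map { case (tp, _) => tp -> Errors.OFFSET_METADATA_TOO_LARGE }

    if (valid.isEmpty) {
      // Nothing to append: answer with the pre-append errors only.
      responseCallback(preAppendErrors)
    } else {
      // Append the valid partitions; the callback folds in the pre-append errors.
      appendValid(valid, appendErrors => responseCallback(preAppendErrors ++ appendErrors))
    }
  }
}
   ```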





Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-24 Thread via GitHub


dajac commented on PR #16072:
URL: https://github.com/apache/kafka/pull/16072#issuecomment-2129955822

   > BTW, I notice that createPutCacheCallback has an input argument 
preAppendErrors which is never populated. Is it an unfinished feature or 
something we can remove?
   
   Hum... I am not sure about this one. @jolshan Would you know?





Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-24 Thread via GitHub


dajac commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1613744158


##
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##
@@ -1664,6 +1664,134 @@ class GroupMetadataManagerTest {
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None

Review Comment:
   It helps to differentiate whether the callback was called or not. If we use 
a Map, we don't really know if we received an empty map or if it was not 
called. I am not too opinionated on this though.
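   A tiny, self-contained illustration of that point (independent of the test and of any Kafka types):

   ```
// Why Option[Map[...]] rather than a plain Map in the test:
//   None            -> the callback was never invoked
//   Some(Map.empty) -> the callback was invoked, just with no per-partition errors
// With a bare Map initialised to empty, those two outcomes look identical.
var commitErrors: Option[Map[String, String]] = None
def callback(errors: Map[String, String]): Unit = commitErrors = Some(errors)

assert(commitErrors.isEmpty)             // nothing has called back yet
callback(Map.empty)                      // callback fires with an empty result
assert(commitErrors.contains(Map.empty)) // now we can tell it actually ran
   ```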






Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-24 Thread via GitHub


chia7712 commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1613735230


##
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##
@@ -1664,6 +1664,134 @@ class GroupMetadataManagerTest {
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None

Review Comment:
   ditto



##
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##
@@ -1664,6 +1664,134 @@ class GroupMetadataManagerTest {
 assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
   }
 
+  @Test
+  def testOffsetMetadataTooLargePartialFailure(): Unit = {
+val memberId = ""
+val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+val validTopicIdPartition = new TopicIdPartition(topicIdPartition.topicId, 
1, "foo")
+val offset = 37
+val requireStable = true;
+
+groupMetadataManager.addOwnedPartition(groupPartitionId)
+val group = new GroupMetadata(groupId, Empty, time)
+groupMetadataManager.addGroup(group)
+
+val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
+val offsets = immutable.Map(
+  topicIdPartition -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1) , time.milliseconds()),
+  validTopicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds())
+)
+
+expectAppendMessage(Errors.NONE)
+
+var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None

Review Comment:

Re: [PR] KAFKA-16371: fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-23 Thread via GitHub


dajac commented on PR #15536:
URL: https://github.com/apache/kafka/pull/15536#issuecomment-2127253314

   It would be great if we could also add a unit test for the transactional 
offset commit path. Something like:
   
   ```
  @Test
  def testTransactionalCommitOffsetWithPartialFailure(): Unit = {
    val memberId = ""
    val foo0 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
    val foo1 = new TopicIdPartition(Uuid.randomUuid(), 1, "foo")
    val producerId = 232L
    val producerEpoch = 0.toShort

    groupMetadataManager.addOwnedPartition(groupPartitionId)

    val group = new GroupMetadata(groupId, Empty, time)
    groupMetadataManager.addGroup(group)

    val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
    val offsets = immutable.Map(
      foo0 -> OffsetAndMetadata(37, "", time.milliseconds()),
      foo1 -> OffsetAndMetadata(38, "s" * (offsetConfig.maxMetadataSize + 1), time.milliseconds())
    )

    val capturedResponseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
    when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None

    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
      commitErrors = Some(errors)
    }

    val verificationGuard = new VerificationGuard()

    groupMetadataManager.storeOffsets(
      group,
      memberId,
      offsetTopicPartition,
      offsets,
      callback,
      producerId,
      producerEpoch,
      verificationGuard = Some(verificationGuard)
    )
    assertTrue(group.hasOffsets)
    assertTrue(group.allOffsets.isEmpty)

    verify(replicaManager).appendRecords(anyLong(),
      anyShort(),
      any(),
      any(),
      any[Map[TopicPartition, MemoryRecords]],
      capturedResponseCallback.capture(),
      any[Option[ReentrantLock]],
      any(),
      any(),
      any(),
      ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)))
    verify(replicaManager).getMagic(any())
    capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
      new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))

    assertEquals(Some(Map(
      foo0 -> Errors.NONE,
      foo1 -> Errors.OFFSET_METADATA_TOO_LARGE
    )), commitErrors)

    assertTrue(group.hasOffsets)
    assertTrue(group.allOffsets.isEmpty)

    group.completePendingTxnOffsetCommit(producerId, isCommit = true)
    assertTrue(group.hasOffsets)
    assertFalse(group.allOffsets.isEmpty)
    assertEquals(offsets.get(foo0), group.offset(foo0.topicPartition))
  }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16371: fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-05-23 Thread via GitHub


dajac commented on PR #15536:
URL: https://github.com/apache/kafka/pull/15536#issuecomment-2127090825

   @kphelps Are you interested in addressing the small comments? I can merge it 
afterwards.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16371: fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-04-15 Thread via GitHub


dajac commented on code in PR #15536:
URL: https://github.com/apache/kafka/pull/15536#discussion_r1565639650


##
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##
@@ -1661,6 +1661,65 @@ class GroupMetadataManagerTest {
 assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
   }
 
+  @Test
+  def testOffsetMetadataTooLargePartialFailure(): Unit = {
+val memberId = ""
+val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+val validTopicIdPartition = new TopicIdPartition(topicIdPartition.topicId, 1, "foo")
+val offset = 37
+val requireStable = true;
+
+groupMetadataManager.addOwnedPartition(groupPartitionId)
+val group = new GroupMetadata(groupId, Empty, time)
+groupMetadataManager.addGroup(group)
+
+val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
+val offsets = immutable.Map(
+  topicIdPartition -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1) , time.milliseconds()),
+  validTopicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
+
+expectAppendMessage(Errors.NONE)
+
+var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
+  commitErrors = Some(errors)
+}
+
+assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None)
+assertTrue(group.hasOffsets)
+
+assertFalse(commitErrors.isEmpty)
+assertEquals(
+  Some(Errors.OFFSET_METADATA_TOO_LARGE),
+  commitErrors.get.get(topicIdPartition)
+)
+assertEquals(
+  Some(Errors.NONE),
+  commitErrors.get.get(validTopicIdPartition)
+)

Review Comment:
   nit: Would it be possible to use `assertEquals(expectedMap, 
commitErrors.get)`? We usually prefer this way because it ensures that the Map 
only contains what we expect.
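
   For illustration, the whole-map form of that assertion, using the names from
   the snippet above; this is the shape the updated test in this PR ends up
   using:

   ```
   // Assert on the complete commitErrors map in one assertEquals, so any
   // unexpected extra entries also make the test fail.
   assertEquals(Some(Map(
     topicIdPartition -> Errors.OFFSET_METADATA_TOO_LARGE,
     validTopicIdPartition -> Errors.NONE
   )), commitErrors)
   ```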



##
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##
@@ -1661,6 +1661,65 @@ class GroupMetadataManagerTest {
 assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
   }
 
+  @Test
+  def testOffsetMetadataTooLargePartialFailure(): Unit = {
+val memberId = ""
+val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+val validTopicIdPartition = new TopicIdPartition(topicIdPartition.topicId, 1, "foo")
+val offset = 37
+val requireStable = true;
+
+groupMetadataManager.addOwnedPartition(groupPartitionId)
+val group = new GroupMetadata(groupId, Empty, time)
+groupMetadataManager.addGroup(group)
+
+val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
+val offsets = immutable.Map(
+  topicIdPartition -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1) , time.milliseconds()),
+  validTopicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
+
+expectAppendMessage(Errors.NONE)
+
+var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
+  commitErrors = Some(errors)
+}
+
+assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None)
+assertTrue(group.hasOffsets)
+
+assertFalse(commitErrors.isEmpty)
+assertEquals(
+  Some(Errors.OFFSET_METADATA_TOO_LARGE),
+  commitErrors.get.get(topicIdPartition)
+)
+assertEquals(
+  Some(Errors.NONE),
+  commitErrors.get.get(validTopicIdPartition)
+)
+
+val cachedOffsets = groupMetadataManager.getOffsets(
+  groupId,
+  requireStable,
+  Some(Seq(topicIdPartition.topicPartition, validTopicIdPartition.topicPartition))
+)
+assertEquals(
+  Some(OffsetFetchResponse.INVALID_OFFSET),
+  cachedOffsets.get(topicIdPartition.topicPartition).map(_.offset)
+)
+assertEquals(
+  Some(Errors.NONE),
+  cachedOffsets.get(topicIdPartition.topicPartition).map(_.error)
+)
+assertEquals(
+  Some(offset),
+  cachedOffsets.get(validTopicIdPartition.topicPartition).map(_.offset)
+)

Review Comment:
   nit: Same comment as the previous one.
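
   For illustration only, a hypothetical sketch of applying the same whole-map
   style to the cached offsets, projecting each entry down to its offset field
   (this projection is not in the PR; the merged test keeps the per-key
   assertions):

   ```
   // Hypothetical sketch: compare all cached offsets at once by projecting each
   // OffsetFetchResponse.PartitionData value down to its offset field.
   assertEquals(
     Map(
       topicIdPartition.topicPartition -> OffsetFetchResponse.INVALID_OFFSET,
       validTopicIdPartition.topicPartition -> offset.toLong
     ),
     cachedOffsets.map { case (tp, partitionData) => tp -> partitionData.offset }
   )
   ```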



##
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##
@@ -1661,6 +1661,65 @@ class GroupMetadataManagerTest {
 assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
   }
 
+  @Test
+  def testOffsetMetadataTooLargePartialFailure(): Unit = {
+val memberId = ""
+val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")

Re: [PR] KAFKA-16371: fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-04-12 Thread via GitHub


dajac commented on PR #15536:
URL: https://github.com/apache/kafka/pull/15536#issuecomment-2051891235

   Sorry for the delay on this one. I will review it next week.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org