jolshan commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1614071322


##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -1664,6 +1664,134 @@ class GroupMetadataManagerTest {
     assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
   }
 
+  @Test
+  def testOffsetMetadataTooLargePartialFailure(): Unit = {
+    val memberId = ""
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+    val validTopicIdPartition = new TopicIdPartition(topicIdPartition.topicId, 
1, "foo")
+    val offset = 37
+    val requireStable = true;
+
+    groupMetadataManager.addOwnedPartition(groupPartitionId)
+    val group = new GroupMetadata(groupId, Empty, time)
+    groupMetadataManager.addGroup(group)
+
+    val offsetTopicPartition = new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
groupMetadataManager.partitionFor(group.groupId))
+    val offsets = immutable.Map(
+      topicIdPartition -> OffsetAndMetadata(offset, "s" * 
(offsetConfig.maxMetadataSize + 1) , time.milliseconds()),
+      validTopicIdPartition -> OffsetAndMetadata(offset, "", 
time.milliseconds())
+    )
+
+    expectAppendMessage(Errors.NONE)
+
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
+      commitErrors = Some(errors)
+    }
+
+    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+    groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, 
offsets, callback, verificationGuard = None)
+    assertTrue(group.hasOffsets)
+
+    assertEquals(Some(Map(
+      topicIdPartition -> Errors.OFFSET_METADATA_TOO_LARGE,
+      validTopicIdPartition -> Errors.NONE)
+    ), commitErrors)
+
+    val cachedOffsets = groupMetadataManager.getOffsets(
+      groupId,
+      requireStable,
+      Some(Seq(topicIdPartition.topicPartition, 
validTopicIdPartition.topicPartition))
+    )
+
+    assertEquals(
+      Some(OffsetFetchResponse.INVALID_OFFSET),
+      cachedOffsets.get(topicIdPartition.topicPartition).map(_.offset)
+    )
+    assertEquals(
+      Some(Errors.NONE),
+      cachedOffsets.get(topicIdPartition.topicPartition).map(_.error)
+    )
+    assertEquals(
+      Some(offset),
+      cachedOffsets.get(validTopicIdPartition.topicPartition).map(_.offset)
+    )
+
+    assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+  }
+
+  @Test
+  def testTransactionalCommitOffsetWithOffsetMetadataTooLargePartialFailure(): 
Unit = {
+    val memberId = ""
+    val foo0 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+    val foo1 = new TopicIdPartition(Uuid.randomUuid(), 1, "foo")
+    val producerId = 232L
+    val producerEpoch = 0.toShort
+
+    groupMetadataManager.addOwnedPartition(groupPartitionId)
+
+    val group = new GroupMetadata(groupId, Empty, time)
+    groupMetadataManager.addGroup(group)
+
+    val offsetTopicPartition = new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
groupMetadataManager.partitionFor(group.groupId))
+    val offsets = immutable.Map(
+      foo0 -> OffsetAndMetadata(37, "", time.milliseconds()),
+      foo1 -> OffsetAndMetadata(38, "s" * (offsetConfig.maxMetadataSize + 1), 
time.milliseconds())
+    )
+
+    val capturedResponseCallback: ArgumentCaptor[Map[TopicPartition, 
PartitionResponse] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] 
=> Unit])
+    
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
+      commitErrors = Some(errors)
+    }
+
+    val verificationGuard = new VerificationGuard()
+
+    groupMetadataManager.storeOffsets(
+      group,
+      memberId,
+      offsetTopicPartition,
+      offsets,
+      callback,
+      producerId,
+      producerEpoch,
+      verificationGuard = Some(verificationGuard)
+    )
+    assertTrue(group.hasOffsets)
+    assertTrue(group.allOffsets.isEmpty)
+
+    verify(replicaManager).appendRecords(anyLong(),
+      anyShort(),
+      any(),
+      any(),
+      any[Map[TopicPartition, MemoryRecords]],
+      capturedResponseCallback.capture(),
+      any[Option[ReentrantLock]],
+      any(),
+      any(),
+      any(),
+      ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)))
+    verify(replicaManager).getMagic(any())
+    capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
+      new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
+
+    assertEquals(Some(Map(
+      foo0 -> Errors.NONE,
+      foo1 -> Errors.OFFSET_METADATA_TOO_LARGE
+    )), commitErrors)
+
+    assertTrue(group.hasOffsets)
+    assertTrue(group.allOffsets.isEmpty)
+
+    group.completePendingTxnOffsetCommit(producerId, isCommit = true)

Review Comment:
   Looking at the TxnOffsetCommitHandler, it seems like this should be a fatal error?
   https://github.com/apache/kafka/blob/d585a494a4871eaecdddf98f8655b8f30c5bd834/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1336



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -1664,6 +1664,134 @@ class GroupMetadataManagerTest {
     assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
   }
 
+  @Test
+  def testOffsetMetadataTooLargePartialFailure(): Unit = {
+    val memberId = ""
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+    val validTopicIdPartition = new TopicIdPartition(topicIdPartition.topicId, 
1, "foo")
+    val offset = 37
+    val requireStable = true;
+
+    groupMetadataManager.addOwnedPartition(groupPartitionId)
+    val group = new GroupMetadata(groupId, Empty, time)
+    groupMetadataManager.addGroup(group)
+
+    val offsetTopicPartition = new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
groupMetadataManager.partitionFor(group.groupId))
+    val offsets = immutable.Map(
+      topicIdPartition -> OffsetAndMetadata(offset, "s" * 
(offsetConfig.maxMetadataSize + 1) , time.milliseconds()),
+      validTopicIdPartition -> OffsetAndMetadata(offset, "", 
time.milliseconds())
+    )
+
+    expectAppendMessage(Errors.NONE)
+
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
+      commitErrors = Some(errors)
+    }
+
+    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+    groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, 
offsets, callback, verificationGuard = None)
+    assertTrue(group.hasOffsets)
+
+    assertEquals(Some(Map(
+      topicIdPartition -> Errors.OFFSET_METADATA_TOO_LARGE,
+      validTopicIdPartition -> Errors.NONE)
+    ), commitErrors)
+
+    val cachedOffsets = groupMetadataManager.getOffsets(
+      groupId,
+      requireStable,
+      Some(Seq(topicIdPartition.topicPartition, 
validTopicIdPartition.topicPartition))
+    )
+
+    assertEquals(
+      Some(OffsetFetchResponse.INVALID_OFFSET),
+      cachedOffsets.get(topicIdPartition.topicPartition).map(_.offset)
+    )
+    assertEquals(
+      Some(Errors.NONE),
+      cachedOffsets.get(topicIdPartition.topicPartition).map(_.error)
+    )
+    assertEquals(
+      Some(offset),
+      cachedOffsets.get(validTopicIdPartition.topicPartition).map(_.offset)
+    )
+
+    assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+  }
+
+  @Test
+  def testTransactionalCommitOffsetWithOffsetMetadataTooLargePartialFailure(): 
Unit = {
+    val memberId = ""
+    val foo0 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+    val foo1 = new TopicIdPartition(Uuid.randomUuid(), 1, "foo")
+    val producerId = 232L
+    val producerEpoch = 0.toShort
+
+    groupMetadataManager.addOwnedPartition(groupPartitionId)
+
+    val group = new GroupMetadata(groupId, Empty, time)
+    groupMetadataManager.addGroup(group)
+
+    val offsetTopicPartition = new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
groupMetadataManager.partitionFor(group.groupId))
+    val offsets = immutable.Map(
+      foo0 -> OffsetAndMetadata(37, "", time.milliseconds()),
+      foo1 -> OffsetAndMetadata(38, "s" * (offsetConfig.maxMetadataSize + 1), 
time.milliseconds())
+    )
+
+    val capturedResponseCallback: ArgumentCaptor[Map[TopicPartition, 
PartitionResponse] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] 
=> Unit])
+    
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
+      commitErrors = Some(errors)
+    }
+
+    val verificationGuard = new VerificationGuard()
+
+    groupMetadataManager.storeOffsets(
+      group,
+      memberId,
+      offsetTopicPartition,
+      offsets,
+      callback,
+      producerId,
+      producerEpoch,
+      verificationGuard = Some(verificationGuard)
+    )
+    assertTrue(group.hasOffsets)
+    assertTrue(group.allOffsets.isEmpty)
+
+    verify(replicaManager).appendRecords(anyLong(),
+      anyShort(),
+      any(),
+      any(),
+      any[Map[TopicPartition, MemoryRecords]],
+      capturedResponseCallback.capture(),
+      any[Option[ReentrantLock]],
+      any(),
+      any(),
+      any(),
+      ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)))
+    verify(replicaManager).getMagic(any())
+    capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
+      new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
+
+    assertEquals(Some(Map(
+      foo0 -> Errors.NONE,
+      foo1 -> Errors.OFFSET_METADATA_TOO_LARGE
+    )), commitErrors)
+
+    assertTrue(group.hasOffsets)
+    assertTrue(group.allOffsets.isEmpty)
+
+    group.completePendingTxnOffsetCommit(producerId, isCommit = true)

Review Comment:
   Looking at the TxnOffsetCommitHandler, it seems like this should be a fatal error?
   https://github.com/apache/kafka/blob/d585a494a4871eaecdddf98f8655b8f30c5bd834/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1336



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to