dajac commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1614441098
##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -1664,6 +1664,134 @@ class GroupMetadataManagerTest {
     assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
   }
 
+  @Test
+  def testOffsetMetadataTooLargePartialFailure(): Unit = {
+    val memberId = ""
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+    val validTopicIdPartition = new TopicIdPartition(topicIdPartition.topicId, 1, "foo")
+    val offset = 37
+    val requireStable = true
+
+    groupMetadataManager.addOwnedPartition(groupPartitionId)
+    val group = new GroupMetadata(groupId, Empty, time)
+    groupMetadataManager.addGroup(group)
+
+    val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
+    val offsets = immutable.Map(
+      topicIdPartition -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1), time.milliseconds()),
+      validTopicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds())
+    )
+
+    expectAppendMessage(Errors.NONE)
+
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
+      commitErrors = Some(errors)
+    }
+
+    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+    groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None)
+    assertTrue(group.hasOffsets)
+
+    assertEquals(Some(Map(
+      topicIdPartition -> Errors.OFFSET_METADATA_TOO_LARGE,
+      validTopicIdPartition -> Errors.NONE
+    )), commitErrors)
+
+    val cachedOffsets = groupMetadataManager.getOffsets(
+      groupId,
+      requireStable,
+      Some(Seq(topicIdPartition.topicPartition, validTopicIdPartition.topicPartition))
+    )
+
+    assertEquals(
+      Some(OffsetFetchResponse.INVALID_OFFSET),
+      cachedOffsets.get(topicIdPartition.topicPartition).map(_.offset)
+    )
+    assertEquals(
+      Some(Errors.NONE),
+      cachedOffsets.get(topicIdPartition.topicPartition).map(_.error)
+    )
+    assertEquals(
+      Some(offset),
+      cachedOffsets.get(validTopicIdPartition.topicPartition).map(_.offset)
+    )
+
+    assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+  }
+
+  @Test
+  def testTransactionalCommitOffsetWithOffsetMetadataTooLargePartialFailure(): Unit = {
+    val memberId = ""
+    val foo0 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+    val foo1 = new TopicIdPartition(Uuid.randomUuid(), 1, "foo")
+    val producerId = 232L
+    val producerEpoch = 0.toShort
+
+    groupMetadataManager.addOwnedPartition(groupPartitionId)
+
+    val group = new GroupMetadata(groupId, Empty, time)
+    groupMetadataManager.addGroup(group)
+
+    val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
+    val offsets = immutable.Map(
+      foo0 -> OffsetAndMetadata(37, "", time.milliseconds()),
+      foo1 -> OffsetAndMetadata(38, "s" * (offsetConfig.maxMetadataSize + 1), time.milliseconds())
+    )
+
+    val capturedResponseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
+    when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
+      commitErrors = Some(errors)
+    }
+
+    val verificationGuard = new VerificationGuard()
+
+    groupMetadataManager.storeOffsets(
+      group,
+      memberId,
+      offsetTopicPartition,
+      offsets,
+      callback,
+      producerId,
+      producerEpoch,
+      verificationGuard = Some(verificationGuard)
+    )
+    assertTrue(group.hasOffsets)
+    assertTrue(group.allOffsets.isEmpty)
+
+    verify(replicaManager).appendRecords(anyLong(),
+      anyShort(),
+      any(),
+      any(),
+      any[Map[TopicPartition, MemoryRecords]],
+      capturedResponseCallback.capture(),
+      any[Option[ReentrantLock]],
+      any(),
+      any(),
+      any(),
+      ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)))
+    verify(replicaManager).getMagic(any())
+    capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
+      new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
+
+    assertEquals(Some(Map(
+      foo0 -> Errors.NONE,
+      foo1 -> Errors.OFFSET_METADATA_TOO_LARGE
+    )), commitErrors)
+
+    assertTrue(group.hasOffsets)
+    assertTrue(group.allOffsets.isEmpty)
+
+    group.completePendingTxnOffsetCommit(producerId, isCommit = true)

Review Comment:
   You raised a good point. The server returns INVALID_OFFSET to the client, and the Java client treats it as a fatal error, so the transaction won't be committed in the end. Hmm… that does not explain the situation I was investigating then, unless another client was used. I have another theory that I must validate; I will keep you posted.
   
   That being said, this is still a bug that leaves the state on the server inconsistent, so the patch is still valid.
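   For readers following along, here is a minimal client-side sketch of the scenario the second test exercises: a transactional offset commit where one partition's metadata string exceeds the broker's offset.metadata.max.bytes limit (4096 bytes by default). The broker address, topic, transactional id, and group id are all hypothetical, and the error handling assumes the Java-client behavior described above, where the oversized-metadata error surfaces as fatal and the transaction is never committed:

   import java.util.Properties
   import scala.jdk.CollectionConverters._

   import org.apache.kafka.clients.consumer.{ConsumerGroupMetadata, OffsetAndMetadata}
   import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
   import org.apache.kafka.common.{KafkaException, TopicPartition}

   object OversizedOffsetMetadataRepro extends App {
     val props = new Properties()
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // hypothetical broker
     props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "repro-txn")       // hypothetical transactional id
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
       "org.apache.kafka.common.serialization.StringSerializer")
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
       "org.apache.kafka.common.serialization.StringSerializer")

     val producer = new KafkaProducer[String, String](props)
     producer.initTransactions()
     producer.beginTransaction()

     // Mirrors the partial-failure shape of the test: one valid commit plus one
     // whose metadata string exceeds offset.metadata.max.bytes (default 4096).
     val offsets = Map(
       new TopicPartition("foo", 0) -> new OffsetAndMetadata(37L, ""),
       new TopicPartition("foo", 1) -> new OffsetAndMetadata(38L, "s" * 5000)
     ).asJava

     try {
       producer.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata("repro-group"))
       // The commit error typically surfaces here rather than on the send above.
       producer.commitTransaction()
     } catch {
       case e: KafkaException =>
         // Per the discussion above, the client treats the error as fatal, so the
         // transaction is never committed; abortTransaction() may itself throw
         // once the producer has entered a fatal state.
         println(s"transactional offset commit failed: $e")
     } finally {
       producer.close()
     }
   }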