Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
dajac commented on PR #16072:
URL: https://github.com/apache/kafka/pull/16072#issuecomment-2133684583

Merged to trunk and 3.7.
Re: [PR] KAFKA-16371: fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
dajac commented on PR #15536:
URL: https://github.com/apache/kafka/pull/15536#issuecomment-2133676133

Addressed by https://github.com/apache/kafka/pull/16072.
Re: [PR] KAFKA-16371: fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
dajac closed pull request #15536: KAFKA-16371: fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE
URL: https://github.com/apache/kafka/pull/15536
Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
dajac merged PR #16072:
URL: https://github.com/apache/kafka/pull/16072
Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
dajac commented on PR #16072:
URL: https://github.com/apache/kafka/pull/16072#issuecomment-213709

Opened: https://issues.apache.org/jira/browse/KAFKA-16846.
Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
jolshan commented on PR #16072:
URL: https://github.com/apache/kafka/pull/16072#issuecomment-2132557435

@chia7712 He said he would file it on Monday -- so soon :)
Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
chia7712 commented on PR #16072:
URL: https://github.com/apache/kafka/pull/16072#issuecomment-2132395309

> Discussed a bit with David offline. Given that this error should be fatal for the producer (we can do a followup to make the error clearer there), this handling makes sense for now. In the future we may want to consider implications for allowing the server to commit such offsets, but leave the decision to the client for now.

Out of curiosity, is there a ticket?
Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
dajac commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1614757426

## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
@@ -1664,6 +1664,134 @@ class GroupMetadataManagerTest

+  @Test
+  def testOffsetMetadataTooLargePartialFailure(): Unit = {
+    val memberId = ""
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+    val validTopicIdPartition = new TopicIdPartition(topicIdPartition.topicId, 1, "foo")
+    val offset = 37
+    val requireStable = true;
+
+    groupMetadataManager.addOwnedPartition(groupPartitionId)
+    val group = new GroupMetadata(groupId, Empty, time)
+    groupMetadataManager.addGroup(group)
+
+    val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
+    val offsets = immutable.Map(
+      topicIdPartition -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1), time.milliseconds()),
+      validTopicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds())
+    )
+
+    expectAppendMessage(Errors.NONE)
+
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
+      commitErrors = Some(errors)
+    }
+
+    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+    groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None)
+    assertTrue(group.hasOffsets)
+
+    assertEquals(Some(Map(
+      topicIdPartition -> Errors.OFFSET_METADATA_TOO_LARGE,
+      validTopicIdPartition -> Errors.NONE)
+    ), commitErrors)
+
+    val cachedOffsets = groupMetadataManager.getOffsets(
+      groupId,
+      requireStable,
+      Some(Seq(topicIdPartition.topicPartition, validTopicIdPartition.topicPartition))
+    )
+
+    assertEquals(
+      Some(OffsetFetchResponse.INVALID_OFFSET),
+      cachedOffsets.get(topicIdPartition.topicPartition).map(_.offset)
+    )
+    assertEquals(
+      Some(Errors.NONE),
+      cachedOffsets.get(topicIdPartition.topicPartition).map(_.error)
+    )
+    assertEquals(
+      Some(offset),
+      cachedOffsets.get(validTopicIdPartition.topicPartition).map(_.offset)
+    )
+
+    assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+  }

[The quoted hunk continues with testTransactionalCommitOffsetWithOffsetMetadataTooLargePartialFailure, the transactional variant of the same scenario; that part of the quote and the review comment itself are truncated in the archive. The full transactional test appears in dajac's suggestion on PR #15536 further down this digest.]
Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
jolshan commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1614757904

## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:

[Quotes the same hunk (the two new OFFSET_METADATA_TOO_LARGE tests) shown in full in the r1614757426 comment above; the quote and the review comment are truncated in the archive.]
Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
jolshan commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1614738949

## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:

[Quotes the same GroupMetadataManagerTest.scala hunk as the r1614757426 comment above; the review comment is truncated in the archive.]
Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
dajac commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1614441098

## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:

[Quotes the same GroupMetadataManagerTest.scala hunk as the r1614757426 comment above; the review comment is truncated in the archive.]
Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
jolshan commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1614071322

## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:

[Quotes the same GroupMetadataManagerTest.scala hunk as the r1614757426 comment above; the review comment is truncated in the archive.]
Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
jolshan commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1614055528

## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:

[Quotes the same GroupMetadataManagerTest.scala hunk as the r1614757426 comment above; the review comment is truncated in the archive.]
Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
dajac commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1614001355

## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:

[Quotes the same GroupMetadataManagerTest.scala hunk as the r1614757426 comment above; the review comment is truncated in the archive.]
Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
jolshan commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1613818130

## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:

[Quotes the same GroupMetadataManagerTest.scala hunk as the r1614757426 comment above; the review comment is truncated in the archive.]
Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
chia7712 commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1613803124

## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:

[Quoted context: testOffsetMetadataTooLargePartialFailure, up to the line `var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None`.]

Review Comment:
> It helps to differentiate whether the callback was called or not. If we use a Map, we don't really know if we received an empty map or if it was not called.

Fair enough if that can help us to debug in the future.
Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
jolshan commented on PR #16072:
URL: https://github.com/apache/kafka/pull/16072#issuecomment-2130038593

> Hum... I am not sure about this one. @jolshan Would you know?

There was a lot of refactoring in this path, so we may have just missed this. It was originally intended for any verification errors we pass through, but I think we return before we hit this callback if the verification fails?
https://github.com/apache/kafka/blob/4f55786a8a86fe228a0b10a2f28529f5128e5d6f/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L927
Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
dajac commented on PR #16072:
URL: https://github.com/apache/kafka/pull/16072#issuecomment-2129955822

> BTW, I notice that createPutCacheCallback has an input argument preAppendErrors which is never defined. Is it an unfinished feature or something we can remove?

Hum... I am not sure about this one. @jolshan Would you know?
Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
dajac commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1613744158

## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:

[Quoted context: testOffsetMetadataTooLargePartialFailure, up to the line `var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None`.]

Review Comment:
It helps to differentiate whether the callback was called or not. If we use a Map, we don't really know if we received an empty map or if it was not called. I am not too opinionated on this though.
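A minimal, self-contained sketch of the distinction dajac describes (plain Scala with illustrative String keys; not the PR's actual types):

```scala
object CallbackSentinelSketch extends App {
  // With an Option sentinel, "callback never invoked" (None) is distinguishable
  // from "callback invoked with an empty result" (Some(Map.empty)).
  var commitErrors: Option[Map[String, String]] = None

  def callback(errors: Map[String, String]): Unit = {
    commitErrors = Some(errors)
  }

  assert(commitErrors.isEmpty)  // callback not invoked yet

  callback(Map.empty)           // invoked, but with no per-partition errors
  assert(commitErrors.contains(Map.empty[String, String]))

  // A plain Map initialised to empty could not tell these two states apart.
}
```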
Re: [PR] KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
chia7712 commented on code in PR #16072:
URL: https://github.com/apache/kafka/pull/16072#discussion_r1613735230

## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:

[Quoted context: testTransactionalCommitOffsetWithOffsetMetadataTooLargePartialFailure, up to the line `var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None`.]

Review Comment:
ditto

## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:

[Quoted context: testOffsetMetadataTooLargePartialFailure, up to the same `commitErrors` line; the review comment on this section is truncated in the archive.]
Re: [PR] KAFKA-16371: fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
dajac commented on PR #15536:
URL: https://github.com/apache/kafka/pull/15536#issuecomment-2127253314

It would be great if we could also add a unit test for the transactional offset commit path. Something like:

```
  @Test
  def testTransactionalCommitOffsetWithPartialFailure(): Unit = {
    val memberId = ""
    val foo0 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
    val foo1 = new TopicIdPartition(Uuid.randomUuid(), 1, "foo")
    val producerId = 232L
    val producerEpoch = 0.toShort

    groupMetadataManager.addOwnedPartition(groupPartitionId)

    val group = new GroupMetadata(groupId, Empty, time)
    groupMetadataManager.addGroup(group)

    val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
    val offsets = immutable.Map(
      foo0 -> OffsetAndMetadata(37, "", time.milliseconds()),
      foo1 -> OffsetAndMetadata(38, "s" * (offsetConfig.maxMetadataSize + 1), time.milliseconds())
    )

    val capturedResponseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
    when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None

    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
      commitErrors = Some(errors)
    }

    val verificationGuard = new VerificationGuard()

    groupMetadataManager.storeOffsets(
      group,
      memberId,
      offsetTopicPartition,
      offsets,
      callback,
      producerId,
      producerEpoch,
      verificationGuard = Some(verificationGuard)
    )
    assertTrue(group.hasOffsets)
    assertTrue(group.allOffsets.isEmpty)

    verify(replicaManager).appendRecords(anyLong(),
      anyShort(),
      any(),
      any(),
      any[Map[TopicPartition, MemoryRecords]],
      capturedResponseCallback.capture(),
      any[Option[ReentrantLock]],
      any(),
      any(),
      any(),
      ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)))
    verify(replicaManager).getMagic(any())
    capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
      new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))

    assertEquals(Some(Map(
      foo0 -> Errors.NONE,
      foo1 -> Errors.OFFSET_METADATA_TOO_LARGE
    )), commitErrors)

    assertTrue(group.hasOffsets)
    assertTrue(group.allOffsets.isEmpty)

    group.completePendingTxnOffsetCommit(producerId, isCommit = true)
    assertTrue(group.hasOffsets)
    assertFalse(group.allOffsets.isEmpty)
    assertEquals(offsets.get(foo0), group.offset(foo0.topicPartition))
  }
```
Re: [PR] KAFKA-16371: fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
dajac commented on PR #15536: URL: https://github.com/apache/kafka/pull/15536#issuecomment-2127090825 @kphelps Are you interested in addressing the small comments? I can merge it afterwards. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16371: fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
dajac commented on code in PR #15536: URL: https://github.com/apache/kafka/pull/15536#discussion_r1565639650

## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
## @@ -1661,6 +1661,65 @@ class GroupMetadataManagerTest {
     assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
   }

+  @Test
+  def testOffsetMetadataTooLargePartialFailure(): Unit = {
+    val memberId = ""
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+    val validTopicIdPartition = new TopicIdPartition(topicIdPartition.topicId, 1, "foo")
+    val offset = 37
+    val requireStable = true;
+
+    groupMetadataManager.addOwnedPartition(groupPartitionId)
+    val group = new GroupMetadata(groupId, Empty, time)
+    groupMetadataManager.addGroup(group)
+
+    val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
+    val offsets = immutable.Map(
+      topicIdPartition -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1), time.milliseconds()),
+      validTopicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
+
+    expectAppendMessage(Errors.NONE)
+
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
+      commitErrors = Some(errors)
+    }
+
+    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+    groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None)
+    assertTrue(group.hasOffsets)
+
+    assertFalse(commitErrors.isEmpty)
+    assertEquals(
+      Some(Errors.OFFSET_METADATA_TOO_LARGE),
+      commitErrors.get.get(topicIdPartition)
+    )
+    assertEquals(
+      Some(Errors.NONE),
+      commitErrors.get.get(validTopicIdPartition)
+    )

Review Comment:
nit: Would it be possible to use `assertEquals(expectedMap, commitErrors.get)`? We usually prefer this way because it ensures that the Map only contains what we expect.
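[Editor's note] For readers following along, the map-level assertion suggested in the nit above would look roughly like the sketch below. It is only an illustration reusing the `topicIdPartition`, `validTopicIdPartition`, and `commitErrors` names from the diff, not code taken from the PR:
```
assertEquals(
  Map(
    topicIdPartition -> Errors.OFFSET_METADATA_TOO_LARGE,
    validTopicIdPartition -> Errors.NONE
  ),
  commitErrors.get
)
```
Comparing the whole map also fails if an unexpected partition ever appears in the callback result, which the per-key `get` assertions would not catch.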
## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
## @@ -1661,6 +1661,65 @@ class GroupMetadataManagerTest {
     assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
   }

+  @Test
+  def testOffsetMetadataTooLargePartialFailure(): Unit = {
+    val memberId = ""
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+    val validTopicIdPartition = new TopicIdPartition(topicIdPartition.topicId, 1, "foo")
+    val offset = 37
+    val requireStable = true;
+
+    groupMetadataManager.addOwnedPartition(groupPartitionId)
+    val group = new GroupMetadata(groupId, Empty, time)
+    groupMetadataManager.addGroup(group)
+
+    val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
+    val offsets = immutable.Map(
+      topicIdPartition -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1), time.milliseconds()),
+      validTopicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
+
+    expectAppendMessage(Errors.NONE)
+
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
+      commitErrors = Some(errors)
+    }
+
+    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+    groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None)
+    assertTrue(group.hasOffsets)
+
+    assertFalse(commitErrors.isEmpty)
+    assertEquals(
+      Some(Errors.OFFSET_METADATA_TOO_LARGE),
+      commitErrors.get.get(topicIdPartition)
+    )
+    assertEquals(
+      Some(Errors.NONE),
+      commitErrors.get.get(validTopicIdPartition)
+    )
+
+    val cachedOffsets = groupMetadataManager.getOffsets(
+      groupId,
+      requireStable,
+      Some(Seq(topicIdPartition.topicPartition, validTopicIdPartition.topicPartition))
+    )
+    assertEquals(
+      Some(OffsetFetchResponse.INVALID_OFFSET),
+      cachedOffsets.get(topicIdPartition.topicPartition).map(_.offset)
+    )
+    assertEquals(
+      Some(Errors.NONE),
+      cachedOffsets.get(topicIdPartition.topicPartition).map(_.error)
+    )
+    assertEquals(
+      Some(offset),
+      cachedOffsets.get(validTopicIdPartition.topicPartition).map(_.offset)
+    )

Review Comment:
nit: Same comment as the previous one.

## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
## @@ -1661,6 +1661,65 @@ class GroupMetadataManagerTest {
     assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
   }

+  @Test
+  def testOffsetMetadataTooLargePartialFailure(): Unit = {
+    val memberId = ""
+    val topicIdPar
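[Editor's note] The "same comment as the previous one" nit refers to the `cachedOffsets` assertions in the hunk above. A map-level version of those checks might look roughly like this sketch; it compares only the extracted offsets rather than the full `PartitionData` objects, and it assumes the `topicIdPartition`, `validTopicIdPartition`, `offset`, and `cachedOffsets` names from the diff:
```
assertEquals(
  Map(
    topicIdPartition.topicPartition -> OffsetFetchResponse.INVALID_OFFSET,
    validTopicIdPartition.topicPartition -> offset.toLong
  ),
  cachedOffsets.map { case (tp, partitionData) => tp -> partitionData.offset }
)
```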
Re: [PR] KAFKA-16371: fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
dajac commented on PR #15536: URL: https://github.com/apache/kafka/pull/15536#issuecomment-2051891235 Sorry for the delay on this one. I will review it next week. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org