ijuma commented on a change in pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#discussion_r485652591
##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1401,24 +1401,20 @@ object GroupMetadataManager {
       val version = buffer.getShort
       val valueSchema = schemaForGroupValue(version)
       val value = valueSchema.read(buffer)
+      val maxVersion = GROUP_VALUE_SCHEMAS.size - 1

Review comment:
We should add a constant in `GroupMetadataManager` for this, maybe call it `CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION` for consistency with the other two similar fields.

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unSupportedVersion = Short.MinValue
+
+    // put the un-supported version as the version value
+    val groupMetadataRecordValue = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+      .value().putShort(unSupportedVersion)
+    // reset the position to the starting position 0 so that it can read the data in correct order
+    groupMetadataRecordValue.position(0)
+
+    val e = assertThrows(classOf[KafkaException],
+      () => GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecordValue, time))
+    assertEquals(s"Unknown group metadata version ${unSupportedVersion}", e.getMessage)
+  }
+
+  @Test
+  def testCurrentStateTSForAllGroupMetadataVersion(): Unit = {

Review comment:
Nit: `TS` -> `Timestamp` for clarity. Also, `Version` -> `Versions`.

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {

Review comment:
Is this test related to the change or a gap you identified? It's fine if it's the latter, just trying to understand.

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unSupportedVersion = Short.MinValue
+
+    // put the un-supported version as the version value
+    val groupMetadataRecordValue = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+      .value().putShort(unSupportedVersion)
+    // reset the position to the starting position 0 so that it can read the data in correct order
+    groupMetadataRecordValue.position(0)
+
+    val e = assertThrows(classOf[KafkaException],
+      () => GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecordValue, time))
+    assertEquals(s"Unknown group metadata version ${unSupportedVersion}", e.getMessage)
+  }
+
+  @Test
+  def testCurrentStateTSForAllGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+
+    for (apiVersion <- ApiVersion.allVersions) {
+      val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion = apiVersion)
+
+      val deserializedGroupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecord.value(), time)
+      // GROUP_METADATA_VALUE_SCHEMA_V2 or higher should correctly set the currentStateTimestamp
+      if (apiVersion >= KAFKA_2_1_IV0)
+        assertEquals(time.milliseconds(), deserializedGroupMetadata.currentStateTimestamp.get)

Review comment:
This fails without the change, right?

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unSupportedVersion = Short.MinValue

Review comment:
Nit: `unsupportedVersion`.

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unSupportedVersion = Short.MinValue
+
+    // put the un-supported version as the version value

Review comment:
Nit: `un-supported` -> `unsupported`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org