ijuma commented on a change in pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#discussion_r485652591



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1401,24 +1401,20 @@ object GroupMetadataManager {
       val version = buffer.getShort
       val valueSchema = schemaForGroupValue(version)
       val value = valueSchema.read(buffer)
+      val maxVersion = GROUP_VALUE_SCHEMAS.size - 1

Review comment:
       We should add a constant in `GroupMetadataManager` for this, maybe call 
it `CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION` for consistency with the other 
two similar fields.
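
A minimal sketch of what such a constant might look like. The `GroupValueSchemaSketch` object, the string placeholders standing in for schemas, and the `isSupported` helper are all hypothetical and only illustrate the naming idea; the real `GROUP_VALUE_SCHEMAS` in `GroupMetadataManager` holds actual schema objects.

```scala
object GroupValueSchemaSketch {
  // Hypothetical stand-in for GROUP_VALUE_SCHEMAS: version number -> schema.
  // Strings are used here only so the sketch is self-contained.
  val GROUP_VALUE_SCHEMAS: Map[Int, String] =
    Map(0 -> "v0", 1 -> "v1", 2 -> "v2", 3 -> "v3")

  // Named constant replacing the inline `GROUP_VALUE_SCHEMAS.size - 1`.
  // This assumes versions are contiguous and start at 0.
  val CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION: Short =
    (GROUP_VALUE_SCHEMAS.size - 1).toShort

  // Hypothetical helper: true if a version read from the buffer is known.
  def isSupported(version: Short): Boolean =
    version >= 0 && version <= CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION
}
```

With a named constant, the read path can refer to `CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION` directly instead of recomputing a local `maxVersion`.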

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unSupportedVersion = Short.MinValue
+
+    // put the un-supported version as the version value
+    val groupMetadataRecordValue = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+      .value().putShort(unSupportedVersion)
+    // reset the position to the starting position 0 so that it can read the data in correct order
+    groupMetadataRecordValue.position(0)
+
+    val e = assertThrows(classOf[KafkaException],
+      () => GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecordValue, time))
+    assertEquals(s"Unknown group metadata version ${unSupportedVersion}", e.getMessage)
+  }
+
+  @Test
+  def testCurrentStateTSForAllGroupMetadataVersion(): Unit = {

Review comment:
       Nit: `TS` -> `Timestamp` for clarity. Also, `Version` -> `Versions`.

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {

Review comment:
       Is this test related to the change or a gap you identified? It's fine if 
it's the latter, just trying to understand.

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unSupportedVersion = Short.MinValue
+
+    // put the un-supported version as the version value
+    val groupMetadataRecordValue = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+      .value().putShort(unSupportedVersion)
+    // reset the position to the starting position 0 so that it can read the data in correct order
+    groupMetadataRecordValue.position(0)
+
+    val e = assertThrows(classOf[KafkaException],
+      () => GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecordValue, time))
+    assertEquals(s"Unknown group metadata version ${unSupportedVersion}", e.getMessage)
+  }
+
+  @Test
+  def testCurrentStateTSForAllGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+
+    for (apiVersion <- ApiVersion.allVersions) {
+      val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion = apiVersion)
+
+      val deserializedGroupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecord.value(), time)
+      // GROUP_METADATA_VALUE_SCHEMA_V2 or higher should correctly set the currentStateTimestamp
+      if (apiVersion >= KAFKA_2_1_IV0)
+        assertEquals(time.milliseconds(), deserializedGroupMetadata.currentStateTimestamp.get)

Review comment:
       This fails without the change, right?

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unSupportedVersion = Short.MinValue

Review comment:
       Nit: `unsupportedVersion`.

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unSupportedVersion = Short.MinValue
+
+    // put the un-supported version as the version value

Review comment:
       Nit: `un-supported` -> `unsupported`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org