dajac commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r495882544



##########
File path: core/src/main/resources/common/message/GroupMetadataResponse.json
##########
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "name": "GroupMetadataResponse",
+  "validVersions": "0-3",
+  "fields": [
+    {
+      "name": "protocolType",
+      "versions": "0+",
+      "type": "string"
+    },
+    {
+      "name": "generation",
+      "versions": "0+",
+      "type": "int32"
+    },
+    {
+      "name": "protocol",
+      "versions": "0+",
+      "type": "string",
+      "nullableVersions": "0+"
+    },
+    {
+      "name": "leader",
+      "versions": "0+",
+      "type": "string",
+      "nullableVersions": "0+"
+    },
+    {
+      "name": "members",
+      "versions": "0+",
+      "type": "[]MemberMetadata"
+    },
+    {
+      "name": "currentStateTimestamp",

Review comment:
       I think that we should keep `currentStateTimestamp` before `members` as 
we can't reorder fields.
   
   ```
     private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
       new Field(PROTOCOL_TYPE_KEY, STRING),
       new Field(GENERATION_KEY, INT32),
       new Field(PROTOCOL_KEY, NULLABLE_STRING),
       new Field(LEADER_KEY, NULLABLE_STRING),
       new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
       new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
   ```

##########
File path: core/src/main/resources/common/message/OffsetCommitResponse.json
##########
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "name": "OffsetCommitResponse",
+  "validVersions": "0-3",
+  "fields": [
+    {
+      "name": "offset",
+      "versions": "0+",
+      "type": "int64"
+    },
+    {
+      "name": "metadata",
+      "versions": "0+",
+      "type": "string"
+    },
+    {
+      "name": "timestamp",
+      "versions": "0",
+      "type": "int64"
+    },
+    {
+      "name": "commitTimestamp",
+      "versions": "1+",
+      "type": "int64"
+    },
+    {
+      "name": "expireTimestamp",
+      "versions": "1",
+      "type": "int64"
+    },
+    {
+      "name": "leaderEpoch",

Review comment:
       `leaderEpoch` should be before `metadata`.

##########
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1321,22 +1159,15 @@ object GroupMetadataManager {
    */
   def readMessageKey(buffer: ByteBuffer): BaseKey = {
     val version = buffer.getShort
-    val keySchema = schemaForKey(version)
-    val key = keySchema.read(buffer)
 
     if (version <= CURRENT_OFFSET_KEY_SCHEMA_VERSION) {
       // version 0 and 1 refer to offset
-      val group = key.get(OFFSET_KEY_GROUP_FIELD).asInstanceOf[String]
-      val topic = key.get(OFFSET_KEY_TOPIC_FIELD).asInstanceOf[String]
-      val partition = key.get(OFFSET_KEY_PARTITION_FIELD).asInstanceOf[Int]
-
-      OffsetKey(version, GroupTopicPartition(group, new TopicPartition(topic, 
partition)))
-
+      val key = new GenOffsetCommitRequest(new ByteBufferAccessor(buffer), 
version)
+      OffsetKey(version, GroupTopicPartition(key.group(), new 
TopicPartition(key.topic(), key.partition())))

Review comment:
       nit: The parentheses may not be necessary for all the getters. It is 
worth checking the other cases below as well.

##########
File path: core/src/main/resources/common/message/GroupMetadataRequest.json
##########
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "name": "GroupMetadataRequest",

Review comment:
       I believe that your previous naming (*Key and *Value) was actually 
correct. I am sorry if my previous comment was misleading. I was actually 
referring to the overall formatting of the JSON document, which is not in line 
with what we usually do.
   
   We usually put `type` in the beginning and also format fields differently. 
You can see an example here: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json

##########
File path: core/src/main/resources/common/message/GroupMetadataResponse.json
##########
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "name": "GroupMetadataResponse",
+  "validVersions": "0-3",
+  "fields": [
+    {
+      "name": "protocolType",
+      "versions": "0+",
+      "type": "string"
+    },
+    {
+      "name": "generation",
+      "versions": "0+",
+      "type": "int32"
+    },
+    {
+      "name": "protocol",
+      "versions": "0+",
+      "type": "string",
+      "nullableVersions": "0+"
+    },
+    {
+      "name": "leader",
+      "versions": "0+",
+      "type": "string",
+      "nullableVersions": "0+"
+    },
+    {
+      "name": "members",
+      "versions": "0+",
+      "type": "[]MemberMetadata"
+    },
+    {
+      "name": "currentStateTimestamp",
+      "versions": "2+",
+      "type": "int64"
+    }
+  ],
+  "commonStructs": [
+    {
+      "name": "MemberMetadata",
+      "versions": "0-3",
+      "fields": [
+        {
+          "name": "memberId",
+          "versions": "0+",
+          "type": "string"
+        },
+        {
+          "name": "clientId",
+          "versions": "0+",
+          "type": "string"
+        },
+        {
+          "name": "clientHost",
+          "versions": "0+",
+          "type": "string"
+        },
+        {
+          "name": "sessionTimeout",
+          "versions": "0+",
+          "type": "int32"
+        },
+        {
+          "name": "subscription",
+          "versions": "0+",
+          "type": "bytes"
+        },
+        {
+          "name": "assignment",
+          "versions": "0+",
+          "type": "bytes"
+        },
+        {
+          "name": "rebalanceTimeout",
+          "versions": "1+",
+          "type": "int32"
+        },
+        {
+          "name": "groupInstanceId",

Review comment:
       `groupInstanceId` should be before `clientId`.

##########
File path: core/src/main/resources/common/message/OffsetCommitResponse.json
##########
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "name": "OffsetCommitResponse",
+  "validVersions": "0-3",
+  "fields": [
+    {
+      "name": "offset",
+      "versions": "0+",
+      "type": "int64"
+    },
+    {
+      "name": "metadata",
+      "versions": "0+",
+      "type": "string"
+    },
+    {
+      "name": "timestamp",
+      "versions": "0",
+      "type": "int64"
+    },
+    {
+      "name": "commitTimestamp",
+      "versions": "1+",
+      "type": "int64"
+    },

Review comment:
       I think that we can combine these two together. The field was only 
renamed in V1. Having `commitTimestamp` with versions `0+` and dropping 
`timestamp` entirely should be fine.

##########
File path: core/src/main/resources/common/message/OffsetCommitRequest.json
##########
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "name": "OffsetCommitRequest",
+  "validVersions": "0-1",
+  "fields": [
+    {
+      "name": "group",
+      "versions": "0+",

Review comment:
       I suggest fixing the versions to `0-1` for all fields. 
`OffsetCommitRequest` and `GroupMetadataRequest` are in the same topic and we 
use the version to differentiate the two, so it sounds better to use fixed 
ranges here for the time being. The same applies to `GroupMetadataRequest`.
   
   A meta comment about this. At the moment, the spec only supports ranges such 
as `A-B` or `A+`. In this case, I wonder how we will evolve this schema as the 
version `2` is used by the other schema. This is not an issue for now, but we 
may need to extend the spec to support discontinuous ranges in the future.

##########
File path: core/src/main/resources/common/message/GroupMetadataResponse.json
##########
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "name": "GroupMetadataResponse",
+  "validVersions": "0-3",
+  "fields": [
+    {
+      "name": "protocolType",
+      "versions": "0+",
+      "type": "string"
+    },
+    {
+      "name": "generation",
+      "versions": "0+",
+      "type": "int32"
+    },
+    {
+      "name": "protocol",
+      "versions": "0+",
+      "type": "string",
+      "nullableVersions": "0+"
+    },
+    {
+      "name": "leader",
+      "versions": "0+",
+      "type": "string",
+      "nullableVersions": "0+"
+    },
+    {
+      "name": "members",
+      "versions": "0+",
+      "type": "[]MemberMetadata"
+    },
+    {
+      "name": "currentStateTimestamp",
+      "versions": "2+",
+      "type": "int64"
+    }
+  ],
+  "commonStructs": [
+    {
+      "name": "MemberMetadata",
+      "versions": "0-3",
+      "fields": [
+        {
+          "name": "memberId",
+          "versions": "0+",
+          "type": "string"
+        },
+        {
+          "name": "clientId",
+          "versions": "0+",
+          "type": "string"
+        },
+        {
+          "name": "clientHost",
+          "versions": "0+",
+          "type": "string"
+        },
+        {
+          "name": "sessionTimeout",
+          "versions": "0+",
+          "type": "int32"
+        },
+        {
+          "name": "subscription",
+          "versions": "0+",
+          "type": "bytes"
+        },
+        {
+          "name": "assignment",
+          "versions": "0+",
+          "type": "bytes"
+        },
+        {
+          "name": "rebalanceTimeout",

Review comment:
       It seems that `rebalanceTimeout` should be before `sessionTimeout`.
   
   ```
     private val MEMBER_METADATA_V3 = new Schema(
       new Field(MEMBER_ID_KEY, STRING),
       new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING),
       new Field(CLIENT_ID_KEY, STRING),
       new Field(CLIENT_HOST_KEY, STRING),
       new Field(REBALANCE_TIMEOUT_KEY, INT32),
       new Field(SESSION_TIMEOUT_KEY, INT32),
       new Field(SUBSCRIPTION_KEY, BYTES),
       new Field(ASSIGNMENT_KEY, BYTES))
   ```

##########
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -997,173 +996,45 @@ object GroupMetadataManager {
   val MetricsGroup: String = "group-coordinator-metrics"
   val LoadTimeSensor: String = "GroupPartitionLoadTime"
 
-  private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort
-  private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort
-
-  private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING),
-    new Field("topic", STRING),
-    new Field("partition", INT32))
-  private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group")
-  private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic")
-  private val OFFSET_KEY_PARTITION_FIELD = 
OFFSET_COMMIT_KEY_SCHEMA.get("partition")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", 
INT64),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
-  private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", 
INT64),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("commit_timestamp", INT64),
-    new Field("expire_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
-  private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", 
INT64),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
-    new Field("offset", INT64),
-    new Field("leader_epoch", INT32),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
-  private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
-  private val OFFSET_VALUE_METADATA_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
-
-  private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", 
STRING))
-  private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
-
-  private val MEMBER_ID_KEY = "member_id"
-  private val GROUP_INSTANCE_ID_KEY = "group_instance_id"
-  private val CLIENT_ID_KEY = "client_id"
-  private val CLIENT_HOST_KEY = "client_host"
-  private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
-  private val SESSION_TIMEOUT_KEY = "session_timeout"
-  private val SUBSCRIPTION_KEY = "subscription"
-  private val ASSIGNMENT_KEY = "assignment"
-
-  private val MEMBER_METADATA_V0 = new Schema(
-    new Field(MEMBER_ID_KEY, STRING),
-    new Field(CLIENT_ID_KEY, STRING),
-    new Field(CLIENT_HOST_KEY, STRING),
-    new Field(SESSION_TIMEOUT_KEY, INT32),
-    new Field(SUBSCRIPTION_KEY, BYTES),
-    new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V1 = new Schema(
-    new Field(MEMBER_ID_KEY, STRING),
-    new Field(CLIENT_ID_KEY, STRING),
-    new Field(CLIENT_HOST_KEY, STRING),
-    new Field(REBALANCE_TIMEOUT_KEY, INT32),
-    new Field(SESSION_TIMEOUT_KEY, INT32),
-    new Field(SUBSCRIPTION_KEY, BYTES),
-    new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
-
-  private val MEMBER_METADATA_V3 = new Schema(
-    new Field(MEMBER_ID_KEY, STRING),
-    new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING),
-    new Field(CLIENT_ID_KEY, STRING),
-    new Field(CLIENT_HOST_KEY, STRING),
-    new Field(REBALANCE_TIMEOUT_KEY, INT32),
-    new Field(SESSION_TIMEOUT_KEY, INT32),
-    new Field(SUBSCRIPTION_KEY, BYTES),
-    new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val PROTOCOL_TYPE_KEY = "protocol_type"
-  private val GENERATION_KEY = "generation"
-  private val PROTOCOL_KEY = "protocol"
-  private val LEADER_KEY = "leader"
-  private val CURRENT_STATE_TIMESTAMP_KEY = "current_state_timestamp"
-  private val MEMBERS_KEY = "members"
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
-    new Field(PROTOCOL_TYPE_KEY, STRING),
-    new Field(GENERATION_KEY, INT32),
-    new Field(PROTOCOL_KEY, NULLABLE_STRING),
-    new Field(LEADER_KEY, NULLABLE_STRING),
-    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
-    new Field(PROTOCOL_TYPE_KEY, STRING),
-    new Field(GENERATION_KEY, INT32),
-    new Field(PROTOCOL_KEY, NULLABLE_STRING),
-    new Field(LEADER_KEY, NULLABLE_STRING),
-    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
-    new Field(PROTOCOL_TYPE_KEY, STRING),
-    new Field(GENERATION_KEY, INT32),
-    new Field(PROTOCOL_KEY, NULLABLE_STRING),
-    new Field(LEADER_KEY, NULLABLE_STRING),
-    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
-    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V3 = new Schema(
-    new Field(PROTOCOL_TYPE_KEY, STRING),
-    new Field(GENERATION_KEY, INT32),
-    new Field(PROTOCOL_KEY, NULLABLE_STRING),
-    new Field(LEADER_KEY, NULLABLE_STRING),
-    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
-    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V3)))
-
-  // map of versions to key schemas as data types
-  private val MESSAGE_TYPE_SCHEMAS = Map(
-    0 -> OFFSET_COMMIT_KEY_SCHEMA,
-    1 -> OFFSET_COMMIT_KEY_SCHEMA,
-    2 -> GROUP_METADATA_KEY_SCHEMA)
-
-  // map of version of offset value schemas
-  private val OFFSET_VALUE_SCHEMAS = Map(
-    0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
-    1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1,
-    2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2,
-    3 -> OFFSET_COMMIT_VALUE_SCHEMA_V3)
-
-  // map of version of group metadata value schemas
-  private val GROUP_VALUE_SCHEMAS = Map(
-    0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
-    1 -> GROUP_METADATA_VALUE_SCHEMA_V1,
-    2 -> GROUP_METADATA_VALUE_SCHEMA_V2,
-    3 -> GROUP_METADATA_VALUE_SCHEMA_V3)
-
-  private val CURRENT_OFFSET_KEY_SCHEMA = 
schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
-  private val CURRENT_GROUP_KEY_SCHEMA = 
schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
-  private val CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION = 
GROUP_VALUE_SCHEMAS.keySet.max
-
-  private def schemaForKey(version: Int) = {
-    val schemaOpt = MESSAGE_TYPE_SCHEMAS.get(version)
-    schemaOpt match {
-      case Some(schema) => schema
-      case _ => throw new KafkaException("Unknown message key schema version " 
+ version)
-    }
-  }
-
-  private def schemaForOffsetValue(version: Int) = {
-    val schemaOpt = OFFSET_VALUE_SCHEMAS.get(version)
-    schemaOpt match {
-      case Some(schema) => schema
-      case _ => throw new KafkaException("Unknown offset schema version " + 
version)
+  /**
+   *
+   * Statically check to make sure that the generated code always stays in 
sync with the overall protocol
+   * @param clz generated class
+   * @param actual actual version
+   * @param expected expected version
+   * @return correct version
+   */
+  private def checkVersionOfGeneratedCode(clz: Class[_], actual: Short, 
expected: Short): Short = {
+    if (actual != expected) {
+      throw new IllegalArgumentException(
+        s"core/src/main/resources/common/message/${clz.getSimpleName}.json 
needs to be updated to match the " +
+          s"latest assignment protocol version. ${clz.getSimpleName} only 
supports up to  ["
+          + actual + "] but needs to support up to [" + expected + "].");
     }
+    expected
   }
 
-  private def schemaForGroupValue(version: Int) = {
-    val schemaOpt = GROUP_VALUE_SCHEMAS.get(version)
-    schemaOpt match {
-      case Some(schema) => schema
-      case _ => throw new KafkaException("Unknown group metadata version " + 
version)
-    }
+  private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = checkVersionOfGeneratedCode(
+    clz = classOf[GenOffsetCommitRequest],
+    actual = (GenOffsetCommitRequest.SCHEMAS.length - 1).toShort,
+    expected = 1.toShort)
+  private val CURRENT_GROUP_KEY_SCHEMA_VERSION = checkVersionOfGeneratedCode(
+    clz = classOf[GroupMetadataRequest],
+    actual = (GroupMetadataRequest.SCHEMAS.length - 1).toShort,
+    expected = 2.toShort)
+  private val CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION = 
checkVersionOfGeneratedCode(
+    clz = classOf[GroupMetadataResponse],
+    actual = (GroupMetadataResponse.SCHEMAS.length - 1).toShort,
+    expected = 3.toShort)
+
+  private def serializeMessage(version: Short, message: Message): Array[Byte] 
= {
+    val cache = new ObjectSerializationCache()
+    val size = message.size(cache, version)
+    val bytes = ByteBuffer.allocate(2 + size)
+    val accessor = new ByteBufferAccessor(bytes)
+    accessor.writeShort(version)
+    message.write(accessor, cache, version)
+    bytes.array()

Review comment:
       We have merged my PR that contains this piece of code as well: 
https://github.com/apache/kafka/commit/466f8fd21c6651ea5daa50154239e85fa629dbb4#diff-bad29ccb1aba700e1badeff62f1a86b7R178.
 I think that we should find a common place where we could put that piece in 
order to avoid having it twice.
   
   `org.apache.kafka.common.protocol.MessageUtil` may be a good place. What do 
you think?

##########
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -997,173 +996,45 @@ object GroupMetadataManager {
   val MetricsGroup: String = "group-coordinator-metrics"
   val LoadTimeSensor: String = "GroupPartitionLoadTime"
 
-  private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort
-  private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort
-
-  private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING),
-    new Field("topic", STRING),
-    new Field("partition", INT32))
-  private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group")
-  private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic")
-  private val OFFSET_KEY_PARTITION_FIELD = 
OFFSET_COMMIT_KEY_SCHEMA.get("partition")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", 
INT64),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
-  private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", 
INT64),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("commit_timestamp", INT64),
-    new Field("expire_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
-  private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", 
INT64),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
-    new Field("offset", INT64),
-    new Field("leader_epoch", INT32),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
-  private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
-  private val OFFSET_VALUE_METADATA_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
-
-  private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", 
STRING))
-  private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
-
-  private val MEMBER_ID_KEY = "member_id"
-  private val GROUP_INSTANCE_ID_KEY = "group_instance_id"
-  private val CLIENT_ID_KEY = "client_id"
-  private val CLIENT_HOST_KEY = "client_host"
-  private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
-  private val SESSION_TIMEOUT_KEY = "session_timeout"
-  private val SUBSCRIPTION_KEY = "subscription"
-  private val ASSIGNMENT_KEY = "assignment"
-
-  private val MEMBER_METADATA_V0 = new Schema(
-    new Field(MEMBER_ID_KEY, STRING),
-    new Field(CLIENT_ID_KEY, STRING),
-    new Field(CLIENT_HOST_KEY, STRING),
-    new Field(SESSION_TIMEOUT_KEY, INT32),
-    new Field(SUBSCRIPTION_KEY, BYTES),
-    new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V1 = new Schema(
-    new Field(MEMBER_ID_KEY, STRING),
-    new Field(CLIENT_ID_KEY, STRING),
-    new Field(CLIENT_HOST_KEY, STRING),
-    new Field(REBALANCE_TIMEOUT_KEY, INT32),
-    new Field(SESSION_TIMEOUT_KEY, INT32),
-    new Field(SUBSCRIPTION_KEY, BYTES),
-    new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
-
-  private val MEMBER_METADATA_V3 = new Schema(
-    new Field(MEMBER_ID_KEY, STRING),
-    new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING),
-    new Field(CLIENT_ID_KEY, STRING),
-    new Field(CLIENT_HOST_KEY, STRING),
-    new Field(REBALANCE_TIMEOUT_KEY, INT32),
-    new Field(SESSION_TIMEOUT_KEY, INT32),
-    new Field(SUBSCRIPTION_KEY, BYTES),
-    new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val PROTOCOL_TYPE_KEY = "protocol_type"
-  private val GENERATION_KEY = "generation"
-  private val PROTOCOL_KEY = "protocol"
-  private val LEADER_KEY = "leader"
-  private val CURRENT_STATE_TIMESTAMP_KEY = "current_state_timestamp"
-  private val MEMBERS_KEY = "members"
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
-    new Field(PROTOCOL_TYPE_KEY, STRING),
-    new Field(GENERATION_KEY, INT32),
-    new Field(PROTOCOL_KEY, NULLABLE_STRING),
-    new Field(LEADER_KEY, NULLABLE_STRING),
-    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
-    new Field(PROTOCOL_TYPE_KEY, STRING),
-    new Field(GENERATION_KEY, INT32),
-    new Field(PROTOCOL_KEY, NULLABLE_STRING),
-    new Field(LEADER_KEY, NULLABLE_STRING),
-    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
-    new Field(PROTOCOL_TYPE_KEY, STRING),
-    new Field(GENERATION_KEY, INT32),
-    new Field(PROTOCOL_KEY, NULLABLE_STRING),
-    new Field(LEADER_KEY, NULLABLE_STRING),
-    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
-    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V3 = new Schema(
-    new Field(PROTOCOL_TYPE_KEY, STRING),
-    new Field(GENERATION_KEY, INT32),
-    new Field(PROTOCOL_KEY, NULLABLE_STRING),
-    new Field(LEADER_KEY, NULLABLE_STRING),
-    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
-    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V3)))
-
-  // map of versions to key schemas as data types
-  private val MESSAGE_TYPE_SCHEMAS = Map(
-    0 -> OFFSET_COMMIT_KEY_SCHEMA,
-    1 -> OFFSET_COMMIT_KEY_SCHEMA,
-    2 -> GROUP_METADATA_KEY_SCHEMA)
-
-  // map of version of offset value schemas
-  private val OFFSET_VALUE_SCHEMAS = Map(
-    0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
-    1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1,
-    2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2,
-    3 -> OFFSET_COMMIT_VALUE_SCHEMA_V3)
-
-  // map of version of group metadata value schemas
-  private val GROUP_VALUE_SCHEMAS = Map(
-    0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
-    1 -> GROUP_METADATA_VALUE_SCHEMA_V1,
-    2 -> GROUP_METADATA_VALUE_SCHEMA_V2,
-    3 -> GROUP_METADATA_VALUE_SCHEMA_V3)
-
-  private val CURRENT_OFFSET_KEY_SCHEMA = 
schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
-  private val CURRENT_GROUP_KEY_SCHEMA = 
schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
-  private val CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION = 
GROUP_VALUE_SCHEMAS.keySet.max
-
-  private def schemaForKey(version: Int) = {
-    val schemaOpt = MESSAGE_TYPE_SCHEMAS.get(version)
-    schemaOpt match {
-      case Some(schema) => schema
-      case _ => throw new KafkaException("Unknown message key schema version " 
+ version)
-    }
-  }
-
-  private def schemaForOffsetValue(version: Int) = {
-    val schemaOpt = OFFSET_VALUE_SCHEMAS.get(version)
-    schemaOpt match {
-      case Some(schema) => schema
-      case _ => throw new KafkaException("Unknown offset schema version " + 
version)
+  /**
+   *
+   * Statically check to make sure that the generated code always stays in 
sync with the overall protocol
+   * @param clz generated class
+   * @param actual actual version
+   * @param expected expected version
+   * @return correct version
+   */
+  private def checkVersionOfGeneratedCode(clz: Class[_], actual: Short, 
expected: Short): Short = {
+    if (actual != expected) {
+      throw new IllegalArgumentException(
+        s"core/src/main/resources/common/message/${clz.getSimpleName}.json 
needs to be updated to match the " +
+          s"latest assignment protocol version. ${clz.getSimpleName} only 
supports up to  ["
+          + actual + "] but needs to support up to [" + expected + "].");
     }
+    expected
   }
 
-  private def schemaForGroupValue(version: Int) = {
-    val schemaOpt = GROUP_VALUE_SCHEMAS.get(version)
-    schemaOpt match {
-      case Some(schema) => schema
-      case _ => throw new KafkaException("Unknown group metadata version " + 
version)
-    }
+  private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = checkVersionOfGeneratedCode(
+    clz = classOf[GenOffsetCommitRequest],
+    actual = (GenOffsetCommitRequest.SCHEMAS.length - 1).toShort,
+    expected = 1.toShort)
+  private val CURRENT_GROUP_KEY_SCHEMA_VERSION = checkVersionOfGeneratedCode(
+    clz = classOf[GroupMetadataRequest],
+    actual = (GroupMetadataRequest.SCHEMAS.length - 1).toShort,
+    expected = 2.toShort)
+  private val CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION = 
checkVersionOfGeneratedCode(

Review comment:
       It seems that we don't check the version of `OffsetCommitResponse`. Is 
it intentional? Also, I wonder if doing these checks is really necessary. 
Having to maintain the versions in the spec and the expected version here 
sounds quite painful to me.

##########
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -997,173 +996,45 @@ object GroupMetadataManager {
   val MetricsGroup: String = "group-coordinator-metrics"
   val LoadTimeSensor: String = "GroupPartitionLoadTime"
 
-  private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort
-  private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort
-
-  private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING),
-    new Field("topic", STRING),
-    new Field("partition", INT32))
-  private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group")
-  private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic")
-  private val OFFSET_KEY_PARTITION_FIELD = 
OFFSET_COMMIT_KEY_SCHEMA.get("partition")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", 
INT64),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
-  private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", 
INT64),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("commit_timestamp", INT64),
-    new Field("expire_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
-  private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", 
INT64),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
-    new Field("offset", INT64),
-    new Field("leader_epoch", INT32),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
-  private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
-  private val OFFSET_VALUE_METADATA_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
-
-  private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", 
STRING))
-  private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
-
-  private val MEMBER_ID_KEY = "member_id"
-  private val GROUP_INSTANCE_ID_KEY = "group_instance_id"
-  private val CLIENT_ID_KEY = "client_id"
-  private val CLIENT_HOST_KEY = "client_host"
-  private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
-  private val SESSION_TIMEOUT_KEY = "session_timeout"
-  private val SUBSCRIPTION_KEY = "subscription"
-  private val ASSIGNMENT_KEY = "assignment"
-
-  private val MEMBER_METADATA_V0 = new Schema(
-    new Field(MEMBER_ID_KEY, STRING),
-    new Field(CLIENT_ID_KEY, STRING),
-    new Field(CLIENT_HOST_KEY, STRING),
-    new Field(SESSION_TIMEOUT_KEY, INT32),
-    new Field(SUBSCRIPTION_KEY, BYTES),
-    new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V1 = new Schema(
-    new Field(MEMBER_ID_KEY, STRING),
-    new Field(CLIENT_ID_KEY, STRING),
-    new Field(CLIENT_HOST_KEY, STRING),
-    new Field(REBALANCE_TIMEOUT_KEY, INT32),
-    new Field(SESSION_TIMEOUT_KEY, INT32),
-    new Field(SUBSCRIPTION_KEY, BYTES),
-    new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
-
-  private val MEMBER_METADATA_V3 = new Schema(
-    new Field(MEMBER_ID_KEY, STRING),
-    new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING),
-    new Field(CLIENT_ID_KEY, STRING),
-    new Field(CLIENT_HOST_KEY, STRING),
-    new Field(REBALANCE_TIMEOUT_KEY, INT32),
-    new Field(SESSION_TIMEOUT_KEY, INT32),
-    new Field(SUBSCRIPTION_KEY, BYTES),
-    new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val PROTOCOL_TYPE_KEY = "protocol_type"
-  private val GENERATION_KEY = "generation"
-  private val PROTOCOL_KEY = "protocol"
-  private val LEADER_KEY = "leader"
-  private val CURRENT_STATE_TIMESTAMP_KEY = "current_state_timestamp"
-  private val MEMBERS_KEY = "members"
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
-    new Field(PROTOCOL_TYPE_KEY, STRING),
-    new Field(GENERATION_KEY, INT32),
-    new Field(PROTOCOL_KEY, NULLABLE_STRING),
-    new Field(LEADER_KEY, NULLABLE_STRING),
-    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
-    new Field(PROTOCOL_TYPE_KEY, STRING),
-    new Field(GENERATION_KEY, INT32),
-    new Field(PROTOCOL_KEY, NULLABLE_STRING),
-    new Field(LEADER_KEY, NULLABLE_STRING),
-    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
-    new Field(PROTOCOL_TYPE_KEY, STRING),
-    new Field(GENERATION_KEY, INT32),
-    new Field(PROTOCOL_KEY, NULLABLE_STRING),
-    new Field(LEADER_KEY, NULLABLE_STRING),
-    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
-    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V3 = new Schema(
-    new Field(PROTOCOL_TYPE_KEY, STRING),
-    new Field(GENERATION_KEY, INT32),
-    new Field(PROTOCOL_KEY, NULLABLE_STRING),
-    new Field(LEADER_KEY, NULLABLE_STRING),
-    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
-    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V3)))
-
-  // map of versions to key schemas as data types
-  private val MESSAGE_TYPE_SCHEMAS = Map(
-    0 -> OFFSET_COMMIT_KEY_SCHEMA,
-    1 -> OFFSET_COMMIT_KEY_SCHEMA,
-    2 -> GROUP_METADATA_KEY_SCHEMA)
-
-  // map of version of offset value schemas
-  private val OFFSET_VALUE_SCHEMAS = Map(
-    0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
-    1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1,
-    2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2,
-    3 -> OFFSET_COMMIT_VALUE_SCHEMA_V3)
-
-  // map of version of group metadata value schemas
-  private val GROUP_VALUE_SCHEMAS = Map(
-    0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
-    1 -> GROUP_METADATA_VALUE_SCHEMA_V1,
-    2 -> GROUP_METADATA_VALUE_SCHEMA_V2,
-    3 -> GROUP_METADATA_VALUE_SCHEMA_V3)
-
-  private val CURRENT_OFFSET_KEY_SCHEMA = 
schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
-  private val CURRENT_GROUP_KEY_SCHEMA = 
schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
-  private val CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION = 
GROUP_VALUE_SCHEMAS.keySet.max
-
-  private def schemaForKey(version: Int) = {
-    val schemaOpt = MESSAGE_TYPE_SCHEMAS.get(version)
-    schemaOpt match {
-      case Some(schema) => schema
-      case _ => throw new KafkaException("Unknown message key schema version " 
+ version)
-    }
-  }
-
-  private def schemaForOffsetValue(version: Int) = {
-    val schemaOpt = OFFSET_VALUE_SCHEMAS.get(version)
-    schemaOpt match {
-      case Some(schema) => schema
-      case _ => throw new KafkaException("Unknown offset schema version " + 
version)
+  /**
+   *
+   * Statically check to make sure that the generated code always stays in 
sync with the overall protocol
+   * @param clz generated class
+   * @param actual actual version
+   * @param expected expected version
+   * @return correct version
+   */
+  private def checkVersionOfGeneratedCode(clz: Class[_], actual: Short, 
expected: Short): Short = {
+    if (actual != expected) {
+      throw new IllegalArgumentException(
+        s"core/src/main/resources/common/message/${clz.getSimpleName}.json 
needs to be updated to match the " +
+          s"latest assignment protocol version. ${clz.getSimpleName} only 
supports up to  ["
+          + actual + "] but needs to support up to [" + expected + "].");
     }
+    expected
   }
 
-  private def schemaForGroupValue(version: Int) = {
-    val schemaOpt = GROUP_VALUE_SCHEMAS.get(version)
-    schemaOpt match {
-      case Some(schema) => schema
-      case _ => throw new KafkaException("Unknown group metadata version " + 
version)
-    }
+  private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = checkVersionOfGeneratedCode(
+    clz = classOf[GenOffsetCommitRequest],
+    actual = (GenOffsetCommitRequest.SCHEMAS.length - 1).toShort,

Review comment:
       We can use `GenOffsetCommitRequest.HIGHEST_SUPPORTED_VERSION` here. The 
same applies to the others below.

##########
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1174,15 +1045,8 @@ object GroupMetadataManager {
    * @return key for offset commit message
    */
   def offsetCommitKey(groupId: String, topicPartition: TopicPartition): 
Array[Byte] = {
-    val key = new Struct(CURRENT_OFFSET_KEY_SCHEMA)
-    key.set(OFFSET_KEY_GROUP_FIELD, groupId)
-    key.set(OFFSET_KEY_TOPIC_FIELD, topicPartition.topic)
-    key.set(OFFSET_KEY_PARTITION_FIELD, topicPartition.partition)
-
-    val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
-    byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
-    key.writeTo(byteBuffer)
-    byteBuffer.array()
+    serializeMessage(CURRENT_OFFSET_KEY_SCHEMA_VERSION,
+      new 
GenOffsetCommitRequest().setGroup(groupId).setTopic(topicPartition.topic()).setPartition(topicPartition.partition()))

Review comment:
       nit: Could we break this line? Having each `setXyz` call on its own line 
would improve readability.

##########
File path: core/src/main/resources/common/message/GroupMetadataResponse.json
##########
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "name": "GroupMetadataResponse",
+  "validVersions": "0-3",
+  "fields": [
+    {
+      "name": "protocolType",
+      "versions": "0+",
+      "type": "string"
+    },
+    {
+      "name": "generation",
+      "versions": "0+",
+      "type": "int32"
+    },
+    {
+      "name": "protocol",
+      "versions": "0+",
+      "type": "string",
+      "nullableVersions": "0+"
+    },
+    {
+      "name": "leader",
+      "versions": "0+",
+      "type": "string",
+      "nullableVersions": "0+"
+    },
+    {
+      "name": "members",
+      "versions": "0+",
+      "type": "[]MemberMetadata"
+    },
+    {
+      "name": "currentStateTimestamp",
+      "versions": "2+",
+      "type": "int64"
+    }
+  ],
+  "commonStructs": [
+    {
+      "name": "MemberMetadata",
+      "versions": "0-3",
+      "fields": [
+        {
+          "name": "memberId",
+          "versions": "0+",
+          "type": "string"
+        },
+        {
+          "name": "clientId",
+          "versions": "0+",
+          "type": "string"
+        },
+        {
+          "name": "clientHost",
+          "versions": "0+",
+          "type": "string"
+        },
+        {
+          "name": "sessionTimeout",
+          "versions": "0+",
+          "type": "int32"
+        },
+        {
+          "name": "subscription",
+          "versions": "0+",
+          "type": "bytes"

Review comment:
       We may want to use `"zeroCopy": true` for both `subscription` and 
`assignment` like we did for the consumer protocol.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to