dajac commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r492663086



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -997,189 +997,52 @@ object GroupMetadataManager {
   val MetricsGroup: String = "group-coordinator-metrics"
   val LoadTimeSensor: String = "GroupPartitionLoadTime"
 
-  private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort
-  private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort
-
-  private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING),
-    new Field("topic", STRING),
-    new Field("partition", INT32))
-  private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group")
-  private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic")
-  private val OFFSET_KEY_PARTITION_FIELD = 
OFFSET_COMMIT_KEY_SCHEMA.get("partition")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", 
INT64),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
-  private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", 
INT64),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("commit_timestamp", INT64),
-    new Field("expire_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
-  private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", 
INT64),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
-    new Field("offset", INT64),
-    new Field("leader_epoch", INT32),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
-  private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
-  private val OFFSET_VALUE_METADATA_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
-
-  private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", 
STRING))
-  private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
-
-  private val MEMBER_ID_KEY = "member_id"
-  private val GROUP_INSTANCE_ID_KEY = "group_instance_id"
-  private val CLIENT_ID_KEY = "client_id"
-  private val CLIENT_HOST_KEY = "client_host"
-  private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
-  private val SESSION_TIMEOUT_KEY = "session_timeout"
-  private val SUBSCRIPTION_KEY = "subscription"
-  private val ASSIGNMENT_KEY = "assignment"
-
-  private val MEMBER_METADATA_V0 = new Schema(
-    new Field(MEMBER_ID_KEY, STRING),
-    new Field(CLIENT_ID_KEY, STRING),
-    new Field(CLIENT_HOST_KEY, STRING),
-    new Field(SESSION_TIMEOUT_KEY, INT32),
-    new Field(SUBSCRIPTION_KEY, BYTES),
-    new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V1 = new Schema(
-    new Field(MEMBER_ID_KEY, STRING),
-    new Field(CLIENT_ID_KEY, STRING),
-    new Field(CLIENT_HOST_KEY, STRING),
-    new Field(REBALANCE_TIMEOUT_KEY, INT32),
-    new Field(SESSION_TIMEOUT_KEY, INT32),
-    new Field(SUBSCRIPTION_KEY, BYTES),
-    new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
-
-  private val MEMBER_METADATA_V3 = new Schema(
-    new Field(MEMBER_ID_KEY, STRING),
-    new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING),
-    new Field(CLIENT_ID_KEY, STRING),
-    new Field(CLIENT_HOST_KEY, STRING),
-    new Field(REBALANCE_TIMEOUT_KEY, INT32),
-    new Field(SESSION_TIMEOUT_KEY, INT32),
-    new Field(SUBSCRIPTION_KEY, BYTES),
-    new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val PROTOCOL_TYPE_KEY = "protocol_type"
-  private val GENERATION_KEY = "generation"
-  private val PROTOCOL_KEY = "protocol"
-  private val LEADER_KEY = "leader"
-  private val CURRENT_STATE_TIMESTAMP_KEY = "current_state_timestamp"
-  private val MEMBERS_KEY = "members"
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
-    new Field(PROTOCOL_TYPE_KEY, STRING),
-    new Field(GENERATION_KEY, INT32),
-    new Field(PROTOCOL_KEY, NULLABLE_STRING),
-    new Field(LEADER_KEY, NULLABLE_STRING),
-    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
-    new Field(PROTOCOL_TYPE_KEY, STRING),
-    new Field(GENERATION_KEY, INT32),
-    new Field(PROTOCOL_KEY, NULLABLE_STRING),
-    new Field(LEADER_KEY, NULLABLE_STRING),
-    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
-    new Field(PROTOCOL_TYPE_KEY, STRING),
-    new Field(GENERATION_KEY, INT32),
-    new Field(PROTOCOL_KEY, NULLABLE_STRING),
-    new Field(LEADER_KEY, NULLABLE_STRING),
-    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
-    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V3 = new Schema(
-    new Field(PROTOCOL_TYPE_KEY, STRING),
-    new Field(GENERATION_KEY, INT32),
-    new Field(PROTOCOL_KEY, NULLABLE_STRING),
-    new Field(LEADER_KEY, NULLABLE_STRING),
-    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
-    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V3)))
-
-  // map of versions to key schemas as data types
-  private val MESSAGE_TYPE_SCHEMAS = Map(
-    0 -> OFFSET_COMMIT_KEY_SCHEMA,
-    1 -> OFFSET_COMMIT_KEY_SCHEMA,
-    2 -> GROUP_METADATA_KEY_SCHEMA)
-
-  // map of version of offset value schemas
-  private val OFFSET_VALUE_SCHEMAS = Map(
-    0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
-    1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1,
-    2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2,
-    3 -> OFFSET_COMMIT_VALUE_SCHEMA_V3)
-
-  // map of version of group metadata value schemas
-  private val GROUP_VALUE_SCHEMAS = Map(
-    0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
-    1 -> GROUP_METADATA_VALUE_SCHEMA_V1,
-    2 -> GROUP_METADATA_VALUE_SCHEMA_V2,
-    3 -> GROUP_METADATA_VALUE_SCHEMA_V3)
-
-  private val CURRENT_OFFSET_KEY_SCHEMA = 
schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
-  private val CURRENT_GROUP_KEY_SCHEMA = 
schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
-  private val CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION = 
GROUP_VALUE_SCHEMAS.keySet.max
-
-  private def schemaForKey(version: Int) = {
-    val schemaOpt = MESSAGE_TYPE_SCHEMAS.get(version)
-    schemaOpt match {
-      case Some(schema) => schema
-      case _ => throw new KafkaException("Unknown message key schema version " 
+ version)
-    }
-  }
-
-  private def schemaForOffsetValue(version: Int) = {
-    val schemaOpt = OFFSET_VALUE_SCHEMAS.get(version)
-    schemaOpt match {
-      case Some(schema) => schema
-      case _ => throw new KafkaException("Unknown offset schema version " + 
version)
+  /**
+   *
+   * Statically check to make sure that the generated code always stays in 
sync with the overall protocol
+   * @param clz generated class
+   * @param actual actual version
+   * @param expected expected version
+   * @return correct version
+   */
+  private def checkVersionOfGeneratedCode(clz: Class[_], actual: Short, 
expected: Short): Short = {
+    if (actual != expected) {
+      throw new IllegalArgumentException(
+        s"core/src/main/resources/common/message/${clz.getSimpleName}.json 
needs to be updated to match the " +
+          s"latest assignment protocol version. ${clz.getSimpleName} only 
supports up to  ["
+          + actual + "] but needs to support up to [" + expected + "].");
     }
+    expected
   }
 
-  private def schemaForGroupValue(version: Int) = {
-    val schemaOpt = GROUP_VALUE_SCHEMAS.get(version)
-    schemaOpt match {
-      case Some(schema) => schema
-      case _ => throw new KafkaException("Unknown group metadata version " + 
version)
-    }
-  }
+  private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = checkVersionOfGeneratedCode(
+    clz = classOf[OffsetCommitKey],
+    actual = (OffsetCommitKey.SCHEMAS.length - 1).toShort,
+    expected = 1.toShort)
+  private val CURRENT_GROUP_KEY_SCHEMA_VERSION = checkVersionOfGeneratedCode(
+    clz = classOf[generated.GroupMetadataKey],
+    actual = (generated.GroupMetadataKey.SCHEMAS.length -1).toShort,

Review comment:
       nit: Missing space after `-` (i.e. `SCHEMAS.length - 1` rather than `SCHEMAS.length -1`).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to