Repository: kafka
Updated Branches:
  refs/heads/1.0 8a175b0ee -> e409f847f


MINOR: Factor out some common group/transactional fields in request objects

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: tedyu <yuzhih...@gmail.com>, Ismael Juma <ism...@juma.me.uk>

Closes #4047 from hachikuji/factor-out-some-common-fields


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e409f847
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e409f847
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e409f847

Branch: refs/heads/1.0
Commit: e409f847f549f4fe10c552110366c646c1e44138
Parents: 8a175b0
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Oct 9 13:03:20 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Oct 10 12:49:21 2017 -0700

----------------------------------------------------------------------
 .../kafka/common/protocol/CommonFields.java     | 13 +++++-
 .../kafka/common/protocol/types/Struct.java     | 14 ++++++
 .../common/requests/AddOffsetsToTxnRequest.java | 37 +++++++---------
 .../requests/AddPartitionsToTxnRequest.java     | 27 ++++++------
 .../common/requests/DescribeGroupsResponse.java | 16 +++----
 .../kafka/common/requests/EndTxnRequest.java    | 27 ++++++------
 .../kafka/common/requests/HeartbeatRequest.java | 28 ++++++------
 .../common/requests/InitProducerIdResponse.java | 22 ++++------
 .../kafka/common/requests/JoinGroupRequest.java | 20 ++++-----
 .../common/requests/JoinGroupResponse.java      | 27 ++++++------
 .../common/requests/LeaveGroupRequest.java      | 19 ++++-----
 .../common/requests/ListGroupsResponse.java     |  8 ++--
 .../common/requests/OffsetCommitRequest.java    | 45 ++++++++------------
 .../common/requests/OffsetFetchRequest.java     | 11 +++--
 .../kafka/common/requests/SyncGroupRequest.java | 32 +++++++-------
 .../common/requests/TxnOffsetCommitRequest.java | 34 +++++++--------
 16 files changed, 182 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
index e12cde4..472a791 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
@@ -27,7 +27,18 @@ public class CommonFields {
     public static final Field.Int16 ERROR_CODE = new Field.Int16("error_code", "Response error code");
     public static final Field.NullableStr ERROR_MESSAGE = new Field.NullableStr("error_message", "Response error message");
 
-    // ACL Apis
+    // Group APIs
+    public static final Field.Str GROUP_ID = new Field.Str("group_id", "The unique group identifier");
+    public static final Field.Int32 GENERATION_ID = new Field.Int32("generation_id", "The generation of the group.");
+    public static final Field.Str MEMBER_ID = new Field.Str("member_id", "The member id assigned by the group " +
+            "coordinator or null if joining for the first time.");
+
+    // Transactional APIs
+    public static final Field.Str TRANSACTIONAL_ID = new Field.Str("transactional_id", "The transactional id corresponding to the transaction.");
+    public static final Field.Int64 PRODUCER_ID = new Field.Int64("producer_id", "Current producer id in use by the transactional id.");
+    public static final Field.Int16 PRODUCER_EPOCH = new Field.Int16("producer_epoch", "Current epoch associated with the producer id.");
+
+    // ACL APIs
     public static final Field.Int8 RESOURCE_TYPE = new Field.Int8("resource_type", "The resource type");
     public static final Field.Str RESOURCE_NAME = new Field.Str("resource_name", "The resource name");
     public static final Field.NullableStr RESOURCE_NAME_FILTER = new Field.NullableStr("resource_name", "The resource name filter");

http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 1cbbcb3..b825201 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -83,6 +83,10 @@ public class Struct {
         return getInt(field.name);
     }
 
+    public Long get(Field.Int64 field) {
+        return getLong(field.name);
+    }
+
     public Short get(Field.Int16 field) {
         return getShort(field.name);
     }
@@ -113,6 +117,12 @@ public class Struct {
         return alternative;
     }
 
+    public String getOrElse(Field.Str field, String alternative) {
+        if (hasField(field.name))
+            return getString(field.name);
+        return alternative;
+    }
+
     /**
      * Get the record value for the field with the given name by doing a hash table lookup (slower!)
      *
@@ -270,6 +280,10 @@ public class Struct {
         return set(def.name, value);
     }
 
+    public Struct set(Field.Int64 def, long value) {
+        return set(def.name, value);
+    }
+
     public Struct set(Field.Int16 def, short value) {
         return set(def.name, value);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
index e3e4d79..f6a1722 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
@@ -18,27 +18,22 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
-import static org.apache.kafka.common.protocol.types.Type.INT16;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
+import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
+import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH;
+import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID;
+import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID;
 
 public class AddOffsetsToTxnRequest extends AbstractRequest {
-    private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
-    private static final String PRODUCER_ID_KEY_NAME = "producer_id";
-    private static final String EPOCH_KEY_NAME = "producer_epoch";
-    private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id";
-
     private static final Schema ADD_OFFSETS_TO_TXN_REQUEST_V0 = new Schema(
-            new Field(TRANSACTIONAL_ID_KEY_NAME, STRING, "The transactional id corresponding to the transaction."),
-            new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."),
-            new Field(EPOCH_KEY_NAME, INT16, "Current epoch associated with the producer id."),
-            new Field(CONSUMER_GROUP_ID_KEY_NAME, STRING, "Consumer group id whose offsets should be included in the transaction."));
+            TRANSACTIONAL_ID,
+            PRODUCER_ID,
+            PRODUCER_EPOCH,
+            GROUP_ID);
 
     public static Schema[] schemaVersions() {
         return new Schema[]{ADD_OFFSETS_TO_TXN_REQUEST_V0};
@@ -95,10 +90,10 @@ public class AddOffsetsToTxnRequest extends AbstractRequest {
 
     public AddOffsetsToTxnRequest(Struct struct, short version) {
         super(version);
-        this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
-        this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
-        this.producerEpoch = struct.getShort(EPOCH_KEY_NAME);
-        this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME);
+        this.transactionalId = struct.get(TRANSACTIONAL_ID);
+        this.producerId = struct.get(PRODUCER_ID);
+        this.producerEpoch = struct.get(PRODUCER_EPOCH);
+        this.consumerGroupId = struct.get(GROUP_ID);
     }
 
     public String transactionalId() {
@@ -120,10 +115,10 @@ public class AddOffsetsToTxnRequest extends AbstractRequest {
     @Override
     protected Struct toStruct() {
         Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.requestSchema(version()));
-        struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
-        struct.set(PRODUCER_ID_KEY_NAME, producerId);
-        struct.set(EPOCH_KEY_NAME, producerEpoch);
-        struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId);
+        struct.set(TRANSACTIONAL_ID, transactionalId);
+        struct.set(PRODUCER_ID, producerId);
+        struct.set(PRODUCER_EPOCH, producerEpoch);
+        struct.set(GROUP_ID, consumerGroupId);
         return struct;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
index c195e24..0ca32be 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
@@ -31,23 +31,20 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH;
+import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID;
 import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.types.Type.INT16;
+import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID;
 import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
 
 public class AddPartitionsToTxnRequest extends AbstractRequest {
-    private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
-    private static final String PRODUCER_ID_KEY_NAME = "producer_id";
-    private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
     private static final String TOPICS_KEY_NAME = "topics";
     private static final String PARTITIONS_KEY_NAME = "partitions";
 
     private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema(
-            new Field(TRANSACTIONAL_ID_KEY_NAME, STRING, "The transactional id corresponding to the transaction."),
-            new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."),
-            new Field(PRODUCER_EPOCH_KEY_NAME, INT16, "Current epoch associated with the producer id."),
+            TRANSACTIONAL_ID,
+            PRODUCER_ID,
+            PRODUCER_EPOCH,
             new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
                     TOPIC_NAME,
                     new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32)))),
@@ -109,9 +106,9 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
 
     public AddPartitionsToTxnRequest(Struct struct, short version) {
         super(version);
-        this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
-        this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
-        this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME);
+        this.transactionalId = struct.get(TRANSACTIONAL_ID);
+        this.producerId = struct.get(PRODUCER_ID);
+        this.producerEpoch = struct.get(PRODUCER_EPOCH);
 
         List<TopicPartition> partitions = new ArrayList<>();
         Object[] topicPartitionsArray = struct.getArray(TOPICS_KEY_NAME);
@@ -144,9 +141,9 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
     @Override
     protected Struct toStruct() {
         Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.requestSchema(version()));
-        struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
-        struct.set(PRODUCER_ID_KEY_NAME, producerId);
-        struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch);
+        struct.set(TRANSACTIONAL_ID, transactionalId);
+        struct.set(PRODUCER_ID, producerId);
+        struct.set(PRODUCER_EPOCH, producerEpoch);
 
         Map<String, List<Integer>> mappedPartitions = CollectionUtils.groupDataByTopic(partitions);
         Object[] partitionsArray = new Object[mappedPartitions.size()];

http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 61c5a36..8dd6a17 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -31,6 +31,8 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
+import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
 import static org.apache.kafka.common.protocol.types.Type.BYTES;
 import static org.apache.kafka.common.protocol.types.Type.STRING;
@@ -39,20 +41,18 @@ public class DescribeGroupsResponse extends AbstractResponse {
 
     private static final String GROUPS_KEY_NAME = "groups";
 
-    private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String GROUP_STATE_KEY_NAME = "state";
     private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
     private static final String PROTOCOL_KEY_NAME = "protocol";
 
     private static final String MEMBERS_KEY_NAME = "members";
-    private static final String MEMBER_ID_KEY_NAME = "member_id";
     private static final String CLIENT_ID_KEY_NAME = "client_id";
     private static final String CLIENT_HOST_KEY_NAME = "client_host";
     private static final String MEMBER_METADATA_KEY_NAME = "member_metadata";
     private static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
 
     private static final Schema DESCRIBE_GROUPS_RESPONSE_MEMBER_V0 = new Schema(
-            new Field(MEMBER_ID_KEY_NAME, STRING, "The memberId assigned by the coordinator"),
+            MEMBER_ID,
             new Field(CLIENT_ID_KEY_NAME, STRING, "The client id used in the member's latest join group request"),
             new Field(CLIENT_HOST_KEY_NAME, STRING, "The client host used in the request session corresponding to the " +
                     "member's join group."),
@@ -63,7 +63,7 @@ public class DescribeGroupsResponse extends AbstractResponse {
 
     private static final Schema DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0 = new Schema(
             ERROR_CODE,
-            new Field(GROUP_ID_KEY_NAME, STRING),
+            GROUP_ID,
             new Field(GROUP_STATE_KEY_NAME, STRING, "The current state of the group (one of: Dead, Stable, AwaitingSync, " +
                     "PreparingRebalance, or empty if there is no active group)"),
             new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "The current group protocol type (will be empty if there is no active group)"),
@@ -112,7 +112,7 @@ public class DescribeGroupsResponse extends AbstractResponse {
         for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
             Struct groupStruct = (Struct) groupObj;
 
-            String groupId = groupStruct.getString(GROUP_ID_KEY_NAME);
+            String groupId = groupStruct.get(GROUP_ID);
             Errors error = Errors.forCode(groupStruct.get(ERROR_CODE));
             String state = groupStruct.getString(GROUP_STATE_KEY_NAME);
             String protocolType = groupStruct.getString(PROTOCOL_TYPE_KEY_NAME);
@@ -121,7 +121,7 @@ public class DescribeGroupsResponse extends AbstractResponse {
             List<GroupMember> members = new ArrayList<>();
             for (Object memberObj : groupStruct.getArray(MEMBERS_KEY_NAME)) {
                 Struct memberStruct = (Struct) memberObj;
-                String memberId = memberStruct.getString(MEMBER_ID_KEY_NAME);
+                String memberId = memberStruct.get(MEMBER_ID);
                 String clientId = memberStruct.getString(CLIENT_ID_KEY_NAME);
                 String clientHost = memberStruct.getString(CLIENT_HOST_KEY_NAME);
                 ByteBuffer memberMetadata = memberStruct.getBytes(MEMBER_METADATA_KEY_NAME);
@@ -259,7 +259,7 @@ public class DescribeGroupsResponse extends AbstractResponse {
         for (Map.Entry<String, GroupMetadata> groupEntry : groups.entrySet()) {
             Struct groupStruct = struct.instance(GROUPS_KEY_NAME);
             GroupMetadata group = groupEntry.getValue();
-            groupStruct.set(GROUP_ID_KEY_NAME, groupEntry.getKey());
+            groupStruct.set(GROUP_ID, groupEntry.getKey());
             groupStruct.set(ERROR_CODE, group.error.code());
             groupStruct.set(GROUP_STATE_KEY_NAME, group.state);
             groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType);
@@ -267,7 +267,7 @@ public class DescribeGroupsResponse extends AbstractResponse {
             List<Struct> membersList = new ArrayList<>();
             for (GroupMember member : group.members) {
                 Struct memberStruct = groupStruct.instance(MEMBERS_KEY_NAME);
-                memberStruct.set(MEMBER_ID_KEY_NAME, member.memberId);
+                memberStruct.set(MEMBER_ID, member.memberId);
                 memberStruct.set(CLIENT_ID_KEY_NAME, member.clientId);
                 memberStruct.set(CLIENT_HOST_KEY_NAME, member.clientHost);
                 memberStruct.set(MEMBER_METADATA_KEY_NAME, member.memberMetadata);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
index 243e9f5..9118d6a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
@@ -24,21 +24,18 @@ import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
+import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH;
+import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID;
+import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID;
 import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
-import static org.apache.kafka.common.protocol.types.Type.INT16;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
 
 public class EndTxnRequest extends AbstractRequest {
-    private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
-    private static final String PRODUCER_ID_KEY_NAME = "producer_id";
-    private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
     private static final String TRANSACTION_RESULT_KEY_NAME = "transaction_result";
 
     private static final Schema END_TXN_REQUEST_V0 = new Schema(
-            new Field(TRANSACTIONAL_ID_KEY_NAME, STRING, "The transactional id corresponding to the transaction."),
-            new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."),
-            new Field(PRODUCER_EPOCH_KEY_NAME, INT16, "Current epoch associated with the producer id."),
+            TRANSACTIONAL_ID,
+            PRODUCER_ID,
+            PRODUCER_EPOCH,
             new Field(TRANSACTION_RESULT_KEY_NAME, BOOLEAN, "The result of the transaction (0 = ABORT, 1 = COMMIT)"));
 
     public static Schema[] schemaVersions() {
@@ -96,9 +93,9 @@ public class EndTxnRequest extends AbstractRequest {
 
     public EndTxnRequest(Struct struct, short version) {
         super(version);
-        this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
-        this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
-        this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME);
+        this.transactionalId = struct.get(TRANSACTIONAL_ID);
+        this.producerId = struct.get(PRODUCER_ID);
+        this.producerEpoch = struct.get(PRODUCER_EPOCH);
         this.result = TransactionResult.forId(struct.getBoolean(TRANSACTION_RESULT_KEY_NAME));
     }
 
@@ -121,9 +118,9 @@ public class EndTxnRequest extends AbstractRequest {
     @Override
     protected Struct toStruct() {
         Struct struct = new Struct(ApiKeys.END_TXN.requestSchema(version()));
-        struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
-        struct.set(PRODUCER_ID_KEY_NAME, producerId);
-        struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch);
+        struct.set(TRANSACTIONAL_ID, transactionalId);
+        struct.set(PRODUCER_ID, producerId);
+        struct.set(PRODUCER_EPOCH, producerEpoch);
         struct.set(TRANSACTION_RESULT_KEY_NAME, result.id);
         return struct;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index 00a806f..7d84918 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -18,24 +18,20 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
+import static org.apache.kafka.common.protocol.CommonFields.GENERATION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
+import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID;
 
 public class HeartbeatRequest extends AbstractRequest {
-    private static final String GROUP_ID_KEY_NAME = "group_id";
-    private static final String GROUP_GENERATION_ID_KEY_NAME = "group_generation_id";
-    private static final String MEMBER_ID_KEY_NAME = "member_id";
-
     private static final Schema HEARTBEAT_REQUEST_V0 = new Schema(
-            new Field(GROUP_ID_KEY_NAME, STRING, "The group id."),
-            new Field(GROUP_GENERATION_ID_KEY_NAME, INT32, "The generation of the group."),
-            new Field(MEMBER_ID_KEY_NAME, STRING, "The member id assigned by the group coordinator."));
+            GROUP_ID,
+            GENERATION_ID,
+            MEMBER_ID);
 
     /* v1 request is the same as v0. Throttle time has been added to response */
     private static final Schema HEARTBEAT_REQUEST_V1 = HEARTBEAT_REQUEST_V0;
@@ -86,9 +82,9 @@ public class HeartbeatRequest extends AbstractRequest {
 
     public HeartbeatRequest(Struct struct, short version) {
         super(version);
-        groupId = struct.getString(GROUP_ID_KEY_NAME);
-        groupGenerationId = struct.getInt(GROUP_GENERATION_ID_KEY_NAME);
-        memberId = struct.getString(MEMBER_ID_KEY_NAME);
+        groupId = struct.get(GROUP_ID);
+        groupGenerationId = struct.get(GENERATION_ID);
+        memberId = struct.get(MEMBER_ID);
     }
 
     @Override
@@ -124,9 +120,9 @@ public class HeartbeatRequest extends AbstractRequest {
     @Override
     protected Struct toStruct() {
         Struct struct = new Struct(ApiKeys.HEARTBEAT.requestSchema(version()));
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        struct.set(GROUP_GENERATION_ID_KEY_NAME, groupGenerationId);
-        struct.set(MEMBER_ID_KEY_NAME, memberId);
+        struct.set(GROUP_ID, groupId);
+        struct.set(GENERATION_ID, groupGenerationId);
+        struct.set(MEMBER_ID, memberId);
         return struct;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
index 9ecb21f..7a988ca 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.RecordBatch;
@@ -27,9 +26,9 @@ import java.nio.ByteBuffer;
 import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH;
+import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.types.Type.INT16;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
 
 public class InitProducerIdResponse extends AbstractResponse {
     // Possible error codes:
@@ -39,16 +38,11 @@ public class InitProducerIdResponse extends AbstractResponse {
     //   TransactionalIdAuthorizationFailed
     //   ClusterAuthorizationFailed
 
-    private static final String PRODUCER_ID_KEY_NAME = "producer_id";
-    private static final String EPOCH_KEY_NAME = "producer_epoch";
-
     private static final Schema INIT_PRODUCER_ID_RESPONSE_V0 = new Schema(
             THROTTLE_TIME_MS,
             ERROR_CODE,
-            new Field(PRODUCER_ID_KEY_NAME, INT64, "The producer id for the input transactional id. If the input " +
-                    "id was empty, then this is used only for ensuring idempotence of messages."),
-            new Field(EPOCH_KEY_NAME, INT16, "The epoch for the producer id. Will always be 0 if no transactional " +
-                    "id was specified in the request."));
+            PRODUCER_ID,
+            PRODUCER_EPOCH);
 
     public static Schema[] schemaVersions() {
         return new Schema[]{INIT_PRODUCER_ID_RESPONSE_V0};
@@ -69,8 +63,8 @@ public class InitProducerIdResponse extends AbstractResponse {
     public InitProducerIdResponse(Struct struct) {
         this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
         this.error = Errors.forCode(struct.get(ERROR_CODE));
-        this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
-        this.epoch = struct.getShort(EPOCH_KEY_NAME);
+        this.producerId = struct.get(PRODUCER_ID);
+        this.epoch = struct.get(PRODUCER_EPOCH);
     }
 
     public InitProducerIdResponse(int throttleTimeMs, Errors errors) {
@@ -102,8 +96,8 @@ public class InitProducerIdResponse extends AbstractResponse {
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.responseSchema(version));
         struct.set(THROTTLE_TIME_MS, throttleTimeMs);
-        struct.set(PRODUCER_ID_KEY_NAME, producerId);
-        struct.set(EPOCH_KEY_NAME, epoch);
+        struct.set(PRODUCER_ID, producerId);
+        struct.set(PRODUCER_EPOCH, epoch);
         struct.set(ERROR_CODE, error.code());
         return struct;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index b2ff133..a7b62a9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -29,15 +29,15 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
+import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID;
 import static org.apache.kafka.common.protocol.types.Type.BYTES;
 import static org.apache.kafka.common.protocol.types.Type.INT32;
 import static org.apache.kafka.common.protocol.types.Type.STRING;
 
 public class JoinGroupRequest extends AbstractRequest {
-    private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String SESSION_TIMEOUT_KEY_NAME = "session_timeout";
     private static final String REBALANCE_TIMEOUT_KEY_NAME = "rebalance_timeout";
-    private static final String MEMBER_ID_KEY_NAME = "member_id";
     private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
     private static final String GROUP_PROTOCOLS_KEY_NAME = "group_protocols";
     private static final String PROTOCOL_NAME_KEY_NAME = "protocol_name";
@@ -49,21 +49,21 @@ public class JoinGroupRequest extends AbstractRequest {
             new Field(PROTOCOL_METADATA_KEY_NAME, BYTES));
 
     private static final Schema JOIN_GROUP_REQUEST_V0 = new Schema(
-            new Field(GROUP_ID_KEY_NAME, STRING, "The group id."),
+            GROUP_ID,
             new Field(SESSION_TIMEOUT_KEY_NAME, INT32, "The coordinator 
considers the consumer dead if it receives " +
                     "no heartbeat after this timeout in ms."),
-            new Field(MEMBER_ID_KEY_NAME, STRING, "The assigned consumer id or 
an empty string for a new consumer."),
+            MEMBER_ID,
             new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "Unique name for class 
of protocols implemented by group"),
             new Field(GROUP_PROTOCOLS_KEY_NAME, new 
ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0), "List of protocols " +
                     "that the member supports"));
 
     private static final Schema JOIN_GROUP_REQUEST_V1 = new Schema(
-            new Field(GROUP_ID_KEY_NAME, STRING, "The group id."),
+            GROUP_ID,
             new Field(SESSION_TIMEOUT_KEY_NAME, INT32, "The coordinator 
considers the consumer dead if it receives no " +
                     "heartbeat after this timeout in ms."),
             new Field(REBALANCE_TIMEOUT_KEY_NAME, INT32, "The maximum time 
that the coordinator will wait for each " +
                     "member to rejoin when rebalancing the group"),
-            new Field(MEMBER_ID_KEY_NAME, STRING, "The assigned consumer id or 
an empty string for a new consumer."),
+            MEMBER_ID,
             new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "Unique name for class 
of protocols implemented by group"),
             new Field(GROUP_PROTOCOLS_KEY_NAME, new 
ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0), "List of protocols " +
                     "that the member supports"));
@@ -166,7 +166,7 @@ public class JoinGroupRequest extends AbstractRequest {
     public JoinGroupRequest(Struct struct, short versionId) {
         super(versionId);
 
-        groupId = struct.getString(GROUP_ID_KEY_NAME);
+        groupId = struct.get(GROUP_ID);
         sessionTimeout = struct.getInt(SESSION_TIMEOUT_KEY_NAME);
 
         if (struct.hasField(REBALANCE_TIMEOUT_KEY_NAME))
@@ -176,7 +176,7 @@ public class JoinGroupRequest extends AbstractRequest {
             // v0 had no rebalance timeout but used session timeout implicitly
             rebalanceTimeout = sessionTimeout;
 
-        memberId = struct.getString(MEMBER_ID_KEY_NAME);
+        memberId = struct.get(MEMBER_ID);
         protocolType = struct.getString(PROTOCOL_TYPE_KEY_NAME);
 
         groupProtocols = new ArrayList<>();
@@ -249,12 +249,12 @@ public class JoinGroupRequest extends AbstractRequest {
     protected Struct toStruct() {
         short version = version();
         Struct struct = new Struct(ApiKeys.JOIN_GROUP.requestSchema(version));
-        struct.set(GROUP_ID_KEY_NAME, groupId);
+        struct.set(GROUP_ID, groupId);
         struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
         if (version >= 1) {
             struct.set(REBALANCE_TIMEOUT_KEY_NAME, rebalanceTimeout);
         }
-        struct.set(MEMBER_ID_KEY_NAME, memberId);
+        struct.set(MEMBER_ID, memberId);
         struct.set(PROTOCOL_TYPE_KEY_NAME, protocolType);
         List<Struct> groupProtocolsList = new 
ArrayList<>(groupProtocols.size());
         for (ProtocolMetadata protocol : groupProtocols) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 56491eb..7f5ad3f 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -30,31 +30,30 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.GENERATION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
 import static org.apache.kafka.common.protocol.types.Type.BYTES;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
 import static org.apache.kafka.common.protocol.types.Type.STRING;
 
 public class JoinGroupResponse extends AbstractResponse {
 
-    private static final String GENERATION_ID_KEY_NAME = "generation_id";
     private static final String GROUP_PROTOCOL_KEY_NAME = "group_protocol";
     private static final String LEADER_ID_KEY_NAME = "leader_id";
-    private static final String MEMBER_ID_KEY_NAME = "member_id";
     private static final String MEMBERS_KEY_NAME = "members";
 
     private static final String MEMBER_METADATA_KEY_NAME = "member_metadata";
 
     private static final Schema JOIN_GROUP_RESPONSE_MEMBER_V0 = new Schema(
-            new Field(MEMBER_ID_KEY_NAME, STRING),
+            MEMBER_ID,
             new Field(MEMBER_METADATA_KEY_NAME, BYTES));
 
     private static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema(
             ERROR_CODE,
-            new Field(GENERATION_ID_KEY_NAME, INT32, "The generation of the 
consumer group."),
+            GENERATION_ID,
             new Field(GROUP_PROTOCOL_KEY_NAME, STRING, "The group protocol 
selected by the coordinator"),
             new Field(LEADER_ID_KEY_NAME, STRING, "The leader of the group"),
-            new Field(MEMBER_ID_KEY_NAME, STRING, "The consumer id assigned by 
the group coordinator."),
+            MEMBER_ID,
             new Field(MEMBERS_KEY_NAME, new 
ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
 
     private static final Schema JOIN_GROUP_RESPONSE_V1 = 
JOIN_GROUP_RESPONSE_V0;
@@ -62,10 +61,10 @@ public class JoinGroupResponse extends AbstractResponse {
     private static final Schema JOIN_GROUP_RESPONSE_V2 = new Schema(
             THROTTLE_TIME_MS,
             ERROR_CODE,
-            new Field(GENERATION_ID_KEY_NAME, INT32, "The generation of the 
consumer group."),
+            GENERATION_ID,
             new Field(GROUP_PROTOCOL_KEY_NAME, STRING, "The group protocol 
selected by the coordinator"),
             new Field(LEADER_ID_KEY_NAME, STRING, "The leader of the group"),
-            new Field(MEMBER_ID_KEY_NAME, STRING, "The consumer id assigned by 
the group coordinator."),
+            MEMBER_ID,
             new Field(MEMBERS_KEY_NAME, new 
ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
 
 
@@ -128,14 +127,14 @@ public class JoinGroupResponse extends AbstractResponse {
 
         for (Object memberDataObj : struct.getArray(MEMBERS_KEY_NAME)) {
             Struct memberData = (Struct) memberDataObj;
-            String memberId = memberData.getString(MEMBER_ID_KEY_NAME);
+            String memberId = memberData.get(MEMBER_ID);
             ByteBuffer memberMetadata = 
memberData.getBytes(MEMBER_METADATA_KEY_NAME);
             members.put(memberId, memberMetadata);
         }
         error = Errors.forCode(struct.get(ERROR_CODE));
-        generationId = struct.getInt(GENERATION_ID_KEY_NAME);
+        generationId = struct.get(GENERATION_ID);
         groupProtocol = struct.getString(GROUP_PROTOCOL_KEY_NAME);
-        memberId = struct.getString(MEMBER_ID_KEY_NAME);
+        memberId = struct.get(MEMBER_ID);
         leaderId = struct.getString(LEADER_ID_KEY_NAME);
     }
 
@@ -186,15 +185,15 @@ public class JoinGroupResponse extends AbstractResponse {
         struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
 
         struct.set(ERROR_CODE, error.code());
-        struct.set(GENERATION_ID_KEY_NAME, generationId);
+        struct.set(GENERATION_ID, generationId);
         struct.set(GROUP_PROTOCOL_KEY_NAME, groupProtocol);
-        struct.set(MEMBER_ID_KEY_NAME, memberId);
+        struct.set(MEMBER_ID, memberId);
         struct.set(LEADER_ID_KEY_NAME, leaderId);
 
         List<Struct> memberArray = new ArrayList<>();
         for (Map.Entry<String, ByteBuffer> entries : members.entrySet()) {
             Struct memberData = struct.instance(MEMBERS_KEY_NAME);
-            memberData.set(MEMBER_ID_KEY_NAME, entries.getKey());
+            memberData.set(MEMBER_ID, entries.getKey());
             memberData.set(MEMBER_METADATA_KEY_NAME, entries.getValue());
             memberArray.add(memberData);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
index 661eb7f..b0d0ad6 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
@@ -18,21 +18,18 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
-import static org.apache.kafka.common.protocol.types.Type.STRING;
+import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
+import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID;
 
 public class LeaveGroupRequest extends AbstractRequest {
-    private static final String GROUP_ID_KEY_NAME = "group_id";
-    private static final String MEMBER_ID_KEY_NAME = "member_id";
-
     private static final Schema LEAVE_GROUP_REQUEST_V0 = new Schema(
-            new Field(GROUP_ID_KEY_NAME, STRING, "The group id."),
-            new Field(MEMBER_ID_KEY_NAME, STRING, "The member id assigned by 
the group coordinator."));
+            GROUP_ID,
+            MEMBER_ID);
 
     /* v1 request is the same as v0. Throttle time has been added to response 
*/
     private static final Schema LEAVE_GROUP_REQUEST_V1 = 
LEAVE_GROUP_REQUEST_V0;
@@ -78,8 +75,8 @@ public class LeaveGroupRequest extends AbstractRequest {
 
     public LeaveGroupRequest(Struct struct, short version) {
         super(version);
-        groupId = struct.getString(GROUP_ID_KEY_NAME);
-        memberId = struct.getString(MEMBER_ID_KEY_NAME);
+        groupId = struct.get(GROUP_ID);
+        memberId = struct.get(MEMBER_ID);
     }
 
     @Override
@@ -111,8 +108,8 @@ public class LeaveGroupRequest extends AbstractRequest {
     @Override
     protected Struct toStruct() {
         Struct struct = new 
Struct(ApiKeys.LEAVE_GROUP.requestSchema(version()));
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        struct.set(MEMBER_ID_KEY_NAME, memberId);
+        struct.set(GROUP_ID, groupId);
+        struct.set(MEMBER_ID, memberId);
         return struct;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index afc5ebd..9c82ae0 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -29,17 +29,17 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
 import static org.apache.kafka.common.protocol.types.Type.STRING;
 
 public class ListGroupsResponse extends AbstractResponse {
 
     private static final String GROUPS_KEY_NAME = "groups";
-    private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
 
     private static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema(
-            new Field(GROUP_ID_KEY_NAME, STRING),
+            GROUP_ID,
             new Field(PROTOCOL_TYPE_KEY_NAME, STRING));
     private static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema(
             ERROR_CODE,
@@ -80,7 +80,7 @@ public class ListGroupsResponse extends AbstractResponse {
         this.groups = new ArrayList<>();
         for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
             Struct groupStruct = (Struct) groupObj;
-            String groupId = groupStruct.getString(GROUP_ID_KEY_NAME);
+            String groupId = groupStruct.get(GROUP_ID);
             String protocolType = 
groupStruct.getString(PROTOCOL_TYPE_KEY_NAME);
             this.groups.add(new Group(groupId, protocolType));
         }
@@ -130,7 +130,7 @@ public class ListGroupsResponse extends AbstractResponse {
         List<Struct> groupList = new ArrayList<>();
         for (Group group : groups) {
             Struct groupStruct = struct.instance(GROUPS_KEY_NAME);
-            groupStruct.set(GROUP_ID_KEY_NAME, group.groupId);
+            groupStruct.set(GROUP_ID, group.groupId);
             groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType);
             groupList.add(groupStruct);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 696d967..4686c3b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -32,20 +32,18 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.GENERATION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
+import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID;
 import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
 import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
 import static org.apache.kafka.common.protocol.types.Type.INT64;
 import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
 
 /**
  * This wrapper supports both v0 and v1 of OffsetCommitRequest.
  */
 public class OffsetCommitRequest extends AbstractRequest {
-    private static final String GROUP_ID_KEY_NAME = "group_id";
-    private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
-    private static final String MEMBER_ID_KEY_NAME = "member_id";
     private static final String TOPICS_KEY_NAME = "topics";
     private static final String RETENTION_TIME_KEY_NAME = "retention_time";
 
@@ -89,19 +87,19 @@ public class OffsetCommitRequest extends AbstractRequest {
             new Field(PARTITIONS_KEY_NAME, new 
ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V2), "Partitions to commit offsets."));
 
     private static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(
-            new Field(GROUP_ID_KEY_NAME, STRING, "The group id."),
+            GROUP_ID,
             new Field(TOPICS_KEY_NAME, new 
ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0), "Topics to commit offsets."));
 
     private static final Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(
-            new Field(GROUP_ID_KEY_NAME, STRING, "The group id."),
-            new Field(GENERATION_ID_KEY_NAME, INT32, "The generation of the 
group."),
-            new Field(MEMBER_ID_KEY_NAME, STRING, "The member id assigned by 
the group coordinator."),
+            GROUP_ID,
+            GENERATION_ID,
+            MEMBER_ID,
             new Field(TOPICS_KEY_NAME, new 
ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1), "Topics to commit offsets."));
 
     private static final Schema OFFSET_COMMIT_REQUEST_V2 = new Schema(
-            new Field(GROUP_ID_KEY_NAME, STRING, "The group id."),
-            new Field(GENERATION_ID_KEY_NAME, INT32, "The generation of the 
consumer group."),
-            new Field(MEMBER_ID_KEY_NAME, STRING, "The consumer id assigned by 
the group coordinator."),
+            GROUP_ID,
+            GENERATION_ID,
+            MEMBER_ID,
             new Field(RETENTION_TIME_KEY_NAME, INT64, "Time period in ms to 
retain the offset."),
             new Field(TOPICS_KEY_NAME, new 
ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2), "Topics to commit offsets."));
 
@@ -229,18 +227,11 @@ public class OffsetCommitRequest extends AbstractRequest {
     public OffsetCommitRequest(Struct struct, short versionId) {
         super(versionId);
 
-        groupId = struct.getString(GROUP_ID_KEY_NAME);
-        // This field only exists in v1.
-        if (struct.hasField(GENERATION_ID_KEY_NAME))
-            generationId = struct.getInt(GENERATION_ID_KEY_NAME);
-        else
-            generationId = DEFAULT_GENERATION_ID;
+        groupId = struct.get(GROUP_ID);
 
-        // This field only exists in v1.
-        if (struct.hasField(MEMBER_ID_KEY_NAME))
-            memberId = struct.getString(MEMBER_ID_KEY_NAME);
-        else
-            memberId = DEFAULT_MEMBER_ID;
+        // These fields only exist in v1 and later.
+        generationId = struct.getOrElse(GENERATION_ID, DEFAULT_GENERATION_ID);
+        memberId = struct.getOrElse(MEMBER_ID, DEFAULT_MEMBER_ID);
 
         // This field only exists in v2
         if (struct.hasField(RETENTION_TIME_KEY_NAME))
@@ -274,7 +265,7 @@ public class OffsetCommitRequest extends AbstractRequest {
     public Struct toStruct() {
         short version = version();
         Struct struct = new 
Struct(ApiKeys.OFFSET_COMMIT.requestSchema(version));
-        struct.set(GROUP_ID_KEY_NAME, groupId);
+        struct.set(GROUP_ID, groupId);
 
         Map<String, Map<Integer, PartitionData>> topicsData = 
CollectionUtils.groupDataByTopic(offsetData);
         List<Struct> topicArray = new ArrayList<>();
@@ -297,10 +288,8 @@ public class OffsetCommitRequest extends AbstractRequest {
             topicArray.add(topicData);
         }
         struct.set(TOPICS_KEY_NAME, topicArray.toArray());
-        if (struct.hasField(GENERATION_ID_KEY_NAME))
-            struct.set(GENERATION_ID_KEY_NAME, generationId);
-        if (struct.hasField(MEMBER_ID_KEY_NAME))
-            struct.set(MEMBER_ID_KEY_NAME, memberId);
+        struct.setIfExists(GENERATION_ID, generationId);
+        struct.setIfExists(MEMBER_ID, memberId);
         if (struct.hasField(RETENTION_TIME_KEY_NAME))
             struct.set(RETENTION_TIME_KEY_NAME, retentionTime);
         return struct;

http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 6d8b959..0db1c50 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -33,12 +33,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
 import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
 import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
 
 public class OffsetFetchRequest extends AbstractRequest {
-    private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String TOPICS_KEY_NAME = "topics";
 
     // topic level field names
@@ -63,13 +62,13 @@ public class OffsetFetchRequest extends AbstractRequest {
             new Field(PARTITIONS_KEY_NAME, new 
ArrayOf(OFFSET_FETCH_REQUEST_PARTITION_V0), "Partitions to fetch offsets."));
 
     private static final Schema OFFSET_FETCH_REQUEST_V0 = new Schema(
-            new Field(GROUP_ID_KEY_NAME, STRING, "The consumer group id."),
+            GROUP_ID,
             new Field(TOPICS_KEY_NAME, new 
ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0), "Topics to fetch offsets."));
 
     private static final Schema OFFSET_FETCH_REQUEST_V1 = 
OFFSET_FETCH_REQUEST_V0;
 
     private static final Schema OFFSET_FETCH_REQUEST_V2 = new Schema(
-            new Field(GROUP_ID_KEY_NAME, STRING, "The consumer group id."),
+            GROUP_ID,
             new Field(TOPICS_KEY_NAME, 
ArrayOf.nullable(OFFSET_FETCH_REQUEST_TOPIC_V0), "Topics to fetch offsets. If 
the " +
                     "topic array is null fetch offsets for all topics."));
 
@@ -153,7 +152,7 @@ public class OffsetFetchRequest extends AbstractRequest {
             partitions = null;
 
 
-        groupId = struct.getString(GROUP_ID_KEY_NAME);
+        groupId = struct.get(GROUP_ID);
     }
 
     public OffsetFetchResponse getErrorResponse(Errors error) {
@@ -210,7 +209,7 @@ public class OffsetFetchRequest extends AbstractRequest {
     @Override
     protected Struct toStruct() {
         Struct struct = new 
Struct(ApiKeys.OFFSET_FETCH.requestSchema(version()));
-        struct.set(GROUP_ID_KEY_NAME, groupId);
+        struct.set(GROUP_ID, groupId);
         if (partitions != null) {
             Map<String, List<Integer>> topicsData = 
CollectionUtils.groupDataByTopic(partitions);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
index 4ff9fcd..14ed262 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
@@ -30,24 +30,22 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.GENERATION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
+import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID;
 import static org.apache.kafka.common.protocol.types.Type.BYTES;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
 
 public class SyncGroupRequest extends AbstractRequest {
-    private static final String GROUP_ID_KEY_NAME = "group_id";
-    private static final String GENERATION_ID_KEY_NAME = "generation_id";
-    private static final String MEMBER_ID_KEY_NAME = "member_id";
     private static final String MEMBER_ASSIGNMENT_KEY_NAME = 
"member_assignment";
     private static final String GROUP_ASSIGNMENT_KEY_NAME = "group_assignment";
 
     private static final Schema SYNC_GROUP_REQUEST_MEMBER_V0 = new Schema(
-            new Field(MEMBER_ID_KEY_NAME, STRING),
+            MEMBER_ID,
             new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES));
     private static final Schema SYNC_GROUP_REQUEST_V0 = new Schema(
-            new Field(GROUP_ID_KEY_NAME, STRING),
-            new Field(GENERATION_ID_KEY_NAME, INT32),
-            new Field(MEMBER_ID_KEY_NAME, STRING),
+            GROUP_ID,
+            GENERATION_ID,
+            MEMBER_ID,
             new Field(GROUP_ASSIGNMENT_KEY_NAME, new 
ArrayOf(SYNC_GROUP_REQUEST_MEMBER_V0)));
 
     /* v1 request is the same as v0. Throttle time has been added to response 
*/
@@ -106,15 +104,15 @@ public class SyncGroupRequest extends AbstractRequest {
 
     public SyncGroupRequest(Struct struct, short version) {
         super(version);
-        this.groupId = struct.getString(GROUP_ID_KEY_NAME);
-        this.generationId = struct.getInt(GENERATION_ID_KEY_NAME);
-        this.memberId = struct.getString(MEMBER_ID_KEY_NAME);
+        this.groupId = struct.get(GROUP_ID);
+        this.generationId = struct.get(GENERATION_ID);
+        this.memberId = struct.get(MEMBER_ID);
 
         groupAssignment = new HashMap<>();
 
         for (Object memberDataObj : 
struct.getArray(GROUP_ASSIGNMENT_KEY_NAME)) {
             Struct memberData = (Struct) memberDataObj;
-            String memberId = memberData.getString(MEMBER_ID_KEY_NAME);
+            String memberId = memberData.get(MEMBER_ID);
             ByteBuffer memberMetadata = 
memberData.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
             groupAssignment.put(memberId, memberMetadata);
         }
@@ -162,14 +160,14 @@ public class SyncGroupRequest extends AbstractRequest {
     @Override
     protected Struct toStruct() {
         Struct struct = new 
Struct(ApiKeys.SYNC_GROUP.requestSchema(version()));
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        struct.set(GENERATION_ID_KEY_NAME, generationId);
-        struct.set(MEMBER_ID_KEY_NAME, memberId);
+        struct.set(GROUP_ID, groupId);
+        struct.set(GENERATION_ID, generationId);
+        struct.set(MEMBER_ID, memberId);
 
         List<Struct> memberArray = new ArrayList<>();
         for (Map.Entry<String, ByteBuffer> entries: 
groupAssignment.entrySet()) {
             Struct memberData = struct.instance(GROUP_ASSIGNMENT_KEY_NAME);
-            memberData.set(MEMBER_ID_KEY_NAME, entries.getKey());
+            memberData.set(MEMBER_ID, entries.getKey());
             memberData.set(MEMBER_ASSIGNMENT_KEY_NAME, entries.getValue());
             memberArray.add(memberData);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index d384192..9787c2d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -29,18 +29,16 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
 import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH;
+import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID;
 import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.types.Type.INT16;
+import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID;
 import static org.apache.kafka.common.protocol.types.Type.INT64;
 import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
 
 public class TxnOffsetCommitRequest extends AbstractRequest {
-    private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
-    private static final String CONSUMER_GROUP_ID_KEY_NAME = 
"consumer_group_id";
-    private static final String PRODUCER_ID_KEY_NAME = "producer_id";
-    private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
     private static final String TOPICS_KEY_NAME = "topics";
     private static final String PARTITIONS_KEY_NAME = "partitions";
     private static final String OFFSET_KEY_NAME = "offset";
@@ -52,10 +50,10 @@ public class TxnOffsetCommitRequest extends AbstractRequest 
{
             new Field(METADATA_KEY_NAME, NULLABLE_STRING));
 
     private static final Schema TXN_OFFSET_COMMIT_REQUEST_V0 = new Schema(
-            new Field(TRANSACTIONAL_ID_KEY_NAME, STRING, "The transactional id 
corresponding to the transaction."),
-            new Field(CONSUMER_GROUP_ID_KEY_NAME, STRING, "Id of the 
associated consumer group to commit offsets for."),
-            new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use 
by the transactional id."),
-            new Field(PRODUCER_EPOCH_KEY_NAME, INT16, "Current epoch 
associated with the producer id."),
+            TRANSACTIONAL_ID,
+            GROUP_ID,
+            PRODUCER_ID,
+            PRODUCER_EPOCH,
             new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
                     TOPIC_NAME,
                     new Field(PARTITIONS_KEY_NAME, new 
ArrayOf(TXN_OFFSET_COMMIT_PARTITION_OFFSET_METADATA_REQUEST_V0)))),
@@ -127,10 +125,10 @@ public class TxnOffsetCommitRequest extends 
AbstractRequest {
 
     public TxnOffsetCommitRequest(Struct struct, short version) {
         super(version);
-        this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
-        this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME);
-        this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
-        this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME);
+        this.transactionalId = struct.get(TRANSACTIONAL_ID);
+        this.consumerGroupId = struct.get(GROUP_ID);
+        this.producerId = struct.get(PRODUCER_ID);
+        this.producerEpoch = struct.get(PRODUCER_EPOCH);
 
         Map<TopicPartition, CommittedOffset> offsets = new HashMap<>();
         Object[] topicPartitionsArray = struct.getArray(TOPICS_KEY_NAME);
@@ -171,10 +169,10 @@ public class TxnOffsetCommitRequest extends 
AbstractRequest {
     @Override
     protected Struct toStruct() {
         Struct struct = new 
Struct(ApiKeys.TXN_OFFSET_COMMIT.requestSchema(version()));
-        struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
-        struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId);
-        struct.set(PRODUCER_ID_KEY_NAME, producerId);
-        struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch);
+        struct.set(TRANSACTIONAL_ID, transactionalId);
+        struct.set(GROUP_ID, consumerGroupId);
+        struct.set(PRODUCER_ID, producerId);
+        struct.set(PRODUCER_EPOCH, producerEpoch);
 
         Map<String, Map<Integer, CommittedOffset>> mappedPartitionOffsets = 
CollectionUtils.groupDataByTopic(offsets);
         Object[] partitionsArray = new Object[mappedPartitionOffsets.size()];

Reply via email to