http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 64bd3d3..dee6a5f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
+
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.protocol.types.Type;
 import org.apache.kafka.common.record.Records;
@@ -39,7 +39,6 @@ import java.util.Map;
  */
 public class FetchResponse extends AbstractResponse {
 
-    private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id);
     private static final String RESPONSES_KEY_NAME = "responses";
 
     // topic level field names
@@ -71,7 +70,7 @@ public class FetchResponse extends AbstractResponse {
     public static final long INVALID_HIGHWATERMARK = -1L;
 
     private final LinkedHashMap<TopicPartition, PartitionData> responseData;
-    private final int throttleTime;
+    private final int throttleTimeMs;
 
     public static final class PartitionData {
         public final Errors error;
@@ -92,35 +91,20 @@ public class FetchResponse extends AbstractResponse {
     }
 
     /**
-     * Constructor for version 3.
-     *
-     * The entries in `responseData` should be in the same order as the 
entries in `FetchRequest.fetchData`.
-     *
-     * @param responseData fetched data grouped by topic-partition
-     * @param throttleTime Time in milliseconds the response was throttled
-     */
-    public FetchResponse(LinkedHashMap<TopicPartition, PartitionData> 
responseData, int throttleTime) {
-        this(3, responseData, throttleTime);
-    }
-
-    /**
      * Constructor for all versions.
      *
      * From version 3, the entries in `responseData` should be in the same 
order as the entries in
      * `FetchRequest.fetchData`.
      *
      * @param responseData fetched data grouped by topic-partition
-     * @param throttleTime Time in milliseconds the response was throttled
+     * @param throttleTimeMs Time in milliseconds the response was throttled
      */
-    public FetchResponse(int version, LinkedHashMap<TopicPartition, 
PartitionData> responseData, int throttleTime) {
-        super(writeStruct(new 
Struct(ProtoUtils.responseSchema(ApiKeys.FETCH.id, version)), version, 
responseData,
-                throttleTime));
+    public FetchResponse(LinkedHashMap<TopicPartition, PartitionData> 
responseData, int throttleTimeMs) {
         this.responseData = responseData;
-        this.throttleTime = throttleTime;
+        this.throttleTimeMs = throttleTimeMs;
     }
 
     public FetchResponse(Struct struct) {
-        super(struct);
         LinkedHashMap<TopicPartition, PartitionData> responseData = new 
LinkedHashMap<>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
@@ -137,22 +121,31 @@ public class FetchResponse extends AbstractResponse {
             }
         }
         this.responseData = responseData;
-        this.throttleTime = struct.hasField(THROTTLE_TIME_KEY_NAME) ? 
struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
+        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? 
struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
+    }
+
+    @Override
+    public Struct toStruct(short version) {
+        return toStruct(version, responseData, throttleTimeMs);
     }
 
     @Override
     public Send toSend(String dest, RequestHeader requestHeader) {
-        ResponseHeader responseHeader = new 
ResponseHeader(requestHeader.correlationId());
+        return toSend(toStruct(requestHeader.apiVersion()), throttleTimeMs, 
dest, requestHeader);
+    }
+
+    public Send toSend(Struct responseStruct, int throttleTimeMs, String dest, 
RequestHeader requestHeader) {
+        Struct responseHeader = new 
ResponseHeader(requestHeader.correlationId()).toStruct();
 
         // write the total size and the response header
         ByteBuffer buffer = ByteBuffer.allocate(responseHeader.sizeOf() + 4);
-        buffer.putInt(responseHeader.sizeOf() + struct.sizeOf());
+        buffer.putInt(responseHeader.sizeOf() + responseStruct.sizeOf());
         responseHeader.writeTo(buffer);
         buffer.rewind();
 
         List<Send> sends = new ArrayList<>();
         sends.add(new ByteBufferSend(dest, buffer));
-        addResponseData(dest, sends);
+        addResponseData(responseStruct, throttleTimeMs, dest, sends);
         return new MultiSend(dest, sends);
     }
 
@@ -160,25 +153,20 @@ public class FetchResponse extends AbstractResponse {
         return responseData;
     }
 
-    public int getThrottleTime() {
-        return this.throttleTime;
+    public int throttleTimeMs() {
+        return this.throttleTimeMs;
     }
 
-    public static FetchResponse parse(ByteBuffer buffer) {
-        return new FetchResponse(CURRENT_SCHEMA.read(buffer));
-    }
-
-    public static FetchResponse parse(ByteBuffer buffer, int version) {
+    public static FetchResponse parse(ByteBuffer buffer, short version) {
         return new FetchResponse(ProtoUtils.responseSchema(ApiKeys.FETCH.id, 
version).read(buffer));
     }
 
-    private void addResponseData(String dest, List<Send> sends) {
+    private static void addResponseData(Struct struct, int throttleTimeMs, 
String dest, List<Send> sends) {
         Object[] allTopicData = struct.getArray(RESPONSES_KEY_NAME);
 
         if (struct.hasField(THROTTLE_TIME_KEY_NAME)) {
-            int throttleTime = struct.getInt(THROTTLE_TIME_KEY_NAME);
             ByteBuffer buffer = ByteBuffer.allocate(8);
-            buffer.putInt(throttleTime);
+            buffer.putInt(throttleTimeMs);
             buffer.putInt(allTopicData.length);
             buffer.rewind();
             sends.add(new ByteBufferSend(dest, buffer));
@@ -193,7 +181,7 @@ public class FetchResponse extends AbstractResponse {
             addTopicData(dest, sends, (Struct) topicData);
     }
 
-    private void addTopicData(String dest, List<Send> sends, Struct topicData) 
{
+    private static void addTopicData(String dest, List<Send> sends, Struct 
topicData) {
         String topic = topicData.getString(TOPIC_KEY_NAME);
         Object[] allPartitionData = topicData.getArray(PARTITIONS_KEY_NAME);
 
@@ -208,7 +196,7 @@ public class FetchResponse extends AbstractResponse {
             addPartitionData(dest, sends, (Struct) partitionData);
     }
 
-    private void addPartitionData(String dest, List<Send> sends, Struct 
partitionData) {
+    private static void addPartitionData(String dest, List<Send> sends, Struct 
partitionData) {
         Struct header = partitionData.getStruct(PARTITION_HEADER_KEY_NAME);
         Records records = partitionData.getRecords(RECORD_SET_KEY_NAME);
 
@@ -223,10 +211,8 @@ public class FetchResponse extends AbstractResponse {
         sends.add(new RecordsSend(dest, records));
     }
 
-    private static Struct writeStruct(Struct struct,
-                                      int version,
-                                      LinkedHashMap<TopicPartition, 
PartitionData> responseData,
-                                      int throttleTime) {
+    private static Struct toStruct(short version, 
LinkedHashMap<TopicPartition, PartitionData> responseData, int throttleTime) {
+        Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.FETCH.id, 
version));
         List<FetchRequest.TopicAndPartitionData<PartitionData>> topicsData = 
FetchRequest.TopicAndPartitionData.batchByTopic(responseData);
         List<Struct> topicArray = new ArrayList<>();
         for (FetchRequest.TopicAndPartitionData<PartitionData> topicEntry: 
topicsData) {
@@ -255,10 +241,8 @@ public class FetchResponse extends AbstractResponse {
         return struct;
     }
 
-    public static int sizeOf(int version, LinkedHashMap<TopicPartition, 
PartitionData> responseData) {
-        Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.FETCH.id, 
version));
-        writeStruct(struct, version, responseData, 0);
-        return 4 + struct.sizeOf();
+    public static int sizeOf(short version, LinkedHashMap<TopicPartition, 
PartitionData> responseData) {
+        return 4 + toStruct(version, responseData, 0).sizeOf();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
index ed56f39..83d6cba 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
@@ -32,8 +32,7 @@ public class GroupCoordinatorRequest extends AbstractRequest {
         }
 
         @Override
-        public GroupCoordinatorRequest build() {
-            short version = version();
+        public GroupCoordinatorRequest build(short version) {
             return new GroupCoordinatorRequest(this.groupId, version);
         }
 
@@ -49,14 +48,12 @@ public class GroupCoordinatorRequest extends 
AbstractRequest {
     private final String groupId;
 
     private GroupCoordinatorRequest(String groupId, short version) {
-        super(new 
Struct(ProtoUtils.requestSchema(ApiKeys.GROUP_COORDINATOR.id, version)),
-                version);
-        struct.set(GROUP_ID_KEY_NAME, groupId);
+        super(version);
         this.groupId = groupId;
     }
 
     public GroupCoordinatorRequest(Struct struct, short versionId) {
-        super(struct, versionId);
+        super(versionId);
         groupId = struct.getString(GROUP_ID_KEY_NAME);
     }
 
@@ -76,12 +73,15 @@ public class GroupCoordinatorRequest extends 
AbstractRequest {
         return groupId;
     }
 
-    public static GroupCoordinatorRequest parse(ByteBuffer buffer, int 
versionId) {
+    public static GroupCoordinatorRequest parse(ByteBuffer buffer, short 
versionId) {
         return new 
GroupCoordinatorRequest(ProtoUtils.parseRequest(ApiKeys.GROUP_COORDINATOR.id, 
versionId, buffer),
-                (short) versionId);
+                versionId);
     }
 
-    public static GroupCoordinatorRequest parse(ByteBuffer buffer) {
-        return parse(buffer, 
ProtoUtils.latestVersion(ApiKeys.GROUP_COORDINATOR.id));
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new 
Struct(ProtoUtils.requestSchema(ApiKeys.GROUP_COORDINATOR.id, version()));
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+        return struct;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
index fc3d358..c13cf3c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
@@ -16,14 +16,12 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
 public class GroupCoordinatorResponse extends AbstractResponse {
 
-    private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentResponseSchema(ApiKeys.GROUP_COORDINATOR.id);
     private static final String ERROR_CODE_KEY_NAME = "error_code";
     private static final String COORDINATOR_KEY_NAME = "coordinator";
 
@@ -45,19 +43,11 @@ public class GroupCoordinatorResponse extends 
AbstractResponse {
     private final Node node;
 
     public GroupCoordinatorResponse(Errors error, Node node) {
-        super(new Struct(CURRENT_SCHEMA));
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
-        Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
-        coordinator.set(NODE_ID_KEY_NAME, node.id());
-        coordinator.set(HOST_KEY_NAME, node.host());
-        coordinator.set(PORT_KEY_NAME, node.port());
-        struct.set(COORDINATOR_KEY_NAME, coordinator);
         this.error = error;
         this.node = node;
     }
 
     public GroupCoordinatorResponse(Struct struct) {
-        super(struct);
         error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
         int nodeId = broker.getInt(NODE_ID_KEY_NAME);
@@ -74,7 +64,19 @@ public class GroupCoordinatorResponse extends 
AbstractResponse {
         return node;
     }
 
-    public static GroupCoordinatorResponse parse(ByteBuffer buffer) {
-        return new GroupCoordinatorResponse(CURRENT_SCHEMA.read(buffer));
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new 
Struct(ProtoUtils.responseSchema(ApiKeys.GROUP_COORDINATOR.id, version));
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
+        coordinator.set(NODE_ID_KEY_NAME, node.id());
+        coordinator.set(HOST_KEY_NAME, node.host());
+        coordinator.set(PORT_KEY_NAME, node.port());
+        struct.set(COORDINATOR_KEY_NAME, coordinator);
+        return struct;
+    }
+
+    public static GroupCoordinatorResponse parse(ByteBuffer buffer, short 
version) {
+        return new 
GroupCoordinatorResponse(ProtoUtils.parseResponse(ApiKeys.GROUP_COORDINATOR.id, 
version, buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index 7e79c8a..4440830 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -37,8 +37,8 @@ public class HeartbeatRequest extends AbstractRequest {
         }
 
         @Override
-        public HeartbeatRequest build() {
-            return new HeartbeatRequest(groupId, groupGenerationId, memberId, 
version());
+        public HeartbeatRequest build(short version) {
+            return new HeartbeatRequest(groupId, groupGenerationId, memberId, 
version);
         }
 
         @Override
@@ -58,18 +58,14 @@ public class HeartbeatRequest extends AbstractRequest {
     private final String memberId;
 
     private HeartbeatRequest(String groupId, int groupGenerationId, String 
memberId, short version) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.HEARTBEAT.id, 
version)),
-                version);
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        struct.set(GROUP_GENERATION_ID_KEY_NAME, groupGenerationId);
-        struct.set(MEMBER_ID_KEY_NAME, memberId);
+        super(version);
         this.groupId = groupId;
         this.groupGenerationId = groupGenerationId;
         this.memberId = memberId;
     }
 
-    public HeartbeatRequest(Struct struct, short versionId) {
-        super(struct, versionId);
+    public HeartbeatRequest(Struct struct, short version) {
+        super(version);
         groupId = struct.getString(GROUP_ID_KEY_NAME);
         groupGenerationId = struct.getInt(GROUP_GENERATION_ID_KEY_NAME);
         memberId = struct.getString(MEMBER_ID_KEY_NAME);
@@ -99,11 +95,16 @@ public class HeartbeatRequest extends AbstractRequest {
         return memberId;
     }
 
-    public static HeartbeatRequest parse(ByteBuffer buffer, int versionId) {
-        return new 
HeartbeatRequest(ProtoUtils.parseRequest(ApiKeys.HEARTBEAT.id, versionId, 
buffer), (short) versionId);
+    public static HeartbeatRequest parse(ByteBuffer buffer, short versionId) {
+        return new 
HeartbeatRequest(ProtoUtils.parseRequest(ApiKeys.HEARTBEAT.id, versionId, 
buffer), versionId);
     }
 
-    public static HeartbeatRequest parse(ByteBuffer buffer) {
-        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.HEARTBEAT.id));
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new 
Struct(ProtoUtils.requestSchema(ApiKeys.HEARTBEAT.id, version()));
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+        struct.set(GROUP_GENERATION_ID_KEY_NAME, groupGenerationId);
+        struct.set(MEMBER_ID_KEY_NAME, memberId);
+        return struct;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index f36dec4..4cca846 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -15,14 +15,12 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
 public class HeartbeatResponse extends AbstractResponse {
 
-    private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentResponseSchema(ApiKeys.HEARTBEAT.id);
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     /**
@@ -35,17 +33,13 @@ public class HeartbeatResponse extends AbstractResponse {
      * REBALANCE_IN_PROGRESS (27)
      * GROUP_AUTHORIZATION_FAILED (30)
      */
-
     private final Errors error;
 
     public HeartbeatResponse(Errors error) {
-        super(new Struct(CURRENT_SCHEMA));
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
         this.error = error;
     }
 
     public HeartbeatResponse(Struct struct) {
-        super(struct);
         error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
     }
 
@@ -53,7 +47,14 @@ public class HeartbeatResponse extends AbstractResponse {
         return error;
     }
 
-    public static HeartbeatResponse parse(ByteBuffer buffer) {
-        return new HeartbeatResponse(CURRENT_SCHEMA.read(buffer));
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new 
Struct(ProtoUtils.responseSchema(ApiKeys.HEARTBEAT.id, version));
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        return struct;
+    }
+
+    public static HeartbeatResponse parse(ByteBuffer buffer, short version) {
+        return new 
HeartbeatResponse(ProtoUtils.parseResponse(ApiKeys.HEARTBEAT.id, version, 
buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index ad0cdd0..37906a6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -85,8 +85,7 @@ public class JoinGroupRequest extends AbstractRequest {
         }
 
         @Override
-        public JoinGroupRequest build() {
-            short version = version();
+        public JoinGroupRequest build(short version) {
             if (version < 1) {
                 rebalanceTimeout = -1;
             }
@@ -112,23 +111,7 @@ public class JoinGroupRequest extends AbstractRequest {
     private JoinGroupRequest(short version, String groupId, int sessionTimeout,
             int rebalanceTimeout, String memberId, String protocolType,
             List<ProtocolMetadata> groupProtocols) {
-        super(new Struct(ProtoUtils.
-                requestSchema(ApiKeys.JOIN_GROUP.id, version)), version);
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
-        if (version >= 1) {
-            struct.set(REBALANCE_TIMEOUT_KEY_NAME, rebalanceTimeout);
-        }
-        struct.set(MEMBER_ID_KEY_NAME, memberId);
-        struct.set(PROTOCOL_TYPE_KEY_NAME, protocolType);
-        List<Struct> groupProtocolsList = new 
ArrayList<>(groupProtocols.size());
-        for (ProtocolMetadata protocol : groupProtocols) {
-            Struct protocolStruct = struct.instance(GROUP_PROTOCOLS_KEY_NAME);
-            protocolStruct.set(PROTOCOL_NAME_KEY_NAME, protocol.name);
-            protocolStruct.set(PROTOCOL_METADATA_KEY_NAME, protocol.metadata);
-            groupProtocolsList.add(protocolStruct);
-        }
-        struct.set(GROUP_PROTOCOLS_KEY_NAME, groupProtocolsList.toArray());
+        super(version);
         this.groupId = groupId;
         this.sessionTimeout = sessionTimeout;
         this.rebalanceTimeout = rebalanceTimeout;
@@ -138,7 +121,7 @@ public class JoinGroupRequest extends AbstractRequest {
     }
 
     public JoinGroupRequest(Struct struct, short versionId) {
-        super(struct, versionId);
+        super(versionId);
 
         groupId = struct.getString(GROUP_ID_KEY_NAME);
         sessionTimeout = struct.getInt(SESSION_TIMEOUT_KEY_NAME);
@@ -169,7 +152,6 @@ public class JoinGroupRequest extends AbstractRequest {
             case 0:
             case 1:
                 return new JoinGroupResponse(
-                        versionId,
                         Errors.forException(e),
                         JoinGroupResponse.UNKNOWN_GENERATION_ID,
                         JoinGroupResponse.UNKNOWN_PROTOCOL,
@@ -207,12 +189,29 @@ public class JoinGroupRequest extends AbstractRequest {
         return protocolType;
     }
 
-    public static JoinGroupRequest parse(ByteBuffer buffer, int versionId) {
-        return new 
JoinGroupRequest(ProtoUtils.parseRequest(ApiKeys.JOIN_GROUP.id, versionId, 
buffer),
-                (short) versionId);
+    public static JoinGroupRequest parse(ByteBuffer buffer, short version) {
+        return new 
JoinGroupRequest(ProtoUtils.parseRequest(ApiKeys.JOIN_GROUP.id, version, 
buffer), version);
     }
 
-    public static JoinGroupRequest parse(ByteBuffer buffer) {
-        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id));
+    @Override
+    protected Struct toStruct() {
+        short version = version();
+        Struct struct = new 
Struct(ProtoUtils.requestSchema(ApiKeys.JOIN_GROUP.id, version));
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+        struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
+        if (version >= 1) {
+            struct.set(REBALANCE_TIMEOUT_KEY_NAME, rebalanceTimeout);
+        }
+        struct.set(MEMBER_ID_KEY_NAME, memberId);
+        struct.set(PROTOCOL_TYPE_KEY_NAME, protocolType);
+        List<Struct> groupProtocolsList = new 
ArrayList<>(groupProtocols.size());
+        for (ProtocolMetadata protocol : groupProtocols) {
+            Struct protocolStruct = struct.instance(GROUP_PROTOCOLS_KEY_NAME);
+            protocolStruct.set(PROTOCOL_NAME_KEY_NAME, protocol.name);
+            protocolStruct.set(PROTOCOL_METADATA_KEY_NAME, protocol.metadata);
+            groupProtocolsList.add(protocolStruct);
+        }
+        struct.set(GROUP_PROTOCOLS_KEY_NAME, groupProtocolsList.toArray());
+        return struct;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index bc9366a..d2a323b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -15,7 +15,6 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -26,8 +25,6 @@ import java.util.Map;
 
 public class JoinGroupResponse extends AbstractResponse {
 
-    private static final short CURRENT_VERSION = 
ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id);
-    private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id);
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     /**
@@ -67,33 +64,6 @@ public class JoinGroupResponse extends AbstractResponse {
                              String memberId,
                              String leaderId,
                              Map<String, ByteBuffer> groupMembers) {
-        this(CURRENT_VERSION, error, generationId, groupProtocol, memberId, 
leaderId, groupMembers);
-    }
-
-    public JoinGroupResponse(int version,
-                             Errors error,
-                             int generationId,
-                             String groupProtocol,
-                             String memberId,
-                             String leaderId,
-                             Map<String, ByteBuffer> groupMembers) {
-        super(new Struct(ProtoUtils.responseSchema(ApiKeys.JOIN_GROUP.id, 
version)));
-
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
-        struct.set(GENERATION_ID_KEY_NAME, generationId);
-        struct.set(GROUP_PROTOCOL_KEY_NAME, groupProtocol);
-        struct.set(MEMBER_ID_KEY_NAME, memberId);
-        struct.set(LEADER_ID_KEY_NAME, leaderId);
-
-        List<Struct> memberArray = new ArrayList<>();
-        for (Map.Entry<String, ByteBuffer> entries: groupMembers.entrySet()) {
-            Struct memberData = struct.instance(MEMBERS_KEY_NAME);
-            memberData.set(MEMBER_ID_KEY_NAME, entries.getKey());
-            memberData.set(MEMBER_METADATA_KEY_NAME, entries.getValue());
-            memberArray.add(memberData);
-        }
-        struct.set(MEMBERS_KEY_NAME, memberArray.toArray());
-
         this.error = error;
         this.generationId = generationId;
         this.groupProtocol = groupProtocol;
@@ -103,7 +73,6 @@ public class JoinGroupResponse extends AbstractResponse {
     }
 
     public JoinGroupResponse(Struct struct) {
-        super(struct);
         members = new HashMap<>();
 
         for (Object memberDataObj : struct.getArray(MEMBERS_KEY_NAME)) {
@@ -147,11 +116,29 @@ public class JoinGroupResponse extends AbstractResponse {
         return members;
     }
 
-    public static JoinGroupResponse parse(ByteBuffer buffer, int version) {
-        return new 
JoinGroupResponse(ProtoUtils.responseSchema(ApiKeys.JOIN_GROUP.id, 
version).read(buffer));
+    public static JoinGroupResponse parse(ByteBuffer buffer, short version) {
+        return new 
JoinGroupResponse(ProtoUtils.parseResponse(ApiKeys.JOIN_GROUP.id, version, 
buffer));
     }
 
-    public static JoinGroupResponse parse(ByteBuffer buffer) {
-        return new JoinGroupResponse(CURRENT_SCHEMA.read(buffer));
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new 
Struct(ProtoUtils.responseSchema(ApiKeys.JOIN_GROUP.id, version));
+
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        struct.set(GENERATION_ID_KEY_NAME, generationId);
+        struct.set(GROUP_PROTOCOL_KEY_NAME, groupProtocol);
+        struct.set(MEMBER_ID_KEY_NAME, memberId);
+        struct.set(LEADER_ID_KEY_NAME, leaderId);
+
+        List<Struct> memberArray = new ArrayList<>();
+        for (Map.Entry<String, ByteBuffer> entries : members.entrySet()) {
+            Struct memberData = struct.instance(MEMBERS_KEY_NAME);
+            memberData.set(MEMBER_ID_KEY_NAME, entries.getKey());
+            memberData.set(MEMBER_METADATA_KEY_NAME, entries.getValue());
+            memberArray.add(memberData);
+        }
+        struct.set(MEMBERS_KEY_NAME, memberArray.toArray());
+
+        return struct;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index fde184a..c564b43 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -69,9 +69,8 @@ public class LeaderAndIsrRequest extends AbstractRequest {
         }
 
         @Override
-        public LeaderAndIsrRequest build() {
-            return new LeaderAndIsrRequest(controllerId, controllerEpoch, 
partitionStates,
-                    liveLeaders, version());
+        public LeaderAndIsrRequest build(short version) {
+            return new LeaderAndIsrRequest(controllerId, controllerEpoch, 
partitionStates, liveLeaders, version);
         }
 
         @Override
@@ -94,46 +93,15 @@ public class LeaderAndIsrRequest extends AbstractRequest {
 
     private LeaderAndIsrRequest(int controllerId, int controllerEpoch, 
Map<TopicPartition, PartitionState> partitionStates,
                                Set<Node> liveLeaders, short version) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.LEADER_AND_ISR.id, 
version)),
-                version);
-        struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
-        struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch);
-
-        List<Struct> partitionStatesData = new 
ArrayList<>(partitionStates.size());
-        for (Map.Entry<TopicPartition, PartitionState> entry : 
partitionStates.entrySet()) {
-            Struct partitionStateData = 
struct.instance(PARTITION_STATES_KEY_NAME);
-            TopicPartition topicPartition = entry.getKey();
-            partitionStateData.set(TOPIC_KEY_NAME, topicPartition.topic());
-            partitionStateData.set(PARTITION_KEY_NAME, 
topicPartition.partition());
-            PartitionState partitionState = entry.getValue();
-            partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, 
partitionState.controllerEpoch);
-            partitionStateData.set(LEADER_KEY_NAME, partitionState.leader);
-            partitionStateData.set(LEADER_EPOCH_KEY_NAME, 
partitionState.leaderEpoch);
-            partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray());
-            partitionStateData.set(ZK_VERSION_KEY_NAME, 
partitionState.zkVersion);
-            partitionStateData.set(REPLICAS_KEY_NAME, 
partitionState.replicas.toArray());
-            partitionStatesData.add(partitionStateData);
-        }
-        struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray());
-
-        List<Struct> leadersData = new ArrayList<>(liveLeaders.size());
-        for (Node leader : liveLeaders) {
-            Struct leaderData = struct.instance(LIVE_LEADERS_KEY_NAME);
-            leaderData.set(END_POINT_ID_KEY_NAME, leader.id());
-            leaderData.set(HOST_KEY_NAME, leader.host());
-            leaderData.set(PORT_KEY_NAME, leader.port());
-            leadersData.add(leaderData);
-        }
-        struct.set(LIVE_LEADERS_KEY_NAME, leadersData.toArray());
-
+        super(version);
         this.controllerId = controllerId;
         this.controllerEpoch = controllerEpoch;
         this.partitionStates = partitionStates;
         this.liveLeaders = liveLeaders;
     }
 
-    public LeaderAndIsrRequest(Struct struct, short versionId) {
-        super(struct, versionId);
+    public LeaderAndIsrRequest(Struct struct, short version) {
+        super(version);
 
         Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
         for (Object partitionStateDataObj : 
struct.getArray(PARTITION_STATES_KEY_NAME)) {
@@ -177,6 +145,42 @@ public class LeaderAndIsrRequest extends AbstractRequest {
     }
 
     @Override
+    protected Struct toStruct() {
+        short version = version();
+        Struct struct = new 
Struct(ProtoUtils.requestSchema(ApiKeys.LEADER_AND_ISR.id, version));
+        struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
+        struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch);
+
+        List<Struct> partitionStatesData = new 
ArrayList<>(partitionStates.size());
+        for (Map.Entry<TopicPartition, PartitionState> entry : 
partitionStates.entrySet()) {
+            Struct partitionStateData = 
struct.instance(PARTITION_STATES_KEY_NAME);
+            TopicPartition topicPartition = entry.getKey();
+            partitionStateData.set(TOPIC_KEY_NAME, topicPartition.topic());
+            partitionStateData.set(PARTITION_KEY_NAME, 
topicPartition.partition());
+            PartitionState partitionState = entry.getValue();
+            partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, 
partitionState.controllerEpoch);
+            partitionStateData.set(LEADER_KEY_NAME, partitionState.leader);
+            partitionStateData.set(LEADER_EPOCH_KEY_NAME, 
partitionState.leaderEpoch);
+            partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray());
+            partitionStateData.set(ZK_VERSION_KEY_NAME, 
partitionState.zkVersion);
+            partitionStateData.set(REPLICAS_KEY_NAME, 
partitionState.replicas.toArray());
+            partitionStatesData.add(partitionStateData);
+        }
+        struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray());
+
+        List<Struct> leadersData = new ArrayList<>(liveLeaders.size());
+        for (Node leader : liveLeaders) {
+            Struct leaderData = struct.instance(LIVE_LEADERS_KEY_NAME);
+            leaderData.set(END_POINT_ID_KEY_NAME, leader.id());
+            leaderData.set(HOST_KEY_NAME, leader.host());
+            leaderData.set(PORT_KEY_NAME, leader.port());
+            leadersData.add(leaderData);
+        }
+        struct.set(LIVE_LEADERS_KEY_NAME, leadersData.toArray());
+        return struct;
+    }
+
+    @Override
     public AbstractResponse getErrorResponse(Throwable e) {
         Map<TopicPartition, Errors> responses = new 
HashMap<>(partitionStates.size());
         for (TopicPartition partition : partitionStates.keySet()) {
@@ -209,12 +213,8 @@ public class LeaderAndIsrRequest extends AbstractRequest {
         return liveLeaders;
     }
 
-    public static LeaderAndIsrRequest parse(ByteBuffer buffer, int versionId) {
-        return new 
LeaderAndIsrRequest(ProtoUtils.parseRequest(ApiKeys.LEADER_AND_ISR.id, 
versionId, buffer),
-                (short) versionId);
+    public static LeaderAndIsrRequest parse(ByteBuffer buffer, short 
versionId) {
+        return new 
LeaderAndIsrRequest(ProtoUtils.parseRequest(ApiKeys.LEADER_AND_ISR.id, 
versionId, buffer), versionId);
     }
 
-    public static LeaderAndIsrRequest parse(ByteBuffer buffer) {
-        return parse(buffer, 
ProtoUtils.latestVersion(ApiKeys.LEADER_AND_ISR.id));
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
index 4d0a05d..2b02daf 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
@@ -17,7 +17,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -27,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 
 public class LeaderAndIsrResponse extends AbstractResponse {
-    private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentResponseSchema(ApiKeys.LEADER_AND_ISR.id);
 
     private static final String ERROR_CODE_KEY_NAME = "error_code";
     private static final String PARTITIONS_KEY_NAME = "partitions";
@@ -45,33 +43,12 @@ public class LeaderAndIsrResponse extends AbstractResponse {
 
     private final Map<TopicPartition, Errors> responses;
 
-    public LeaderAndIsrResponse(Map<TopicPartition, Errors> responses) {
-        this(Errors.NONE, responses);
-    }
-
     public LeaderAndIsrResponse(Errors error, Map<TopicPartition, Errors> 
responses) {
-        super(new Struct(CURRENT_SCHEMA));
-
-        List<Struct> responseDatas = new ArrayList<>(responses.size());
-        for (Map.Entry<TopicPartition, Errors> response : 
responses.entrySet()) {
-            Struct partitionData = struct.instance(PARTITIONS_KEY_NAME);
-            TopicPartition partition = response.getKey();
-            partitionData.set(PARTITIONS_TOPIC_KEY_NAME, partition.topic());
-            partitionData.set(PARTITIONS_PARTITION_KEY_NAME, 
partition.partition());
-            partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, 
response.getValue().code());
-            responseDatas.add(partitionData);
-        }
-
-        struct.set(PARTITIONS_KEY_NAME, responseDatas.toArray());
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
-
         this.responses = responses;
         this.error = error;
     }
 
     public LeaderAndIsrResponse(Struct struct) {
-        super(struct);
-
         responses = new HashMap<>();
         for (Object responseDataObj : struct.getArray(PARTITIONS_KEY_NAME)) {
             Struct responseData = (Struct) responseDataObj;
@@ -92,12 +69,27 @@ public class LeaderAndIsrResponse extends AbstractResponse {
         return error;
     }
 
-    public static LeaderAndIsrResponse parse(ByteBuffer buffer, int version) {
+    public static LeaderAndIsrResponse parse(ByteBuffer buffer, short version) 
{
         return new 
LeaderAndIsrResponse(ProtoUtils.parseResponse(ApiKeys.LEADER_AND_ISR.id, 
version, buffer));
     }
 
-    public static LeaderAndIsrResponse parse(ByteBuffer buffer) {
-        return new LeaderAndIsrResponse(CURRENT_SCHEMA.read(buffer));
-    }
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new 
Struct(ProtoUtils.responseSchema(ApiKeys.LEADER_AND_ISR.id, version));
+
+        List<Struct> responseDatas = new ArrayList<>(responses.size());
+        for (Map.Entry<TopicPartition, Errors> response : 
responses.entrySet()) {
+            Struct partitionData = struct.instance(PARTITIONS_KEY_NAME);
+            TopicPartition partition = response.getKey();
+            partitionData.set(PARTITIONS_TOPIC_KEY_NAME, partition.topic());
+            partitionData.set(PARTITIONS_PARTITION_KEY_NAME, 
partition.partition());
+            partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, 
response.getValue().code());
+            responseDatas.add(partitionData);
+        }
 
+        struct.set(PARTITIONS_KEY_NAME, responseDatas.toArray());
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+
+        return struct;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
index 2a7b70e..16622e4 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
@@ -33,8 +33,8 @@ public class LeaveGroupRequest extends AbstractRequest {
         }
 
         @Override
-        public LeaveGroupRequest build() {
-            return new LeaveGroupRequest(groupId, memberId, version());
+        public LeaveGroupRequest build(short version) {
+            return new LeaveGroupRequest(groupId, memberId, version);
         }
 
         @Override
@@ -52,16 +52,13 @@ public class LeaveGroupRequest extends AbstractRequest {
     private final String memberId;
 
     private LeaveGroupRequest(String groupId, String memberId, short version) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.LEAVE_GROUP.id, 
version)),
-                version);
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        struct.set(MEMBER_ID_KEY_NAME, memberId);
+        super(version);
         this.groupId = groupId;
         this.memberId = memberId;
     }
 
     public LeaveGroupRequest(Struct struct, short version) {
-        super(struct, version);
+        super(version);
         groupId = struct.getString(GROUP_ID_KEY_NAME);
         memberId = struct.getString(MEMBER_ID_KEY_NAME);
     }
@@ -86,12 +83,15 @@ public class LeaveGroupRequest extends AbstractRequest {
         return memberId;
     }
 
-    public static LeaveGroupRequest parse(ByteBuffer buffer, int versionId) {
-        return new 
LeaveGroupRequest(ProtoUtils.parseRequest(ApiKeys.LEAVE_GROUP.id, versionId, 
buffer),
-                (short) versionId);
+    public static LeaveGroupRequest parse(ByteBuffer buffer, short versionId) {
+        return new 
LeaveGroupRequest(ProtoUtils.parseRequest(ApiKeys.LEAVE_GROUP.id, versionId, 
buffer), versionId);
     }
 
-    public static LeaveGroupRequest parse(ByteBuffer buffer) {
-        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.LEAVE_GROUP.id));
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new 
Struct(ProtoUtils.requestSchema(ApiKeys.LEAVE_GROUP.id, version()));
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+        struct.set(MEMBER_ID_KEY_NAME, memberId);
+        return struct;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index bd1c84d..a28816a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -12,16 +12,15 @@
  */
 package org.apache.kafka.common.requests;
 
-import java.nio.ByteBuffer;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
+import java.nio.ByteBuffer;
+
 public class LeaveGroupResponse extends AbstractResponse {
 
-    private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentResponseSchema(ApiKeys.LEAVE_GROUP.id);
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     /**
@@ -33,16 +32,13 @@ public class LeaveGroupResponse extends AbstractResponse {
      * UNKNOWN_CONSUMER_ID (25)
      * GROUP_AUTHORIZATION_FAILED (30)
      */
-
     private final Errors error;
+
     public LeaveGroupResponse(Errors error) {
-        super(new Struct(CURRENT_SCHEMA));
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
         this.error = error;
     }
 
     public LeaveGroupResponse(Struct struct) {
-        super(struct);
         error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
     }
 
@@ -50,7 +46,15 @@ public class LeaveGroupResponse extends AbstractResponse {
         return error;
     }
 
-    public static LeaveGroupResponse parse(ByteBuffer buffer) {
-        return new LeaveGroupResponse(CURRENT_SCHEMA.read(buffer));
+    @Override
+    public Struct toStruct(short version) {
+        Struct struct = new 
Struct(ProtoUtils.responseSchema(ApiKeys.LEAVE_GROUP.id, version));
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        return struct;
     }
+
+    public static LeaveGroupResponse parse(ByteBuffer buffer, short versionId) 
{
+        return new 
LeaveGroupResponse(ProtoUtils.parseResponse(ApiKeys.LEAVE_GROUP.id, versionId, 
buffer));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
index 235f4e4..badb527 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
@@ -27,8 +27,8 @@ public class ListGroupsRequest extends AbstractRequest {
         }
 
         @Override
-        public ListGroupsRequest build() {
-            return new ListGroupsRequest(version());
+        public ListGroupsRequest build(short version) {
+            return new ListGroupsRequest(version);
         }
 
         @Override
@@ -38,12 +38,11 @@ public class ListGroupsRequest extends AbstractRequest {
     }
 
     public ListGroupsRequest(short version) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.LIST_GROUPS.id, 
version)),
-                version);
+        super(version);
     }
 
     public ListGroupsRequest(Struct struct, short versionId) {
-        super(struct, versionId);
+        super(versionId);
     }
 
     @Override
@@ -58,12 +57,12 @@ public class ListGroupsRequest extends AbstractRequest {
         }
     }
 
-    public static ListGroupsRequest parse(ByteBuffer buffer, int versionId) {
-        return new 
ListGroupsRequest(ProtoUtils.parseRequest(ApiKeys.LIST_GROUPS.id, versionId, 
buffer),
-                (short) versionId);
+    public static ListGroupsRequest parse(ByteBuffer buffer, short versionId) {
+        return new 
ListGroupsRequest(ProtoUtils.parseRequest(ApiKeys.LIST_GROUPS.id, versionId, 
buffer), versionId);
     }
 
-    public static ListGroupsRequest parse(ByteBuffer buffer) {
-        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.LIST_GROUPS.id));
+    @Override
+    protected Struct toStruct() {
+        return new Struct(ProtoUtils.requestSchema(ApiKeys.LIST_GROUPS.id, 
version()));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index f421064..e05a4b1 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -15,7 +15,6 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -25,8 +24,6 @@ import java.util.List;
 
 public class ListGroupsResponse extends AbstractResponse {
 
-    private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentResponseSchema(ApiKeys.LIST_GROUPS.id);
-
     public static final String ERROR_CODE_KEY_NAME = "error_code";
     public static final String GROUPS_KEY_NAME = "groups";
     public static final String GROUP_ID_KEY_NAME = "group_id";
@@ -43,22 +40,11 @@ public class ListGroupsResponse extends AbstractResponse {
     private final List<Group> groups;
 
     public ListGroupsResponse(Errors error, List<Group> groups) {
-        super(new Struct(CURRENT_SCHEMA));
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
-        List<Struct> groupList = new ArrayList<>();
-        for (Group group : groups) {
-            Struct groupStruct = struct.instance(GROUPS_KEY_NAME);
-            groupStruct.set(GROUP_ID_KEY_NAME, group.groupId);
-            groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType);
-            groupList.add(groupStruct);
-        }
-        struct.set(GROUPS_KEY_NAME, groupList.toArray());
         this.error = error;
         this.groups = groups;
     }
 
     public ListGroupsResponse(Struct struct) {
-        super(struct);
         this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         this.groups = new ArrayList<>();
         for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
@@ -96,12 +82,27 @@ public class ListGroupsResponse extends AbstractResponse {
 
     }
 
-    public static ListGroupsResponse parse(ByteBuffer buffer) {
-        return new ListGroupsResponse(CURRENT_SCHEMA.read(buffer));
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new 
Struct(ProtoUtils.responseSchema(ApiKeys.LIST_GROUPS.id, version));
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        List<Struct> groupList = new ArrayList<>();
+        for (Group group : groups) {
+            Struct groupStruct = struct.instance(GROUPS_KEY_NAME);
+            groupStruct.set(GROUP_ID_KEY_NAME, group.groupId);
+            groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType);
+            groupList.add(groupStruct);
+        }
+        struct.set(GROUPS_KEY_NAME, groupList.toArray());
+        return struct;
     }
 
     public static ListGroupsResponse fromError(Errors error) {
         return new ListGroupsResponse(error, Collections.<Group>emptyList());
     }
 
+    public static ListGroupsResponse parse(ByteBuffer buffer, short versionId) 
{
+        return new 
ListGroupsResponse(ProtoUtils.parseResponse(ApiKeys.LIST_GROUPS.id, versionId, 
buffer));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 6214a56..3e2ad7c 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -60,16 +60,21 @@ public class ListOffsetRequest extends AbstractRequest {
 
     public static class Builder extends 
AbstractRequest.Builder<ListOffsetRequest> {
         private final int replicaId;
+        private final short minVersion;
         private Map<TopicPartition, PartitionData> offsetData = null;
         private Map<TopicPartition, Long> partitionTimestamps = null;
-        private short minVersion = (short) 0;
 
-        public Builder() {
-            this(CONSUMER_REPLICA_ID);
+        public static Builder forReplica(short desiredVersion, int replicaId) {
+            return new Builder((short) 0, desiredVersion, replicaId);
         }
 
-        public Builder(int replicaId) {
-            super(ApiKeys.LIST_OFFSETS);
+        public static Builder forConsumer(short minVersion) {
+            return new Builder(minVersion, null, CONSUMER_REPLICA_ID);
+        }
+
+        private Builder(short minVersion, Short desiredVersion, int replicaId) 
{
+            super(ApiKeys.LIST_OFFSETS, desiredVersion);
+            this.minVersion = minVersion;
             this.replicaId = replicaId;
         }
 
@@ -84,8 +89,7 @@ public class ListOffsetRequest extends AbstractRequest {
         }
 
         @Override
-        public ListOffsetRequest build() {
-            short version = version();
+        public ListOffsetRequest build(short version) {
             if (version < minVersion) {
                 throw new UnsupportedVersionException("Cannot create a v" + 
version + " ListOffsetRequest because " +
                     "we require features supported only in " + minVersion + " 
or later.");
@@ -117,14 +121,6 @@ public class ListOffsetRequest extends AbstractRequest {
             return new ListOffsetRequest(replicaId, m, version);
         }
 
-        /**
-         * Set the minimum version which we will produce for this request.
-         */
-        public Builder setMinVersion(short minVersion) {
-            this.minVersion = minVersion;
-            return this;
-        }
-
         @Override
         public String toString() {
             StringBuilder bld = new StringBuilder();
@@ -170,44 +166,15 @@ public class ListOffsetRequest extends AbstractRequest {
      */
     @SuppressWarnings("unchecked")
     private ListOffsetRequest(int replicaId, Map<TopicPartition, ?> 
targetTimes, short version) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.LIST_OFFSETS.id, 
version)), version);
-        Map<String, Map<Integer, Object>> topicsData =
-                CollectionUtils.groupDataByTopic((Map<TopicPartition, Object>) 
targetTimes);
-
-        struct.set(REPLICA_ID_KEY_NAME, replicaId);
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, Object>> topicEntry: 
topicsData.entrySet()) {
-            Struct topicData = struct.instance(TOPICS_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            List<Struct> partitionArray = new ArrayList<>();
-            for (Map.Entry<Integer, Object> partitionEntry : 
topicEntry.getValue().entrySet()) {
-                if (version == 0) {
-                    PartitionData offsetPartitionData = (PartitionData) 
partitionEntry.getValue();
-                    Struct partitionData = 
topicData.instance(PARTITIONS_KEY_NAME);
-                    partitionData.set(PARTITION_KEY_NAME, 
partitionEntry.getKey());
-                    partitionData.set(TIMESTAMP_KEY_NAME, 
offsetPartitionData.timestamp);
-                    partitionData.set(MAX_NUM_OFFSETS_KEY_NAME, 
offsetPartitionData.maxNumOffsets);
-                    partitionArray.add(partitionData);
-                } else {
-                    Long timestamp = (Long) partitionEntry.getValue();
-                    Struct partitionData = 
topicData.instance(PARTITIONS_KEY_NAME);
-                    partitionData.set(PARTITION_KEY_NAME, 
partitionEntry.getKey());
-                    partitionData.set(TIMESTAMP_KEY_NAME, timestamp);
-                    partitionArray.add(partitionData);
-                }
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+        super(version);
         this.replicaId = replicaId;
         this.offsetData = version == 0 ? (Map<TopicPartition, PartitionData>) 
targetTimes : null;
         this.partitionTimestamps = version == 1 ? (Map<TopicPartition, Long>) 
targetTimes : null;
         this.duplicatePartitions = Collections.emptySet();
     }
 
-    public ListOffsetRequest(Struct struct, short versionId) {
-        super(struct, versionId);
+    public ListOffsetRequest(Struct struct, short version) {
+        super(version);
         Set<TopicPartition> duplicatePartitions = new HashSet<>();
         replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
         offsetData = new HashMap<>();
@@ -236,17 +203,19 @@ public class ListOffsetRequest extends AbstractRequest {
     @Override
     @SuppressWarnings("deprecation")
     public AbstractResponse getErrorResponse(Throwable e) {
-        Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = 
new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
+        Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = 
new HashMap<>();
 
         short versionId = version();
         if (versionId == 0) {
             for (Map.Entry<TopicPartition, PartitionData> entry : 
offsetData.entrySet()) {
-                ListOffsetResponse.PartitionData partitionResponse = new 
ListOffsetResponse.PartitionData(Errors.forException(e), new ArrayList<Long>());
+                ListOffsetResponse.PartitionData partitionResponse = new 
ListOffsetResponse.PartitionData(
+                        Errors.forException(e), Collections.<Long>emptyList());
                 responseData.put(entry.getKey(), partitionResponse);
             }
         } else {
             for (Map.Entry<TopicPartition, Long> entry : 
partitionTimestamps.entrySet()) {
-                ListOffsetResponse.PartitionData partitionResponse = new 
ListOffsetResponse.PartitionData(Errors.forException(e), -1L, -1L);
+                ListOffsetResponse.PartitionData partitionResponse = new 
ListOffsetResponse.PartitionData(
+                        Errors.forException(e), -1L, -1L);
                 responseData.put(entry.getKey(), partitionResponse);
             }
         }
@@ -254,7 +223,7 @@ public class ListOffsetRequest extends AbstractRequest {
         switch (versionId) {
             case 0:
             case 1:
-                return new ListOffsetResponse(responseData, versionId);
+                return new ListOffsetResponse(responseData);
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), 
ProtoUtils.latestVersion(ApiKeys.LIST_OFFSETS.id)));
@@ -278,12 +247,44 @@ public class ListOffsetRequest extends AbstractRequest {
         return duplicatePartitions;
     }
 
-    public static ListOffsetRequest parse(ByteBuffer buffer, int versionId) {
-        return new 
ListOffsetRequest(ProtoUtils.parseRequest(ApiKeys.LIST_OFFSETS.id, versionId, 
buffer),
-                (short) versionId);
+    public static ListOffsetRequest parse(ByteBuffer buffer, short versionId) {
+        return new 
ListOffsetRequest(ProtoUtils.parseRequest(ApiKeys.LIST_OFFSETS.id, versionId, 
buffer), versionId);
     }
 
-    public static ListOffsetRequest parse(ByteBuffer buffer) {
-        return parse(buffer, 
ProtoUtils.latestVersion(ApiKeys.LIST_OFFSETS.id));
+    @Override
+    protected Struct toStruct() {
+        short version = version();
+        Struct struct = new 
Struct(ProtoUtils.requestSchema(ApiKeys.LIST_OFFSETS.id, version));
+
+        Map<TopicPartition, ?> targetTimes = partitionTimestamps == null ? 
offsetData : partitionTimestamps;
+        Map<String, Map<Integer, Object>> topicsData = 
CollectionUtils.groupDataByTopic(targetTimes);
+
+        struct.set(REPLICA_ID_KEY_NAME, replicaId);
+        List<Struct> topicArray = new ArrayList<>();
+        for (Map.Entry<String, Map<Integer, Object>> topicEntry: 
topicsData.entrySet()) {
+            Struct topicData = struct.instance(TOPICS_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            List<Struct> partitionArray = new ArrayList<>();
+            for (Map.Entry<Integer, Object> partitionEntry : 
topicEntry.getValue().entrySet()) {
+                if (version == 0) {
+                    PartitionData offsetPartitionData = (PartitionData) 
partitionEntry.getValue();
+                    Struct partitionData = 
topicData.instance(PARTITIONS_KEY_NAME);
+                    partitionData.set(PARTITION_KEY_NAME, 
partitionEntry.getKey());
+                    partitionData.set(TIMESTAMP_KEY_NAME, 
offsetPartitionData.timestamp);
+                    partitionData.set(MAX_NUM_OFFSETS_KEY_NAME, 
offsetPartitionData.maxNumOffsets);
+                    partitionArray.add(partitionData);
+                } else {
+                    Long timestamp = (Long) partitionEntry.getValue();
+                    Struct partitionData = 
topicData.instance(PARTITIONS_KEY_NAME);
+                    partitionData.set(PARTITION_KEY_NAME, 
partitionEntry.getKey());
+                    partitionData.set(TIMESTAMP_KEY_NAME, timestamp);
+                    partitionArray.add(partitionData);
+                }
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+        return struct;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index b815a53..cb3bafc 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 import org.apache.kafka.common.utils.Utils;
@@ -35,7 +34,6 @@ public class ListOffsetResponse extends AbstractResponse {
     public static final long UNKNOWN_TIMESTAMP = -1L;
     public static final long UNKNOWN_OFFSET = -1L;
 
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LIST_OFFSETS.id);
     private static final String RESPONSES_KEY_NAME = "responses";
 
     // topic level field names
@@ -61,8 +59,6 @@ public class ListOffsetResponse extends AbstractResponse {
     private static final String TIMESTAMP_KEY_NAME = "timestamp";
     private static final String OFFSET_KEY_NAME = "offset";
 
-    private final Map<TopicPartition, PartitionData> responseData;
-
     public static final class PartitionData {
         public final Errors error;
         // The offsets list is only used in ListOffsetResponse v0.
@@ -110,46 +106,17 @@ public class ListOffsetResponse extends AbstractResponse {
         }
     }
 
+    private final Map<TopicPartition, PartitionData> responseData;
+
     /**
-     * Constructor for ListOffsetResponse v0.
+     * Constructor for all versions.
      */
-    @Deprecated
     public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData) {
-        this(responseData, 0);
-    }
-
-    public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData, int version) {
-        super(new Struct(ProtoUtils.responseSchema(ApiKeys.LIST_OFFSETS.id, version)));
-        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
-
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
-            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
-                PartitionData offsetPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(ERROR_CODE_KEY_NAME, offsetPartitionData.error.code());
-                if (version == 0)
-                    partitionData.set(OFFSETS_KEY_NAME, offsetPartitionData.offsets.toArray());
-                else {
-                    partitionData.set(TIMESTAMP_KEY_NAME, offsetPartitionData.timestamp);
-                    partitionData.set(OFFSET_KEY_NAME, offsetPartitionData.offset);
-                }
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
         this.responseData = responseData;
     }
 
     public ListOffsetResponse(Struct struct) {
-        super(struct);
-        responseData = new HashMap<TopicPartition, PartitionData>();
+        responseData = new HashMap<>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
             String topic = topicResponse.getString(TOPIC_KEY_NAME);
@@ -178,11 +145,38 @@ public class ListOffsetResponse extends AbstractResponse {
         return responseData;
     }
 
-    public static ListOffsetResponse parse(ByteBuffer buffer) {
-        return new ListOffsetResponse(CURRENT_SCHEMA.read(buffer));
+    public static ListOffsetResponse parse(ByteBuffer buffer, short version) {
+        return new ListOffsetResponse(ProtoUtils.parseResponse(ApiKeys.LIST_OFFSETS.id, version, buffer));
     }
 
-    public static ListOffsetResponse parse(ByteBuffer buffer, int version) {
-        return new ListOffsetResponse(ProtoUtils.responseSchema(ApiKeys.LIST_OFFSETS.id, version).read(buffer));
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.LIST_OFFSETS.id, version));
+        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
+
+        List<Struct> topicArray = new ArrayList<>();
+        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            List<Struct> partitionArray = new ArrayList<>();
+            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+                PartitionData offsetPartitionData = partitionEntry.getValue();
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(ERROR_CODE_KEY_NAME, offsetPartitionData.error.code());
+                if (version == 0)
+                    partitionData.set(OFFSETS_KEY_NAME, offsetPartitionData.offsets.toArray());
+                else {
+                    partitionData.set(TIMESTAMP_KEY_NAME, offsetPartitionData.timestamp);
+                    partitionData.set(OFFSET_KEY_NAME, offsetPartitionData.offset);
+                }
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+
+        return struct;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 16af1b7..f31315f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -51,8 +51,7 @@ public class MetadataRequest extends AbstractRequest {
         }
 
         @Override
-        public MetadataRequest build() {
-            short version = version();
+        public MetadataRequest build(short version) {
             if (version < 1) {
                 throw new UnsupportedVersionException("MetadataRequest " +
                         "versions older than 1 are not supported.");
@@ -79,27 +78,18 @@ public class MetadataRequest extends AbstractRequest {
 
     private final List<String> topics;
 
-    public static MetadataRequest allTopics(short version) {
-        return new MetadataRequest.Builder(null).setVersion(version).build();
-    }
-
     /**
      * In v0 null is not allowed and an empty list indicates requesting all topics.
      * Note: modern clients do not support sending v0 requests.
      * In v1 null indicates requesting all topics, and an empty list indicates requesting no topics.
      */
     public MetadataRequest(List<String> topics, short version) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.METADATA.id, version)),
-                version);
-        if (topics == null)
-            struct.set(TOPICS_KEY_NAME, null);
-        else
-            struct.set(TOPICS_KEY_NAME, topics.toArray());
+        super(version);
         this.topics = topics;
     }
 
     public MetadataRequest(Struct struct, short version) {
-        super(struct, version);
+        super(version);
         Object[] topicArray = struct.getArray(TOPICS_KEY_NAME);
         if (topicArray != null) {
             topics = new ArrayList<>();
@@ -127,7 +117,7 @@ public class MetadataRequest extends AbstractRequest {
             case 0:
             case 1:
             case 2:
-                return new MetadataResponse(Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas, versionId);
+                return new MetadataResponse(Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.METADATA.id)));
@@ -142,12 +132,17 @@ public class MetadataRequest extends AbstractRequest {
         return topics;
     }
 
-    public static MetadataRequest parse(ByteBuffer buffer, int versionId) {
-        return new MetadataRequest(ProtoUtils.parseRequest(ApiKeys.METADATA.id, versionId, buffer),
-                (short) versionId);
+    public static MetadataRequest parse(ByteBuffer buffer, short versionId) {
+        return new MetadataRequest(ProtoUtils.parseRequest(ApiKeys.METADATA.id, versionId, buffer), versionId);
     }
 
-    public static MetadataRequest parse(ByteBuffer buffer) {
-        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.METADATA.id));
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.METADATA.id, version()));
+        if (topics == null)
+            struct.set(TOPICS_KEY_NAME, null);
+        else
+            struct.set(TOPICS_KEY_NAME, topics.toArray());
+        return struct;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index a8baee5..268bf84 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -31,7 +31,6 @@ import java.util.Set;
 
 public class MetadataResponse extends AbstractResponse {
 
-    private static final short CURRENT_VERSION = ProtoUtils.latestVersion(ApiKeys.METADATA.id);
     private static final String BROKERS_KEY_NAME = "brokers";
     private static final String TOPIC_METADATA_KEY_NAME = "topic_metadata";
 
@@ -83,78 +82,16 @@ public class MetadataResponse extends AbstractResponse {
     private final String clusterId;
 
     /**
-     * Constructor for the latest version
+     * Constructor for all versions.
      */
     public MetadataResponse(List<Node> brokers, String clusterId, int controllerId, List<TopicMetadata> topicMetadata) {
-        this(brokers, clusterId, controllerId, topicMetadata, CURRENT_VERSION);
-    }
-
-    /**
-     * Constructor for a specific version
-     */
-    public MetadataResponse(List<Node> brokers, String clusterId, int controllerId, List<TopicMetadata> topicMetadata, int version) {
-        super(new Struct(ProtoUtils.responseSchema(ApiKeys.METADATA.id, version)));
         this.brokers = brokers;
         this.controller = getControllerNode(controllerId, brokers);
         this.topicMetadata = topicMetadata;
         this.clusterId = clusterId;
-
-        List<Struct> brokerArray = new ArrayList<>();
-        for (Node node : brokers) {
-            Struct broker = struct.instance(BROKERS_KEY_NAME);
-            broker.set(NODE_ID_KEY_NAME, node.id());
-            broker.set(HOST_KEY_NAME, node.host());
-            broker.set(PORT_KEY_NAME, node.port());
-            // This field only exists in v1+
-            if (broker.hasField(RACK_KEY_NAME))
-                broker.set(RACK_KEY_NAME, node.rack());
-            brokerArray.add(broker);
-        }
-        struct.set(BROKERS_KEY_NAME, brokerArray.toArray());
-
-        // This field only exists in v1+
-        if (struct.hasField(CONTROLLER_ID_KEY_NAME))
-            struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
-
-        // This field only exists in v2+
-        if (struct.hasField(CLUSTER_ID_KEY_NAME))
-            struct.set(CLUSTER_ID_KEY_NAME, clusterId);
-
-        List<Struct> topicMetadataArray = new ArrayList<>(topicMetadata.size());
-        for (TopicMetadata metadata : topicMetadata) {
-            Struct topicData = struct.instance(TOPIC_METADATA_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, metadata.topic);
-            topicData.set(TOPIC_ERROR_CODE_KEY_NAME, metadata.error.code());
-            // This field only exists in v1+
-            if (topicData.hasField(IS_INTERNAL_KEY_NAME))
-                topicData.set(IS_INTERNAL_KEY_NAME, metadata.isInternal());
-
-            List<Struct> partitionMetadataArray = new ArrayList<>(metadata.partitionMetadata.size());
-            for (PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
-                Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
-                partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, partitionMetadata.error.code());
-                partitionData.set(PARTITION_KEY_NAME, partitionMetadata.partition);
-                partitionData.set(LEADER_KEY_NAME, partitionMetadata.leader.id());
-                ArrayList<Integer> replicas = new ArrayList<>(partitionMetadata.replicas.size());
-                for (Node node : partitionMetadata.replicas)
-                    replicas.add(node.id());
-                partitionData.set(REPLICAS_KEY_NAME, replicas.toArray());
-                ArrayList<Integer> isr = new ArrayList<>(partitionMetadata.isr.size());
-                for (Node node : partitionMetadata.isr)
-                    isr.add(node.id());
-                partitionData.set(ISR_KEY_NAME, isr.toArray());
-                partitionMetadataArray.add(partitionData);
-
-            }
-            topicData.set(PARTITION_METADATA_KEY_NAME, partitionMetadataArray.toArray());
-            topicMetadataArray.add(topicData);
-        }
-        struct.set(TOPIC_METADATA_KEY_NAME, topicMetadataArray.toArray());
     }
 
     public MetadataResponse(Struct struct) {
-        super(struct);
-
         Map<Integer, Node> brokers = new HashMap<>();
         Object[] brokerStructs = (Object[]) struct.get(BROKERS_KEY_NAME);
         for (int i = 0; i < brokerStructs.length; i++) {
@@ -317,12 +254,8 @@ public class MetadataResponse extends AbstractResponse {
         return this.clusterId;
     }
 
-    public static MetadataResponse parse(ByteBuffer buffer) {
-        return parse(buffer, CURRENT_VERSION);
-    }
-
-    public static MetadataResponse parse(ByteBuffer buffer, int version) {
-        return new MetadataResponse(ProtoUtils.responseSchema(ApiKeys.METADATA.id, version).read(buffer));
+    public static MetadataResponse parse(ByteBuffer buffer, short version) {
+        return new MetadataResponse(ProtoUtils.parseResponse(ApiKeys.METADATA.id, version, buffer));
     }
 
     public static class TopicMetadata {
@@ -400,4 +333,60 @@ public class MetadataResponse extends AbstractResponse {
 
     }
 
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.METADATA.id, version));
+        List<Struct> brokerArray = new ArrayList<>();
+        for (Node node : brokers) {
+            Struct broker = struct.instance(BROKERS_KEY_NAME);
+            broker.set(NODE_ID_KEY_NAME, node.id());
+            broker.set(HOST_KEY_NAME, node.host());
+            broker.set(PORT_KEY_NAME, node.port());
+            // This field only exists in v1+
+            if (broker.hasField(RACK_KEY_NAME))
+                broker.set(RACK_KEY_NAME, node.rack());
+            brokerArray.add(broker);
+        }
+        struct.set(BROKERS_KEY_NAME, brokerArray.toArray());
+
+        // This field only exists in v1+
+        if (struct.hasField(CONTROLLER_ID_KEY_NAME))
+            struct.set(CONTROLLER_ID_KEY_NAME, controller == null ? NO_CONTROLLER_ID : controller.id());
+
+        // This field only exists in v2+
+        if (struct.hasField(CLUSTER_ID_KEY_NAME))
+            struct.set(CLUSTER_ID_KEY_NAME, clusterId);
+
+        List<Struct> topicMetadataArray = new ArrayList<>(topicMetadata.size());
+        for (TopicMetadata metadata : topicMetadata) {
+            Struct topicData = struct.instance(TOPIC_METADATA_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, metadata.topic);
+            topicData.set(TOPIC_ERROR_CODE_KEY_NAME, metadata.error.code());
+            // This field only exists in v1+
+            if (topicData.hasField(IS_INTERNAL_KEY_NAME))
+                topicData.set(IS_INTERNAL_KEY_NAME, metadata.isInternal());
+
+            List<Struct> partitionMetadataArray = new ArrayList<>(metadata.partitionMetadata.size());
+            for (PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
+                Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
+                partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, partitionMetadata.error.code());
+                partitionData.set(PARTITION_KEY_NAME, partitionMetadata.partition);
+                partitionData.set(LEADER_KEY_NAME, partitionMetadata.leader.id());
+                ArrayList<Integer> replicas = new ArrayList<>(partitionMetadata.replicas.size());
+                for (Node node : partitionMetadata.replicas)
+                    replicas.add(node.id());
+                partitionData.set(REPLICAS_KEY_NAME, replicas.toArray());
+                ArrayList<Integer> isr = new ArrayList<>(partitionMetadata.isr.size());
+                for (Node node : partitionMetadata.isr)
+                    isr.add(node.id());
+                partitionData.set(ISR_KEY_NAME, isr.toArray());
+                partitionMetadataArray.add(partitionData);
+
+            }
+            topicData.set(PARTITION_METADATA_KEY_NAME, partitionMetadataArray.toArray());
+            topicMetadataArray.add(topicData);
+        }
+        struct.set(TOPIC_METADATA_KEY_NAME, topicMetadataArray.toArray());
+        return struct;
+    }
 }

Reply via email to