http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index b7645dd..097b4fc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -12,31 +12,19 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.requests.ProduceResponse;
@@ -46,6 +34,17 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class SenderTest {
 
     private static final int MAX_REQUEST_SIZE = 1024 * 1024;
@@ -69,7 +68,7 @@ public class SenderTest {
 
     @Before
     public void setup() {
-        Map<String, String> metricTags = new LinkedHashMap<String, String>();
+        Map<String, String> metricTags = new LinkedHashMap<>();
         metricTags.put("client-id", CLIENT_ID);
         MetricConfig metricConfig = new MetricConfig().tags(metricTags);
         metrics = new Metrics(metricConfig, time);
@@ -147,7 +146,7 @@ public class SenderTest {
             Future<RecordMetadata> future = accumulator.append(tp, 0L, 
"key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
             sender.run(time.milliseconds()); // connect
             sender.run(time.milliseconds()); // send produce request
-            String id = client.requests().peek().request().destination();
+            String id = client.requests().peek().destination();
             Node node = new Node(Integer.valueOf(id), "localhost", 0);
             assertEquals(1, client.inFlightRequestCount());
             assertTrue("Client ready status should be true", 
client.isReady(node, 0L));
@@ -168,7 +167,7 @@ public class SenderTest {
             future = accumulator.append(tp, 0L, "key".getBytes(), 
"value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
             sender.run(time.milliseconds()); // send produce request
             for (int i = 0; i < maxRetries + 1; i++) {
-                client.disconnect(client.requests().peek().request().destination());
+                client.disconnect(client.requests().peek().destination());
                 sender.run(time.milliseconds()); // receive error
                 sender.run(time.milliseconds()); // reconnect
                 sender.run(time.milliseconds()); // resend
@@ -205,8 +204,8 @@ public class SenderTest {
             accumulator.append(tp2, 0L, "key1".getBytes(), 
"value1".getBytes(), null, MAX_BLOCK_TIMEOUT);
             sender.run(time.milliseconds()); // connect
             sender.run(time.milliseconds()); // send produce request
-            String id = client.requests().peek().request().destination();
-            assertEquals(ApiKeys.PRODUCE.id, client.requests().peek().request().header().apiKey());
+            String id = client.requests().peek().destination();
+            assertEquals(ApiKeys.PRODUCE.id, client.requests().peek().header().apiKey());
             Node node = new Node(Integer.valueOf(id), "localhost", 0);
             assertEquals(1, client.inFlightRequestCount());
             assertTrue("Client ready status should be true", 
client.isReady(node, 0L));
@@ -272,11 +271,10 @@ public class SenderTest {
         }
     }
 
-    private Struct produceResponse(TopicPartition tp, long offset, int error, int throttleTimeMs) {
+    private ProduceResponse produceResponse(TopicPartition tp, long offset, int error, int throttleTimeMs) {
         ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse((short) error, offset, Record.NO_TIMESTAMP);
         Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
-        ProduceResponse response = new ProduceResponse(partResp, throttleTimeMs);
-        return response.toStruct();
+        return new ProduceResponse(partResp, throttleTimeMs);
     }
 
 }

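The SenderTest changes above reflect the new typed request/response flow: the helper builds a ProduceResponse object directly instead of converting it to a protocol Struct, and the pending request's destination is read straight off the queued request. Below is a minimal sketch of the new-style helper, using only constructors visible in this patch; the class and method names are illustrative, not part of the patch.

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.requests.ProduceResponse;

    // Hypothetical test fixture mirroring the updated produceResponse() helper above:
    // it returns the typed ProduceResponse rather than ProduceResponse.toStruct().
    class ProduceResponseFixtures {
        static ProduceResponse singlePartition(TopicPartition tp, short error, long offset, int throttleTimeMs) {
            ProduceResponse.PartitionResponse partitionResponse =
                    new ProduceResponse.PartitionResponse(error, offset, Record.NO_TIMESTAMP);
            Map<TopicPartition, ProduceResponse.PartitionResponse> responses =
                    Collections.singletonMap(tp, partitionResponse);
            return new ProduceResponse(responses, throttleTimeMs);
        }
    }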
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 30faac1..a5eb22a 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -16,15 +16,20 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Record;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -36,6 +41,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class RequestResponseTest {
 
@@ -113,7 +119,6 @@ public class RequestResponseTest {
         for (AbstractRequestResponse req : requestResponseList)
             checkSerialization(req, null);
 
-
         checkOlderFetchVersions();
         checkSerialization(createMetadataResponse(0), 0);
         checkSerialization(createMetadataResponse(1), 1);
@@ -180,7 +185,9 @@ public class RequestResponseTest {
     @Test
     public void fetchResponseVersionTest() {
         Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<>();
-        responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10)));
+
+        MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
+        responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, records));
 
         FetchResponse v0Response = new FetchResponse(responseData);
         FetchResponse v1Response = new FetchResponse(responseData, 10);
@@ -193,6 +200,34 @@ public class RequestResponseTest {
     }
 
     @Test
+    public void verifyFetchResponseFullWrite() throws Exception {
+        FetchResponse fetchResponse = createFetchResponse();
+        RequestHeader header = new RequestHeader(ApiKeys.FETCH.id, "client", 15);
+
+        Send send = fetchResponse.toSend("1", header);
+        ByteBufferChannel channel = new ByteBufferChannel(send.size());
+        send.writeTo(channel);
+        channel.close();
+
+        ByteBuffer buf = channel.buf;
+
+        // read the size
+        int size = buf.getInt();
+        assertTrue(size > 0);
+
+        // read the header
+        ResponseHeader responseHeader = ResponseHeader.parse(channel.buf);
+        assertEquals(header.correlationId(), responseHeader.correlationId());
+
+        // read the body
+        Struct responseBody = ProtoUtils.responseSchema(ApiKeys.FETCH.id, header.apiVersion()).read(buf);
+        FetchResponse parsedResponse = new FetchResponse(responseBody);
+        assertEquals(parsedResponse, fetchResponse);
+
+        assertEquals(size, responseHeader.sizeOf() + parsedResponse.sizeOf());
+    }
+
+    @Test
     public void testControlledShutdownResponse() {
         ControlledShutdownResponse response = 
createControlledShutdownResponse();
         ByteBuffer buffer = ByteBuffer.allocate(response.sizeOf());
@@ -216,24 +251,24 @@ public class RequestResponseTest {
         assertEquals("", deserialized.clientId()); // null is defaulted to ""
     }
 
-    private AbstractRequestResponse createRequestHeader() {
+    private RequestHeader createRequestHeader() {
         return new RequestHeader((short) 10, (short) 1, "", 10);
     }
 
-    private AbstractRequestResponse createResponseHeader() {
+    private ResponseHeader createResponseHeader() {
         return new ResponseHeader(10);
     }
 
-    private AbstractRequest createGroupCoordinatorRequest() {
+    private GroupCoordinatorRequest createGroupCoordinatorRequest() {
         return new GroupCoordinatorRequest("test-group");
     }
 
-    private AbstractRequestResponse createGroupCoordinatorResponse() {
+    private GroupCoordinatorResponse createGroupCoordinatorResponse() {
         return new GroupCoordinatorResponse(Errors.NONE.code(), new Node(10, 
"host1", 2014));
     }
 
     @SuppressWarnings("deprecation")
-    private AbstractRequest createFetchRequest(int version) {
+    private FetchRequest createFetchRequest(int version) {
         LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetchData = 
new LinkedHashMap<>();
         fetchData.put(new TopicPartition("test1", 0), new 
FetchRequest.PartitionData(100, 1000000));
         fetchData.put(new TopicPartition("test2", 0), new 
FetchRequest.PartitionData(200, 1000000));
@@ -243,22 +278,23 @@ public class RequestResponseTest {
             return new FetchRequest(100, 1000, 1000000, fetchData);
     }
 
-    private AbstractRequestResponse createFetchResponse() {
+    private FetchResponse createFetchResponse() {
         Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<>();
-        responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10)));
-        return new FetchResponse(responseData, 0);
+        MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
+        responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, records));
+        return new FetchResponse(responseData, 25);
     }
 
-    private AbstractRequest createHeartBeatRequest() {
+    private HeartbeatRequest createHeartBeatRequest() {
         return new HeartbeatRequest("group1", 1, "consumer1");
     }
 
-    private AbstractRequestResponse createHeartBeatResponse() {
+    private HeartbeatResponse createHeartBeatResponse() {
         return new HeartbeatResponse(Errors.NONE.code());
     }
 
     @SuppressWarnings("deprecation")
-    private AbstractRequest createJoinGroupRequest(int version) {
+    private JoinGroupRequest createJoinGroupRequest(int version) {
         ByteBuffer metadata = ByteBuffer.wrap(new byte[] {});
         List<JoinGroupRequest.ProtocolMetadata> protocols = new ArrayList<>();
         protocols.add(new JoinGroupRequest.ProtocolMetadata("consumer-range", 
metadata));
@@ -269,27 +305,27 @@ public class RequestResponseTest {
         }
     }
 
-    private AbstractRequestResponse createJoinGroupResponse() {
+    private JoinGroupResponse createJoinGroupResponse() {
         Map<String, ByteBuffer> members = new HashMap<>();
         members.put("consumer1", ByteBuffer.wrap(new byte[]{}));
         members.put("consumer2", ByteBuffer.wrap(new byte[]{}));
         return new JoinGroupResponse(Errors.NONE.code(), 1, "range", 
"consumer1", "leader", members);
     }
 
-    private AbstractRequest createListGroupsRequest() {
+    private ListGroupsRequest createListGroupsRequest() {
         return new ListGroupsRequest();
     }
 
-    private AbstractRequestResponse createListGroupsResponse() {
+    private ListGroupsResponse createListGroupsResponse() {
         List<ListGroupsResponse.Group> groups = Arrays.asList(new 
ListGroupsResponse.Group("test-group", "consumer"));
         return new ListGroupsResponse(Errors.NONE.code(), groups);
     }
 
-    private AbstractRequest createDescribeGroupRequest() {
+    private DescribeGroupsRequest createDescribeGroupRequest() {
         return new 
DescribeGroupsRequest(Collections.singletonList("test-group"));
     }
 
-    private AbstractRequestResponse createDescribeGroupResponse() {
+    private DescribeGroupsResponse createDescribeGroupResponse() {
         String clientId = "consumer-1";
         String clientHost = "localhost";
         ByteBuffer empty = ByteBuffer.allocate(0);
@@ -300,16 +336,16 @@ public class RequestResponseTest {
         return new 
DescribeGroupsResponse(Collections.singletonMap("test-group", metadata));
     }
 
-    private AbstractRequest createLeaveGroupRequest() {
+    private LeaveGroupRequest createLeaveGroupRequest() {
         return new LeaveGroupRequest("group1", "consumer1");
     }
 
-    private AbstractRequestResponse createLeaveGroupResponse() {
+    private LeaveGroupResponse createLeaveGroupResponse() {
         return new LeaveGroupResponse(Errors.NONE.code());
     }
 
     @SuppressWarnings("deprecation")
-    private AbstractRequest createListOffsetRequest(int version) {
+    private ListOffsetRequest createListOffsetRequest(int version) {
         if (version == 0) {
             Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = 
new HashMap<>();
             offsetData.put(new TopicPartition("test", 0), new 
ListOffsetRequest.PartitionData(1000000L, 10));
@@ -324,7 +360,7 @@ public class RequestResponseTest {
     }
 
     @SuppressWarnings("deprecation")
-    private AbstractRequestResponse createListOffsetResponse(int version) {
+    private ListOffsetResponse createListOffsetResponse(int version) {
         if (version == 0) {
             Map<TopicPartition, ListOffsetResponse.PartitionData> responseData 
= new HashMap<>();
             responseData.put(new TopicPartition("test", 0), new 
ListOffsetResponse.PartitionData(Errors.NONE.code(), Arrays.asList(100L)));
@@ -338,11 +374,11 @@ public class RequestResponseTest {
         }
     }
 
-    private AbstractRequest createMetadataRequest(List<String> topics) {
+    private MetadataRequest createMetadataRequest(List<String> topics) {
         return new MetadataRequest(topics);
     }
 
-    private AbstractRequestResponse createMetadataResponse(int version) {
+    private MetadataResponse createMetadataResponse(int version) {
         Node node = new Node(1, "host1", 1001);
         List<Node> replicas = Arrays.asList(node);
         List<Node> isr = Arrays.asList(node);
@@ -357,7 +393,7 @@ public class RequestResponseTest {
     }
 
     @SuppressWarnings("deprecation")
-    private AbstractRequest createOffsetCommitRequest(int version) {
+    private OffsetCommitRequest createOffsetCommitRequest(int version) {
         Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = 
new HashMap<>();
         commitData.put(new TopicPartition("test", 0), new 
OffsetCommitRequest.PartitionData(100, ""));
         commitData.put(new TopicPartition("test", 1), new 
OffsetCommitRequest.PartitionData(200, null));
@@ -371,47 +407,47 @@ public class RequestResponseTest {
         throw new IllegalArgumentException("Unknown offset commit request 
version " + version);
     }
 
-    private AbstractRequestResponse createOffsetCommitResponse() {
+    private OffsetCommitResponse createOffsetCommitResponse() {
         Map<TopicPartition, Short> responseData = new HashMap<>();
         responseData.put(new TopicPartition("test", 0), Errors.NONE.code());
         return new OffsetCommitResponse(responseData);
     }
 
-    private AbstractRequest createOffsetFetchRequest() {
+    private OffsetFetchRequest createOffsetFetchRequest() {
         return new OffsetFetchRequest("group1", Arrays.asList(new 
TopicPartition("test11", 1)));
     }
 
-    private AbstractRequestResponse createOffsetFetchResponse() {
+    private OffsetFetchResponse createOffsetFetchResponse() {
         Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = 
new HashMap<>();
         responseData.put(new TopicPartition("test", 0), new 
OffsetFetchResponse.PartitionData(100L, "", Errors.NONE.code()));
         responseData.put(new TopicPartition("test", 1), new 
OffsetFetchResponse.PartitionData(100L, null, Errors.NONE.code()));
         return new OffsetFetchResponse(responseData);
     }
 
-    private AbstractRequest createProduceRequest() {
-        Map<TopicPartition, ByteBuffer> produceData = new HashMap<>();
-        produceData.put(new TopicPartition("test", 0), ByteBuffer.allocate(10));
+    private ProduceRequest createProduceRequest() {
+        Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+        produceData.put(new TopicPartition("test", 0), MemoryRecords.readableRecords(ByteBuffer.allocate(10)));
         return new ProduceRequest((short) 1, 5000, produceData);
     }
 
-    private AbstractRequestResponse createProduceResponse() {
+    private ProduceResponse createProduceResponse() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
         responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000, Record.NO_TIMESTAMP));
         return new ProduceResponse(responseData, 0);
     }
 
-    private AbstractRequest createStopReplicaRequest(boolean deletePartitions) {
+    private StopReplicaRequest createStopReplicaRequest(boolean deletePartitions) {
         Set<TopicPartition> partitions = new HashSet<>(Arrays.asList(new 
TopicPartition("test", 0)));
         return new StopReplicaRequest(0, 1, deletePartitions, partitions);
     }
 
-    private AbstractRequestResponse createStopReplicaResponse() {
+    private StopReplicaResponse createStopReplicaResponse() {
         Map<TopicPartition, Short> responses = new HashMap<>();
         responses.put(new TopicPartition("test", 0), Errors.NONE.code());
         return new StopReplicaResponse(Errors.NONE.code(), responses);
     }
 
-    private AbstractRequest createControlledShutdownRequest() {
+    private ControlledShutdownRequest createControlledShutdownRequest() {
         return new ControlledShutdownRequest(10);
     }
 
@@ -423,7 +459,7 @@ public class RequestResponseTest {
         return new ControlledShutdownResponse(Errors.NONE.code(), 
topicPartitions);
     }
 
-    private AbstractRequest createLeaderAndIsrRequest() {
+    private LeaderAndIsrRequest createLeaderAndIsrRequest() {
         Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
         List<Integer> isr = Arrays.asList(1, 2);
         List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
@@ -442,14 +478,14 @@ public class RequestResponseTest {
         return new LeaderAndIsrRequest(1, 10, partitionStates, leaders);
     }
 
-    private AbstractRequestResponse createLeaderAndIsrResponse() {
+    private LeaderAndIsrResponse createLeaderAndIsrResponse() {
         Map<TopicPartition, Short> responses = new HashMap<>();
         responses.put(new TopicPartition("test", 0), Errors.NONE.code());
         return new LeaderAndIsrResponse(Errors.NONE.code(), responses);
     }
 
     @SuppressWarnings("deprecation")
-    private AbstractRequest createUpdateMetadataRequest(int version, String rack) {
+    private UpdateMetadataRequest createUpdateMetadataRequest(int version, String rack) {
         Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
         List<Integer> isr = Arrays.asList(1, 2);
         List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
@@ -482,28 +518,28 @@ public class RequestResponseTest {
         }
     }
 
-    private AbstractRequestResponse createUpdateMetadataResponse() {
+    private UpdateMetadataResponse createUpdateMetadataResponse() {
         return new UpdateMetadataResponse(Errors.NONE.code());
     }
 
-    private AbstractRequest createSaslHandshakeRequest() {
+    private SaslHandshakeRequest createSaslHandshakeRequest() {
         return new SaslHandshakeRequest("PLAIN");
     }
 
-    private AbstractRequestResponse createSaslHandshakeResponse() {
+    private SaslHandshakeResponse createSaslHandshakeResponse() {
         return new SaslHandshakeResponse(Errors.NONE.code(), 
Collections.singletonList("GSSAPI"));
     }
 
-    private AbstractRequest createApiVersionRequest() {
+    private ApiVersionsRequest createApiVersionRequest() {
         return new ApiVersionsRequest();
     }
 
-    private AbstractRequestResponse createApiVersionResponse() {
+    private ApiVersionsResponse createApiVersionResponse() {
         List<ApiVersionsResponse.ApiVersion> apiVersions = Arrays.asList(new 
ApiVersionsResponse.ApiVersion((short) 0, (short) 0, (short) 2));
         return new ApiVersionsResponse(Errors.NONE.code(), apiVersions);
     }
 
-    private AbstractRequest createCreateTopicRequest() {
+    private CreateTopicsRequest createCreateTopicRequest() {
         CreateTopicsRequest.TopicDetails request1 = new 
CreateTopicsRequest.TopicDetails(3, (short) 5);
 
         Map<Integer, List<Integer>> replicaAssignments = new HashMap<>();
@@ -521,21 +557,65 @@ public class RequestResponseTest {
         return new CreateTopicsRequest(request, 0);
     }
 
-    private AbstractRequestResponse createCreateTopicResponse() {
+    private CreateTopicsResponse createCreateTopicResponse() {
         Map<String, Errors> errors = new HashMap<>();
         errors.put("t1", Errors.INVALID_TOPIC_EXCEPTION);
         errors.put("t2", Errors.LEADER_NOT_AVAILABLE);
         return new CreateTopicsResponse(errors);
     }
 
-    private AbstractRequest createDeleteTopicsRequest() {
+    private DeleteTopicsRequest createDeleteTopicsRequest() {
         return new DeleteTopicsRequest(new HashSet<>(Arrays.asList("my_t1", 
"my_t2")), 10000);
     }
 
-    private AbstractRequestResponse createDeleteTopicsResponse() {
+    private DeleteTopicsResponse createDeleteTopicsResponse() {
         Map<String, Errors> errors = new HashMap<>();
         errors.put("t1", Errors.INVALID_TOPIC_EXCEPTION);
         errors.put("t2", Errors.TOPIC_AUTHORIZATION_FAILED);
         return new DeleteTopicsResponse(errors);
     }
+
+    private static class ByteBufferChannel implements GatheringByteChannel {
+        private final ByteBuffer buf;
+        private boolean closed = false;
+
+        private ByteBufferChannel(long size) {
+            this.buf = ByteBuffer.allocate(Long.valueOf(size).intValue());
+        }
+
+        @Override
+        public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
+            int position = buf.position();
+            for (int i = 0; i < length; i++) {
+                ByteBuffer src = srcs[i].duplicate();
+                if (i == 0)
+                    src.position(offset);
+                buf.put(src);
+            }
+            return buf.position() - position;
+        }
+
+        @Override
+        public long write(ByteBuffer[] srcs) throws IOException {
+            return write(srcs, 0, srcs.length);
+        }
+
+        @Override
+        public int write(ByteBuffer src) throws IOException {
+            int position = buf.position();
+            buf.put(src);
+            return buf.position() - position;
+        }
+
+        @Override
+        public boolean isOpen() {
+            return !closed;
+        }
+
+        @Override
+        public void close() throws IOException {
+            buf.flip();
+            closed = true;
+        }
+    }
 }

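A recurring change in RequestResponseTest is that fetch and produce payloads are now wrapped as MemoryRecords rather than passed around as raw ByteBuffers, and the new verifyFetchResponseFullWrite test drains a Send into the ByteBufferChannel helper to check the on-the-wire framing. Below is a small sketch of the MemoryRecords-based construction, assuming the same constructors shown in the diff; the fixture class name is made up for illustration.

    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.requests.FetchResponse;

    // Illustrative fixture: wrap a placeholder buffer as MemoryRecords before
    // handing it to FetchResponse.PartitionData, as the updated tests do.
    class FetchResponseFixtures {
        static FetchResponse emptyFetchResponse(int throttleTimeMs) {
            MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
            Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<>();
            responseData.put(new TopicPartition("test", 0),
                    new FetchResponse.PartitionData(Errors.NONE.code(), 1000000L, records));
            return new FetchResponse(responseData, throttleTimeMs);
        }
    }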
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 97fe3d8..27c5695 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -12,18 +12,6 @@
  */
 package org.apache.kafka.common.security.authenticator;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SaslConfigs;
@@ -36,17 +24,17 @@ import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.NetworkTestUtils;
 import org.apache.kafka.common.network.NioEchoServer;
 import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Protocol;
 import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.requests.AbstractRequestResponse;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.ApiVersionsRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.RequestHeader;
-import org.apache.kafka.common.requests.RequestSend;
 import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.requests.SaslHandshakeRequest;
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
@@ -55,6 +43,18 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 /**
  * Tests for the Sasl authenticator. These use a test harness that runs a simple socket server that echos back responses.
  */
@@ -261,7 +261,8 @@ public class SaslAuthenticatorTest {
         String node = "1";
         createClientConnection(SecurityProtocol.PLAINTEXT, node);
         RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS.id, Short.MAX_VALUE, "someclient", 1);
-        selector.send(new NetworkSend(node, RequestSend.serialize(header, new ApiVersionsRequest().toStruct())));
+        ApiVersionsRequest request = new ApiVersionsRequest();
+        selector.send(request.toSend(node, header));
         ByteBuffer responseBuffer = waitForResponse();
         ResponseHeader.parse(responseBuffer);
         ApiVersionsResponse response = 
ApiVersionsResponse.parse(responseBuffer);
@@ -291,8 +292,9 @@ public class SaslAuthenticatorTest {
         // Send ApiVersionsRequest and validate error response.
         String node1 = "invalid1";
         createClientConnection(SecurityProtocol.PLAINTEXT, node1);
+        SaslHandshakeRequest request = new SaslHandshakeRequest("PLAIN");
         RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, Short.MAX_VALUE, "someclient", 2);
-        selector.send(new NetworkSend(node1, RequestSend.serialize(header, new SaslHandshakeRequest("PLAIN").toStruct())));
+        selector.send(request.toSend(node1, header));
         NetworkTestUtils.waitForChannelClose(selector, node1);
         selector.close();
 
@@ -355,8 +357,9 @@ public class SaslAuthenticatorTest {
         createClientConnection(SecurityProtocol.PLAINTEXT, node1);
         sendHandshakeRequestReceiveResponse(node1);
 
+        ApiVersionsRequest request = new ApiVersionsRequest();
         RequestHeader versionsHeader = new RequestHeader(ApiKeys.API_VERSIONS.id, "someclient", 2);
-        selector.send(new NetworkSend(node1, RequestSend.serialize(versionsHeader, new ApiVersionsRequest().toStruct())));
+        selector.send(request.toSend(node1, versionsHeader));
         NetworkTestUtils.waitForChannelClose(selector, node1);
         selector.close();
 
@@ -420,7 +423,7 @@ public class SaslAuthenticatorTest {
         createClientConnection(SecurityProtocol.PLAINTEXT, node1);
         RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA.id, "someclient", 1);
         MetadataRequest metadataRequest1 = new MetadataRequest(Collections.singletonList("sometopic"));
-        selector.send(new NetworkSend(node1, RequestSend.serialize(metadataRequestHeader1, metadataRequest1.toStruct())));
+        selector.send(metadataRequest1.toSend(node1, metadataRequestHeader1));
         NetworkTestUtils.waitForChannelClose(selector, node1);
         selector.close();
 
@@ -433,7 +436,7 @@ public class SaslAuthenticatorTest {
         sendHandshakeRequestReceiveResponse(node2);
         RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA.id, "someclient", 2);
         MetadataRequest metadataRequest2 = new MetadataRequest(Collections.singletonList("sometopic"));
-        selector.send(new NetworkSend(node2, RequestSend.serialize(metadataRequestHeader2, metadataRequest2.toStruct())));
+        selector.send(metadataRequest2.toSend(node2, metadataRequestHeader2));
         NetworkTestUtils.waitForChannelClose(selector, node2);
         selector.close();
 
@@ -581,25 +584,24 @@ public class SaslAuthenticatorTest {
         selector = null;
     }
 
-    private Struct sendKafkaRequestReceiveResponse(String node, ApiKeys apiKey, AbstractRequestResponse request) throws IOException {
+    private AbstractResponse sendKafkaRequestReceiveResponse(String node, ApiKeys apiKey, AbstractRequest request) throws IOException {
         RequestHeader header = new RequestHeader(apiKey.id, "someclient", 1);
-        selector.send(new NetworkSend(node, RequestSend.serialize(header, request.toStruct())));
+        Send send = request.toSend(node, header);
+        selector.send(send);
         ByteBuffer responseBuffer = waitForResponse();
         return NetworkClient.parseResponse(responseBuffer, header);
     }
 
     private SaslHandshakeResponse sendHandshakeRequestReceiveResponse(String node) throws Exception {
         SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest("PLAIN");
-        Struct responseStruct = sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_HANDSHAKE, handshakeRequest);
-        SaslHandshakeResponse response = new SaslHandshakeResponse(responseStruct);
+        SaslHandshakeResponse response = (SaslHandshakeResponse) sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_HANDSHAKE, handshakeRequest);
         assertEquals(Errors.NONE.code(), response.errorCode());
         return response;
     }
 
     private ApiVersionsResponse sendVersionRequestReceiveResponse(String node) throws Exception {
         ApiVersionsRequest handshakeRequest = new ApiVersionsRequest();
-        Struct responseStruct = sendKafkaRequestReceiveResponse(node, ApiKeys.API_VERSIONS, handshakeRequest);
-        ApiVersionsResponse response = new ApiVersionsResponse(responseStruct);
+        ApiVersionsResponse response =  (ApiVersionsResponse) sendKafkaRequestReceiveResponse(node, ApiKeys.API_VERSIONS, handshakeRequest);
         assertEquals(Errors.NONE.code(), response.errorCode());
         return response;
     }

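Throughout SaslAuthenticatorTest, the old pattern of serializing a header plus request Struct into a NetworkSend is replaced by asking the request itself for a Send via toSend(destination, header). Below is a hedged sketch of the new call sequence, assuming a Selector and node id supplied by the surrounding test fixture; the wrapper class name is illustrative only.

    import org.apache.kafka.common.network.Selector;
    import org.apache.kafka.common.network.Send;
    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.requests.ApiVersionsRequest;
    import org.apache.kafka.common.requests.RequestHeader;

    // Sketch of the new send path: the request frames itself into a Send for the
    // target node, so RequestSend.serialize(header, toStruct()) is no longer needed.
    class RequestSendSketch {
        static void sendApiVersionsRequest(Selector selector, String node) {
            RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS.id, "someclient", 1);
            ApiVersionsRequest request = new ApiVersionsRequest();
            Send send = request.toSend(node, header);
            selector.send(send);
        }
    }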
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 3bfa83f..3aff8f2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -16,7 +16,6 @@
  **/
 package org.apache.kafka.connect.runtime.distributed;
 
-import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
@@ -24,7 +23,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.GroupCoordinatorResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.requests.JoinGroupResponse;
@@ -172,7 +171,7 @@ public class WorkerCoordinatorTest {
 
         final String consumerId = "leader";
 
-        client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // normal join group
@@ -182,8 +181,8 @@ public class WorkerCoordinatorTest {
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, 
memberConfigOffsets, Errors.NONE.code()));
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
-                SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+            public boolean matches(AbstractRequest body) {
+                SyncGroupRequest sync = (SyncGroupRequest) body;
                 return sync.memberId().equals(consumerId) &&
                         sync.generationId() == 1 &&
                         sync.groupAssignment().containsKey(consumerId);
@@ -212,15 +211,15 @@ public class WorkerCoordinatorTest {
 
         final String memberId = "member";
 
-        client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // normal join group
         client.prepareResponse(joinGroupFollowerResponse(1, memberId, 
"leader", Errors.NONE.code()));
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
-                SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+            public boolean matches(AbstractRequest body) {
+                SyncGroupRequest sync = (SyncGroupRequest) body;
                 return sync.memberId().equals(memberId) &&
                         sync.generationId() == 1 &&
                         sync.groupAssignment().isEmpty();
@@ -253,15 +252,15 @@ public class WorkerCoordinatorTest {
 
         final String memberId = "member";
 
-        client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // config mismatch results in assignment error
         client.prepareResponse(joinGroupFollowerResponse(1, memberId, 
"leader", Errors.NONE.code()));
         MockClient.RequestMatcher matcher = new MockClient.RequestMatcher() {
             @Override
-            public boolean matches(ClientRequest request) {
-                SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+            public boolean matches(AbstractRequest body) {
+                SyncGroupRequest sync = (SyncGroupRequest) body;
                 return sync.memberId().equals(memberId) &&
                         sync.generationId() == 1 &&
                         sync.groupAssignment().isEmpty();
@@ -284,7 +283,7 @@ public class WorkerCoordinatorTest {
 
         PowerMock.replayAll();
 
-        client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // join the group once
@@ -392,12 +391,11 @@ public class WorkerCoordinatorTest {
     }
 
 
-    private Struct groupMetadataResponse(Node node, short error) {
-        GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node);
-        return response.toStruct();
+    private GroupCoordinatorResponse groupCoordinatorResponse(Node node, short error) {
+        return new GroupCoordinatorResponse(error, node);
     }
 
-    private Struct joinGroupLeaderResponse(int generationId, String memberId,
+    private JoinGroupResponse joinGroupLeaderResponse(int generationId, String memberId,
                                            Map<String, Long> configOffsets, short error) {
         Map<String, ByteBuffer> metadata = new HashMap<>();
         for (Map.Entry<String, Long> configStateEntry : configOffsets.entrySet()) {
@@ -407,22 +405,21 @@ public class WorkerCoordinatorTest {
             ByteBuffer buf = ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(memberUrl, configOffset));
             metadata.put(configStateEntry.getKey(), buf);
         }
-        return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, memberId, metadata).toStruct();
+        return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, memberId, metadata);
     }
 
-    private Struct joinGroupFollowerResponse(int generationId, String memberId, String leaderId, short error) {
+    private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, short error) {
         return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, leaderId,
-                Collections.<String, ByteBuffer>emptyMap()).toStruct();
+                Collections.<String, ByteBuffer>emptyMap());
     }
 
-    private Struct syncGroupResponse(short assignmentError, String leader, long configOffset, List<String> connectorIds,
+    private SyncGroupResponse syncGroupResponse(short assignmentError, String leader, long configOffset, List<String> connectorIds,
                                      List<ConnectorTaskId> taskIds, short error) {
         ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(assignmentError, leader, LEADER_URL, configOffset, connectorIds, taskIds);
         ByteBuffer buf = ConnectProtocol.serializeAssignment(assignment);
-        return new SyncGroupResponse(error, buf).toStruct();
+        return new SyncGroupResponse(error, buf);
     }
 
-
     private static class MockRebalanceListener implements 
WorkerRebalanceListener {
         public ConnectProtocol.Assignment assignment = null;
 

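The WorkerCoordinatorTest matchers now receive the already-deserialized AbstractRequest body, so a plain cast replaces the old re-parse through new SyncGroupRequest(request.request().body()). Below is a compact sketch of the new matcher shape, based only on the methods used in the diff; member id and generation id are placeholder values and the factory class is hypothetical.

    import org.apache.kafka.clients.MockClient;
    import org.apache.kafka.common.requests.AbstractRequest;
    import org.apache.kafka.common.requests.SyncGroupRequest;

    // Sketch of the new matcher: the mock client hands over the typed request body,
    // so a cast replaces the old Struct-based re-parse.
    class SyncGroupMatcherSketch {
        static MockClient.RequestMatcher syncGroupFrom(final String memberId, final int generationId) {
            return new MockClient.RequestMatcher() {
                @Override
                public boolean matches(AbstractRequest body) {
                    SyncGroupRequest sync = (SyncGroupRequest) body;
                    return sync.memberId().equals(memberId) && sync.generationId() == generationId;
                }
            };
        }
    }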
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 1179557..592fecf 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -42,7 +42,7 @@ class AdminClient(val time: Time,
 
   private def send(target: Node,
                    api: ApiKeys,
-                   request: AbstractRequest): Struct = {
+                   request: AbstractRequest): AbstractResponse = {
     var future: RequestFuture[ClientResponse] = null
 
     future = client.send(target, api, request)
@@ -54,9 +54,8 @@ class AdminClient(val time: Time,
       throw future.exception()
   }
 
-  private def sendAnyNode(api: ApiKeys, request: AbstractRequest): Struct = {
-    bootstrapBrokers.foreach {
-      case broker =>
+  private def sendAnyNode(api: ApiKeys, request: AbstractRequest): AbstractResponse = {
+    bootstrapBrokers.foreach { broker =>
         try {
           return send(broker, api, request)
         } catch {
@@ -69,23 +68,20 @@ class AdminClient(val time: Time,
 
   private def findCoordinator(groupId: String): Node = {
     val request = new GroupCoordinatorRequest(groupId)
-    val responseBody = sendAnyNode(ApiKeys.GROUP_COORDINATOR, request)
-    val response = new GroupCoordinatorResponse(responseBody)
+    val response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, request).asInstanceOf[GroupCoordinatorResponse]
     Errors.forCode(response.errorCode()).maybeThrow()
     response.node()
   }
 
   def listGroups(node: Node): List[GroupOverview] = {
-    val responseBody = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest())
-    val response = new ListGroupsResponse(responseBody)
+    val response = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest()).asInstanceOf[ListGroupsResponse]
     Errors.forCode(response.errorCode()).maybeThrow()
     response.groups().map(group => GroupOverview(group.groupId(), 
group.protocolType())).toList
   }
 
   private def findAllBrokers(): List[Node] = {
     val request = new MetadataRequest(List[String]())
-    val responseBody = sendAnyNode(ApiKeys.METADATA, request)
-    val response = new MetadataResponse(responseBody)
+    val response = sendAnyNode(ApiKeys.METADATA, request).asInstanceOf[MetadataResponse]
     val errors = response.errors()
     if (!errors.isEmpty)
       debug(s"Metadata request contained errors: $errors")
@@ -140,7 +136,7 @@ class AdminClient(val time: Time,
   def describeConsumerGroup(groupId: String): ConsumerGroupSummary = {
     val coordinator = findCoordinator(groupId)
     val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS, new 
DescribeGroupsRequest(List(groupId).asJava))
-    val response = new DescribeGroupsResponse(responseBody)
+    val response = responseBody.asInstanceOf[DescribeGroupsResponse]
     val metadata = response.groups.get(groupId)
     if (metadata == null)
       throw new KafkaException(s"Response from broker contained no metadata 
for group $groupId")

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 57e99c1..00897db 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -199,8 +199,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
     val fetchResponsePartitionData = requestInfo.map { case 
(topicAndPartition, _) =>
       (topicAndPartition, 
FetchResponsePartitionData(Errors.forException(e).code, -1, MessageSet.Empty))
     }
-    val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
-    val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData, fetchRequest.versionId)
+    val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData, request.header.apiVersion)
     // Magic value does not matter here because the message set is empty
     requestChannel.sendResponse(new RequestChannel.Response(request, new 
FetchResponseSend(request.connectionId, errorResponse)))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index d99bbcd..d31d4ba 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -76,7 +76,8 @@ class PartitionDataSend(val partitionId: Int,
       written += channel.write(buffer)
     if (!buffer.hasRemaining) {
       if (messagesSentSize < messageSize) {
-        val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize)
+        val records = partitionData.messages.asRecords
+        val bytesSent = records.writeTo(channel, messagesSentSize, messageSize - messagesSentSize).toInt
         messagesSentSize += bytesSent
         written += bytesSent
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
index cb5b95e..3783c29 100644
--- a/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
+++ b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
@@ -14,13 +14,14 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import org.apache.kafka.common.requests.AbstractRequestResponse
+
 import kafka.api.ApiUtils._
+import org.apache.kafka.common.requests.AbstractResponse
 
 private[kafka] abstract class GenericRequestAndHeader(val versionId: Short,
                                                       val correlationId: Int,
                                                       val clientId: String,
-                                                      val body: AbstractRequestResponse,
+                                                      val body: AbstractResponse,
                                                       val name: String,
                                                       override val requestId: Option[Short] = None)
   extends RequestOrResponse(requestId) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
index 2835fb6..be0c080 100644
--- a/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
+++ b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
@@ -14,10 +14,11 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import org.apache.kafka.common.requests.AbstractRequestResponse
+
+import org.apache.kafka.common.requests.AbstractResponse
 
 private[kafka] abstract class GenericResponseAndHeader(val correlationId: Int,
-                                                       val body: AbstractRequestResponse,
+                                                       val body: AbstractResponse,
                                                        val name: String,
                                                        override val requestId: Option[Short] = None)
   extends RequestOrResponse(requestId) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 3ca7bd7..f6e4475 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -128,8 +128,8 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
     describe(true)
   }
 
-  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    if(request.requestObj.asInstanceOf[ProducerRequest].requiredAcks == 0) {
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    if(request.body.asInstanceOf[org.apache.kafka.common.requests.ProduceRequest].acks == 0) {
         requestChannel.closeConnection(request.processor, request)
     }
     else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/api/RequestOrResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala
index 65b37fd..d013047 100644
--- a/core/src/main/scala/kafka/api/RequestOrResponse.scala
+++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala
@@ -42,6 +42,6 @@ abstract class RequestOrResponse(val requestId: Option[Short] = None) extends Lo
   *  This API has no meaning for a Response object.
    * @param details If this is false, omit the parts of the request 
description that are proportional to the number of
    *                topics or partitions. This is mainly to control the amount 
of request logging. */
-  def describe(details: Boolean):String
+  def describe(details: Boolean): String
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 03cd98c..1935ea2 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -42,7 +42,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
   private val brokerLock = new Object
   this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "
 
-  controllerContext.liveBrokers.foreach(addNewBroker(_))
+  controllerContext.liveBrokers.foreach(addNewBroker)
 
   def startup() = {
     brokerLock synchronized {
@@ -56,7 +56,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
     }
   }
 
-  def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit = null) {
+  def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractResponse => Unit = null) {
     brokerLock synchronized {
       val stateInfoOpt = brokerStateInfo.get(brokerId)
       stateInfoOpt match {
@@ -149,7 +149,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
   }
 }
 
-case class QueueItem(apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit)
+case class QueueItem(apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractResponse => Unit)
 
 class RequestSendThread(val controllerId: Int,
                         val controllerContext: ControllerContext,
@@ -185,28 +185,27 @@ class RequestSendThread(val controllerId: Int,
             }
             else {
               val requestHeader = 
apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey,
 _))
-              val send = new RequestSend(brokerNode.idString, requestHeader, request.toStruct)
-              val clientRequest = new ClientRequest(time.milliseconds(), true, send, null)
-              clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
+              val clientRequest = new ClientRequest(brokerNode.idString, time.milliseconds(), true, requestHeader, request, null)
+              clientResponse = networkClient.blockingSendAndReceive(clientRequest, request)(time)
               isSendSuccessful = true
             }
           } catch {
             case e: Throwable => // if the send was not successful, reconnect 
to broker and resend the message
               warn(("Controller %d epoch %d fails to send request %s to broker 
%s. " +
                 "Reconnecting to broker.").format(controllerId, 
controllerContext.epoch,
-                  request.toString, brokerNode.toString()), e)
+                  request.toString, brokerNode.toString), e)
               networkClient.close(brokerNode.idString)
               isSendSuccessful = false
               backoff()
           }
         }
         if (clientResponse != null) {
-          val response = ApiKeys.forId(clientResponse.request.request.header.apiKey) match {
-            case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrResponse(clientResponse.responseBody)
-            case ApiKeys.STOP_REPLICA => new StopReplicaResponse(clientResponse.responseBody)
-            case ApiKeys.UPDATE_METADATA_KEY => new UpdateMetadataResponse(clientResponse.responseBody)
-            case apiKey => throw new KafkaException(s"Unexpected apiKey received: $apiKey")
-          }
+          val api = ApiKeys.forId(clientResponse.requestHeader.apiKey)
+          if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA_KEY)
+            throw new KafkaException(s"Unexpected apiKey received: $apiKey")
+
+          val response = clientResponse.responseBody
+
           stateChangeLogger.trace("Controller %d epoch %d received response %s 
for a request sent to broker %s"
             .format(controllerId, controllerContext.epoch, response.toString, 
brokerNode.toString))
 
@@ -217,7 +216,7 @@ class RequestSendThread(val controllerId: Int,
       }
     } catch {
       case e: Throwable =>
-        error("Controller %d fails to send a request to broker 
%s".format(controllerId, brokerNode.toString()), e)
+        error("Controller %d fails to send a request to broker 
%s".format(controllerId, brokerNode.toString), e)
         // If there is any socket error (eg, socket timeout), the connection 
is no longer usable and needs to be recreated.
         networkClient.close(brokerNode.idString)
     }
@@ -230,13 +229,13 @@ class RequestSendThread(val controllerId: Int,
         if (!networkClient.blockingReady(brokerNode, socketTimeoutMs)(time))
           throw new SocketTimeoutException(s"Failed to connect within 
$socketTimeoutMs ms")
 
-        info("Controller %d connected to %s for sending state change 
requests".format(controllerId, brokerNode.toString()))
+        info("Controller %d connected to %s for sending state change 
requests".format(controllerId, brokerNode.toString))
       }
 
       true
     } catch {
       case e: Throwable =>
-        warn("Controller %d's connection to broker %s was 
unsuccessful".format(controllerId, brokerNode.toString()), e)
+        warn("Controller %d's connection to broker %s was 
unsuccessful".format(controllerId, brokerNode.toString), e)
         networkClient.close(brokerNode.idString)
         false
     }
@@ -273,7 +272,7 @@ class ControllerBrokerRequestBatch(controller: 
KafkaController) extends  Logging
 
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, 
partition: Int,
                                        leaderIsrAndControllerEpoch: 
LeaderIsrAndControllerEpoch,
-                                       replicas: Seq[Int], callback: 
AbstractRequestResponse => Unit = null) {
+                                       replicas: Seq[Int], callback: 
AbstractResponse => Unit = null) {
     val topicPartition = new TopicPartition(topic, partition)
 
     brokerIds.filter(_ >= 0).foreach { brokerId =>
@@ -286,13 +285,13 @@ class ControllerBrokerRequestBatch(controller: 
KafkaController) extends  Logging
   }
 
   def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, 
partition: Int, deletePartition: Boolean,
-                                      callback: (AbstractRequestResponse, Int) 
=> Unit = null) {
+                                      callback: (AbstractResponse, Int) => 
Unit = null) {
     brokerIds.filter(b => b >= 0).foreach { brokerId =>
       stopReplicaRequestMap.getOrElseUpdate(brokerId, 
Seq.empty[StopReplicaRequestInfo])
       val v = stopReplicaRequestMap(brokerId)
       if(callback != null)
         stopReplicaRequestMap(brokerId) = v :+ 
StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
-          deletePartition, (r: AbstractRequestResponse) => callback(r, 
brokerId))
+          deletePartition, (r: AbstractResponse) => callback(r, brokerId))
       else
         stopReplicaRequestMap(brokerId) = v :+ 
StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
           deletePartition)
@@ -302,7 +301,7 @@ class ControllerBrokerRequestBatch(controller: 
KafkaController) extends  Logging
   /** Send UpdateMetadataRequest to the given brokers for the given partitions 
and partitions that are being deleted */
   def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
                                          partitions: 
collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition],
-                                         callback: AbstractRequestResponse => 
Unit = null) {
+                                         callback: AbstractResponse => Unit = 
null) {
     def updateMetadataRequestMapFor(partition: TopicAndPartition, 
beingDeleted: Boolean) {
       val leaderIsrAndControllerEpochOpt = 
controllerContext.partitionLeadershipInfo.get(partition)
       leaderIsrAndControllerEpochOpt match {
@@ -444,29 +443,29 @@ case class ControllerBrokerStateInfo(networkClient: 
NetworkClient,
                                      messageQueue: BlockingQueue[QueueItem],
                                      requestSendThread: RequestSendThread)
 
-case class StopReplicaRequestInfo(replica: PartitionAndReplica, 
deletePartition: Boolean, callback: AbstractRequestResponse => Unit = null)
+case class StopReplicaRequestInfo(replica: PartitionAndReplica, 
deletePartition: Boolean, callback: AbstractResponse => Unit = null)
 
-class Callbacks private (var leaderAndIsrResponseCallback: 
AbstractRequestResponse => Unit = null,
-                         var updateMetadataResponseCallback: 
AbstractRequestResponse => Unit = null,
-                         var stopReplicaResponseCallback: 
(AbstractRequestResponse, Int) => Unit = null)
+class Callbacks private (var leaderAndIsrResponseCallback: AbstractResponse => 
Unit = null,
+                         var updateMetadataResponseCallback: AbstractResponse 
=> Unit = null,
+                         var stopReplicaResponseCallback: (AbstractResponse, 
Int) => Unit = null)
 
 object Callbacks {
   class CallbackBuilder {
-    var leaderAndIsrResponseCbk: AbstractRequestResponse => Unit = null
-    var updateMetadataResponseCbk: AbstractRequestResponse => Unit = null
-    var stopReplicaResponseCbk: (AbstractRequestResponse, Int) => Unit = null
+    var leaderAndIsrResponseCbk: AbstractResponse => Unit = null
+    var updateMetadataResponseCbk: AbstractResponse => Unit = null
+    var stopReplicaResponseCbk: (AbstractResponse, Int) => Unit = null
 
-    def leaderAndIsrCallback(cbk: AbstractRequestResponse => Unit): 
CallbackBuilder = {
+    def leaderAndIsrCallback(cbk: AbstractResponse => Unit): CallbackBuilder = 
{
       leaderAndIsrResponseCbk = cbk
       this
     }
 
-    def updateMetadataCallback(cbk: AbstractRequestResponse => Unit): 
CallbackBuilder = {
+    def updateMetadataCallback(cbk: AbstractResponse => Unit): CallbackBuilder 
= {
       updateMetadataResponseCbk = cbk
       this
     }
 
-    def stopReplicaCallback(cbk: (AbstractRequestResponse, Int) => Unit): 
CallbackBuilder = {
+    def stopReplicaCallback(cbk: (AbstractResponse, Int) => Unit): 
CallbackBuilder = {
       stopReplicaResponseCbk = cbk
       this
     }
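
A minimal sketch of how controller code might register one of the retyped callbacks through the builder above; the handler body is illustrative, and the build method is assumed from the surrounding class rather than shown in this hunk:

    import kafka.controller.Callbacks
    import org.apache.kafka.common.requests.AbstractResponse

    // Illustrative handler: the patch only changes the callback's parameter type
    // from AbstractRequestResponse to AbstractResponse.
    val onLeaderAndIsrResponse: AbstractResponse => Unit =
      response => println(s"LeaderAndIsr response: $response")

    // Assuming CallbackBuilder exposes a build method producing a Callbacks instance.
    val callbacks = new Callbacks.CallbackBuilder()
      .leaderAndIsrCallback(onLeaderAndIsrResponse)
      .build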

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 4a94aad..730f07c 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -20,7 +20,7 @@ import java.util
 
 import org.apache.kafka.common.errors.{BrokerNotAvailableException, 
ControllerMovedException}
 import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.requests.{AbstractRequest, 
AbstractRequestResponse}
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse}
 
 import scala.collection._
 import com.yammer.metrics.core.{Gauge, Meter}
@@ -39,7 +39,7 @@ import kafka.utils.CoreUtils._
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time
-import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, 
IZkStateListener, ZkClient, ZkConnection}
+import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, 
IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
 import java.util.concurrent.locks.ReentrantLock
 
@@ -136,7 +136,7 @@ object KafkaController extends Logging {
       Json.parseFull(controllerInfoString) match {
         case Some(m) =>
           val controllerInfo = m.asInstanceOf[Map[String, Any]]
-          controllerInfo.get("brokerid").get.asInstanceOf[Int]
+          controllerInfo("brokerid").asInstanceOf[Int]
         case None => throw new KafkaException("Failed to parse the controller 
info json [%s].".format(controllerInfoString))
       }
     } catch {
@@ -686,7 +686,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: 
ZkUtils, val brokerStat
     onControllerResignation()
   }
 
-  def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], 
request: AbstractRequest, callback: AbstractRequestResponse => Unit = null) = {
+  def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], 
request: AbstractRequest, callback: AbstractResponse => Unit = null) = {
     controllerContext.controllerChannelManager.sendRequest(brokerId, apiKey, 
apiVersion, request, callback)
   }
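
The retyped sendRequest above keeps its signature otherwise unchanged, so a hypothetical call site only needs to accept AbstractResponse in its callback; the broker id, request value and handler below are placeholders, not taken from the patch:

    import org.apache.kafka.common.protocol.ApiKeys
    import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse}

    // Hypothetical call site; `controller` and `updateMetadataRequest` are assumed
    // to exist in the caller's scope.
    def sendUpdateMetadata(controller: kafka.controller.KafkaController,
                           updateMetadataRequest: AbstractRequest): Unit =
      controller.sendRequest(
        brokerId = 1,
        apiKey = ApiKeys.UPDATE_METADATA_KEY,
        apiVersion = None,
        request = updateMetadataRequest,
        callback = (response: AbstractResponse) => println(s"UpdateMetadata response: $response"))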
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala 
b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 8e5f3a1..823f9b4 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -19,13 +19,14 @@ package kafka.controller
 
 import kafka.server.ConfigType
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{StopReplicaResponse, 
AbstractRequestResponse}
+import org.apache.kafka.common.requests.{AbstractResponse, StopReplicaResponse}
 
 import collection.mutable
 import collection.JavaConverters._
-import kafka.utils.{ShutdownableThread, Logging}
+import kafka.utils.{Logging, ShutdownableThread}
 import kafka.utils.CoreUtils._
 import kafka.utils.ZkUtils._
+
 import collection.Set
 import kafka.common.TopicAndPartition
 import java.util.concurrent.locks.ReentrantLock
@@ -378,7 +379,7 @@ class TopicDeletionManager(controller: KafkaController,
     startReplicaDeletion(replicasPerPartition)
   }
 
-  private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: 
AbstractRequestResponse, replicaId: Int) {
+  private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: 
AbstractResponse, replicaId: Int) {
     val stopReplicaResponse = 
stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
     debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
     val responseMap = stopReplicaResponse.responses.asScala
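
The callback above narrows the generic AbstractResponse back to StopReplicaResponse before reading the per-replica results. A hedged sketch of that pattern on its own, assuming responses maps each TopicPartition to an error code in this version of the protocol classes; the logging is illustrative, and the real callback goes on to retry failed replicas:

    import org.apache.kafka.common.protocol.Errors
    import org.apache.kafka.common.requests.{AbstractResponse, StopReplicaResponse}
    import scala.collection.JavaConverters._

    def onStopReplica(responseObj: AbstractResponse, replicaId: Int): Unit = {
      val response = responseObj.asInstanceOf[StopReplicaResponse]
      // Assumed shape: TopicPartition -> error code.
      val failed = response.responses.asScala.filter { case (_, code) => code != Errors.NONE.code }
      if (failed.nonEmpty)
        println(s"Replica $replicaId failed to stop partitions ${failed.keys.mkString(", ")}")
    }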

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala 
b/core/src/main/scala/kafka/log/FileMessageSet.scala
index fd9ec5f..c33e376 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -26,9 +26,10 @@ import kafka.utils._
 import kafka.message._
 import kafka.common.KafkaException
 import java.util.concurrent.TimeUnit
-import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+
+import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import org.apache.kafka.common.errors.CorruptRecordException
-import org.apache.kafka.common.network.TransportLayer
+import org.apache.kafka.common.record.FileRecords
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.mutable.ArrayBuffer
@@ -48,7 +49,6 @@ class FileMessageSet private[kafka](@volatile var file: File,
                                     private[log] val start: Int,
                                     private[log] val end: Int,
                                     isSlice: Boolean) extends MessageSet {
-  import FileMessageSet._
   /* the size of the message set in bytes */
   private val _size =
     if(isSlice)
@@ -126,6 +126,8 @@ class FileMessageSet private[kafka](@volatile var file: 
File,
                        })
   }
 
+  override def asRecords: FileRecords = new FileRecords(file, channel, start, 
end, isSlice)
+
   /**
    * Search forward for the file position of the last offset that is greater 
than or equal to the target offset
    * and return its physical position and the size of the message (including 
log overhead) at the returned offset. If
@@ -206,31 +208,6 @@ class FileMessageSet private[kafka](@volatile var file: 
File,
   }
 
   /**
-   * Write some of this set to the given channel.
-   * @param destChannel The channel to write to.
-   * @param writePosition The position in the message set to begin writing 
from.
-   * @param size The maximum number of bytes to write
-   * @return The number of bytes actually written.
-   */
-  def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: 
Int): Int = {
-    // Ensure that the underlying size has not changed.
-    val newSize = math.min(channel.size.toInt, end) - start
-    if (newSize < _size.get()) {
-      throw new KafkaException("Size of FileMessageSet %s has been truncated 
during write: old size %d, new size %d"
-        .format(file.getAbsolutePath, _size.get(), newSize))
-    }
-    val position = start + writePosition
-    val count = math.min(size, sizeInBytes)
-    val bytesTransferred = (destChannel match {
-      case tl: TransportLayer => tl.transferFrom(channel, position, count)
-      case dc => channel.transferTo(position, count, dc)
-    }).toInt
-    trace("FileMessageSet " + file.getAbsolutePath + " : bytes transferred : " 
+ bytesTransferred
-      + " bytes requested for transfer : " + math.min(size, sizeInBytes))
-    bytesTransferred
-  }
-
-  /**
     * This method is called before we write messages to the socket using 
zero-copy transfer. We need to
     * make sure all the messages in the message set have the expected magic 
value.
     *
@@ -337,7 +314,7 @@ class FileMessageSet private[kafka](@volatile var file: 
File,
 
         // increment the location and return the item
         location += size + sizeOffsetLength
-        new MessageAndOffset(new Message(buffer), offset)
+        MessageAndOffset(new Message(buffer), offset)
       }
     }
   }
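
The writeTo method removed above has a client-side counterpart on FileRecords; the new asRecords override wraps the same file, channel and slice bounds so callers can work with the Records view instead. A rough sketch of that bridge (the helper name is made up; sizeInBytes comes from the Records interface):

    import kafka.message.FileMessageSet
    import org.apache.kafka.common.record.FileRecords

    // Hypothetical helper: expose the client-side view of a server-side message set.
    def clientView(messageSet: FileMessageSet): FileRecords = {
      val records = messageSet.asRecords
      println(s"FileRecords view over ${records.sizeInBytes} bytes")
      records
    }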

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index a33bc4b..096344d 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -26,7 +26,7 @@ import java.util.ArrayDeque
 
 import kafka.message.ByteBufferMessageSet.FilterResult
 import org.apache.kafka.common.errors.InvalidTimestampException
-import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.record.{MemoryRecords, TimestampType}
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.mutable
@@ -352,6 +352,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends 
MessageSet with Loggi
 
   def getBuffer = buffer
 
+  override def asRecords: MemoryRecords = 
MemoryRecords.readableRecords(buffer.duplicate())
+
   private def shallowValidBytes: Int = {
     if (shallowValidByteCount < 0) {
       this.shallowValidByteCount = this.internalIterator(isShallow = true).map 
{ messageAndOffset =>
@@ -371,19 +373,6 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends 
MessageSet with Loggi
     written
   }
 
-  /** Write the messages in this set to the given channel starting at the 
given offset byte.
-    * Less than the complete amount may be written, but no more than maxSize 
can be. The number
-    * of bytes written is returned */
-  def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int 
= {
-    if (offset > Int.MaxValue)
-      throw new IllegalArgumentException(s"offset should not be larger than 
Int.MaxValue: $offset")
-    val dup = buffer.duplicate()
-    val position = offset.toInt
-    dup.position(position)
-    dup.limit(math.min(buffer.limit, position + maxSize))
-    channel.write(dup)
-  }
-
   override def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): 
Boolean = {
     for (messageAndOffset <- shallowIterator) {
       if (messageAndOffset.message.magic != expectedMagicValue)
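
asRecords on ByteBufferMessageSet duplicates the buffer, so reading through the MemoryRecords view should not disturb the message set's own position. A small sketch under that assumption; the payloads are made up, and the single-argument Message constructor is assumed from the existing message API:

    import kafka.message.{ByteBufferMessageSet, Message}
    import org.apache.kafka.common.record.MemoryRecords

    // Hypothetical: wrap two messages and inspect them through the client-side view.
    val messageSet = new ByteBufferMessageSet(new Message("hello".getBytes), new Message("world".getBytes))
    val records: MemoryRecords = messageSet.asRecords   // readable view over a duplicated buffer
    println(s"client-side view: ${records.sizeInBytes} bytes, server-side set: ${messageSet.sizeInBytes} bytes")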

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/message/MessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageSet.scala 
b/core/src/main/scala/kafka/message/MessageSet.scala
index 14c455c..ffa27fa 100644
--- a/core/src/main/scala/kafka/message/MessageSet.scala
+++ b/core/src/main/scala/kafka/message/MessageSet.scala
@@ -20,6 +20,8 @@ package kafka.message
 import java.nio._
 import java.nio.channels._
 
+import org.apache.kafka.common.record.Records
+
 /**
  * Message set helper functions
  */
@@ -70,11 +72,6 @@ case class MagicAndTimestamp(magic: Byte, timestamp: Long)
  */
 abstract class MessageSet extends Iterable[MessageAndOffset] {
 
-  /** Write the messages in this set to the given channel starting at the 
given offset byte. 
-    * Less than the complete amount may be written, but no more than maxSize 
can be. The number
-    * of bytes written is returned */
-  def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int
-
   /**
    * Check if all the wrapper messages in the message set have the expected 
magic value
    */
@@ -91,6 +88,11 @@ abstract class MessageSet extends Iterable[MessageAndOffset] 
{
   def sizeInBytes: Int
 
   /**
+   * Get the client representation of the message set
+   */
+  def asRecords: Records
+
+  /**
    * Print this message set's contents. If the message set has more than 100 
messages, just
    * print the first 100.
    */
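
The abstract asRecords added above is the contract both implementations satisfy: every MessageSet can present itself as an org.apache.kafka.common.record.Records instance. A hedged sketch of code written against that abstraction alone (the function name and parameter are arbitrary):

    import kafka.message.MessageSet
    import org.apache.kafka.common.record.Records

    // Hypothetical: server code can now be written against Records regardless of
    // whether the underlying set is file-backed or buffer-backed.
    def describe(messageSet: MessageSet): String = {
      val records: Records = messageSet.asRecords
      s"${records.getClass.getSimpleName} holding ${records.sizeInBytes} bytes"
    }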

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index dace782..0cece68 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -23,7 +23,7 @@ import java.util.HashMap
 import java.util.concurrent._
 
 import com.yammer.metrics.core.Gauge
-import kafka.api._
+import kafka.api.{ControlledShutdownRequest, RequestOrResponse}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.QuotaId
 import kafka.utils.{Logging, SystemTime}
@@ -31,19 +31,19 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.InvalidRequestException
 import org.apache.kafka.common.network.Send
 import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol}
-import org.apache.kafka.common.requests.{AbstractRequest, ApiVersionsRequest, 
ProduceRequest, RequestHeader, RequestSend}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.log4j.Logger
 
-
 object RequestChannel extends Logging {
-  val AllDone = new Request(processor = 1, connectionId = "2", new 
Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost()), buffer = 
getShutdownReceive(), startTimeMs = 0, securityProtocol = 
SecurityProtocol.PLAINTEXT)
+  val AllDone = Request(processor = 1, connectionId = "2", 
Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost), buffer = 
getShutdownReceive(), startTimeMs = 0, securityProtocol = 
SecurityProtocol.PLAINTEXT)
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
   def getShutdownReceive() = {
     val emptyRequestHeader = new RequestHeader(ApiKeys.PRODUCE.id, "", 0)
-    val emptyProduceRequest = new ProduceRequest(0, 0, new 
HashMap[TopicPartition, ByteBuffer]())
-    RequestSend.serialize(emptyRequestHeader, emptyProduceRequest.toStruct)
+    val emptyProduceRequest = new ProduceRequest(0, 0, new 
HashMap[TopicPartition, MemoryRecords]())
+    AbstractRequestResponse.serialize(emptyRequestHeader, emptyProduceRequest)
   }
 
   case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) {
@@ -61,19 +61,12 @@ object RequestChannel extends Logging {
 
     val requestId = buffer.getShort()
 
-    // TODO: this will be removed once we migrated to client-side format
-    // for server-side request / response format
-    // NOTE: this map only includes the server-side request/response handlers. 
Newer
-    // request types should only use the client-side versions which are parsed 
with
-    // o.a.k.common.requests.AbstractRequest.getRequest()
-    private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => 
RequestOrResponse]=
-      Map(ApiKeys.FETCH.id -> FetchRequest.readFrom,
-        ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> 
ControlledShutdownRequest.readFrom
-      )
-
-    // TODO: this will be removed once we migrated to client-side format
-    val requestObj =
-      keyToNameAndDeserializerMap.get(requestId).map(readFrom => 
readFrom(buffer)).orNull
+    // TODO: this will be removed once we remove support for v0 of 
ControlledShutdownRequest (which
+    // depends on a non-standard request header)
+    val requestObj: RequestOrResponse = if (requestId == 
ApiKeys.CONTROLLED_SHUTDOWN_KEY.id)
+      ControlledShutdownRequest.readFrom(buffer)
+    else
+      null
 
     // if we failed to find a server-side mapping, then try using the
     // client-side request / response format
@@ -108,7 +101,7 @@ object RequestChannel extends Logging {
       if (requestObj != null)
         requestObj.describe(details)
       else
-        header.toString + " -- " + body.toString
+        s"$header -- $body"
     }
 
     trace("Processor %d received request : %s".format(processor, 
requestDesc(true)))
@@ -135,7 +128,7 @@ object RequestChannel extends Logging {
       val totalTime = endTimeMs - startTimeMs
       val fetchMetricNames =
         if (requestId == ApiKeys.FETCH.id) {
-          val isFromFollower = 
requestObj.asInstanceOf[FetchRequest].isFromFollower
+          val isFromFollower = body.asInstanceOf[FetchRequest].isFromFollower
           Seq(
             if (isFromFollower) RequestMetrics.followFetchMetricName
             else RequestMetrics.consumerFetchMetricName
@@ -172,6 +165,9 @@ object RequestChannel extends Logging {
 
     def this(request: Request, send: Send) =
       this(request.processor, request, send)
+
+    def this(request: Request, response: AbstractResponse) =
+      this(request, response.toSend(request.connectionId, request.header))
   }
 
   trait ResponseAction
@@ -221,14 +217,14 @@ class RequestChannel(val numProcessors: Int, val 
queueSize: Int) extends KafkaMe
 
   /** No operation to take for the request, need to read more over the network 
*/
   def noOperation(processor: Int, request: RequestChannel.Request) {
-    responseQueues(processor).put(new RequestChannel.Response(processor, 
request, null, RequestChannel.NoOpAction))
+    responseQueues(processor).put(RequestChannel.Response(processor, request, 
null, RequestChannel.NoOpAction))
     for(onResponse <- responseListeners)
       onResponse(processor)
   }
 
   /** Close the connection for the request */
   def closeConnection(processor: Int, request: RequestChannel.Request) {
-    responseQueues(processor).put(new RequestChannel.Response(processor, 
request, null, RequestChannel.CloseConnectionAction))
+    responseQueues(processor).put(RequestChannel.Response(processor, request, 
null, RequestChannel.CloseConnectionAction))
     for(onResponse <- responseListeners)
       onResponse(processor)
   }
@@ -254,7 +250,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: 
Int) extends KafkaMe
   }
 
   def shutdown() {
-    requestQueue.clear
+    requestQueue.clear()
   }
 }
 
@@ -283,4 +279,3 @@ class RequestMetrics(name: String) extends 
KafkaMetricsGroup {
   val responseSendTimeHist = newHistogram("ResponseSendTimeMs", biased = true, 
tags)
   val totalTimeHist = newHistogram("TotalTimeMs", biased = true, tags)
 }
-
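
The new auxiliary Response constructor builds the outgoing Send from the typed response itself, using the request's connection id and header. A sketch of how a request handler might hand a typed response to the channel; the handler shape is hypothetical, and sendResponse is assumed to be the existing enqueue method on RequestChannel:

    import kafka.network.RequestChannel
    import org.apache.kafka.common.requests.AbstractResponse

    // Hypothetical handler tail: instead of serializing into a Send by hand, pass
    // the typed response and let the constructor call
    // response.toSend(request.connectionId, request.header).
    def complete(requestChannel: RequestChannel,
                 request: RequestChannel.Request,
                 response: AbstractResponse): Unit =
      requestChannel.sendResponse(new RequestChannel.Response(request, response))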

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala 
b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 4bf04e6..001051f 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -20,14 +20,15 @@ package kafka.server
 import java.util.concurrent.TimeUnit
 
 import kafka.api.FetchResponsePartitionData
-import kafka.api.PartitionFetchInfo
 import kafka.common.TopicAndPartition
 import kafka.metrics.KafkaMetricsGroup
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{NotLeaderForPartitionException, 
UnknownTopicOrPartitionException}
+import org.apache.kafka.common.requests.FetchRequest.PartitionData
 
 import scala.collection._
 
-case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, 
fetchInfo: PartitionFetchInfo) {
+case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, 
fetchInfo: PartitionData) {
 
   override def toString = "[startOffsetMetadata: " + startOffsetMetadata + ", 
" +
                           "fetchInfo: " + fetchInfo + "]"
@@ -103,7 +104,7 @@ class DelayedFetch(delayMs: Long,
                   return forceComplete()
               } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
                 // we take the partition fetch size as upper bound when 
accumulating the bytes (skip if a throttled partition)
-                val bytesAvailable = 
math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize)
+                val bytesAvailable = 
math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
                 if (quota.isThrottled(topicAndPartition))
                   accumulatedThrottledSize += bytesAvailable
                 else
@@ -146,7 +147,7 @@ class DelayedFetch(delayMs: Long,
       readOnlyCommitted = fetchMetadata.fetchOnlyCommitted,
       fetchMaxBytes = fetchMetadata.fetchMaxBytes,
       hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit,
-      readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, 
status) => tp -> status.fetchInfo },
+      readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, 
status) => new TopicPartition(tp.topic, tp.partition) -> status.fetchInfo },
       quota = quota
     )
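
FetchPartitionStatus now carries the client-side FetchRequest.PartitionData, whose maxBytes field replaces the old PartitionFetchInfo.fetchSize. A small sketch with made-up offset and size values:

    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.requests.FetchRequest.PartitionData

    // Hypothetical per-partition fetch info of the kind the purgatory entry now holds.
    val partition = new TopicPartition("my-topic", 0)
    val fetchInfo = new PartitionData(42L, 1024 * 1024)   // fetch offset, maxBytes
    println(s"$partition -> offset ${fetchInfo.offset}, maxBytes ${fetchInfo.maxBytes}")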
 
