http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index b7645dd..097b4fc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -12,31 +12,19 @@ */ package org.apache.kafka.clients.producer.internals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.requests.ProduceResponse; @@ -46,6 +34,17 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class SenderTest { private static final int MAX_REQUEST_SIZE = 1024 * 1024; @@ -69,7 +68,7 @@ public class SenderTest { @Before public void setup() { - Map<String, String> metricTags = new LinkedHashMap<String, String>(); + Map<String, String> metricTags = new LinkedHashMap<>(); metricTags.put("client-id", CLIENT_ID); MetricConfig metricConfig = new MetricConfig().tags(metricTags); metrics = new Metrics(metricConfig, time); @@ -147,7 +146,7 @@ public class SenderTest { Future<RecordMetadata> future = accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request - String id = client.requests().peek().request().destination(); + String id = client.requests().peek().destination(); Node node = new Node(Integer.valueOf(id), "localhost", 0); assertEquals(1, client.inFlightRequestCount()); assertTrue("Client ready status should be true", client.isReady(node, 0L)); @@ -168,7 +167,7 @@ public class SenderTest { future = accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // send produce request for (int i = 0; i < maxRetries + 1; i++) { - client.disconnect(client.requests().peek().request().destination()); + client.disconnect(client.requests().peek().destination()); sender.run(time.milliseconds()); // receive error sender.run(time.milliseconds()); // reconnect sender.run(time.milliseconds()); // resend @@ -205,8 +204,8 @@ public class SenderTest { accumulator.append(tp2, 0L, "key1".getBytes(), "value1".getBytes(), null, MAX_BLOCK_TIMEOUT); sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request - String id = client.requests().peek().request().destination(); - assertEquals(ApiKeys.PRODUCE.id, client.requests().peek().request().header().apiKey()); + String id = client.requests().peek().destination(); + assertEquals(ApiKeys.PRODUCE.id, client.requests().peek().header().apiKey()); Node node = new Node(Integer.valueOf(id), "localhost", 0); assertEquals(1, client.inFlightRequestCount()); assertTrue("Client ready status should be true", client.isReady(node, 0L)); @@ -272,11 +271,10 @@ public class SenderTest { } } - private Struct produceResponse(TopicPartition tp, long offset, int error, int throttleTimeMs) { + private ProduceResponse produceResponse(TopicPartition tp, long offset, int error, int throttleTimeMs) { ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse((short) error, offset, Record.NO_TIMESTAMP); Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp); - ProduceResponse response = new ProduceResponse(partResp, throttleTimeMs); - return response.toStruct(); + return new ProduceResponse(partResp, throttleTimeMs); } }
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 30faac1..a5eb22a 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -16,15 +16,20 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Record; import org.junit.Test; +import java.io.IOException; import java.lang.reflect.Method; import java.nio.ByteBuffer; +import java.nio.channels.GatheringByteChannel; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -36,6 +41,7 @@ import java.util.Map; import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class RequestResponseTest { @@ -113,7 +119,6 @@ public class RequestResponseTest { for (AbstractRequestResponse req : requestResponseList) checkSerialization(req, null); - checkOlderFetchVersions(); checkSerialization(createMetadataResponse(0), 0); checkSerialization(createMetadataResponse(1), 1); @@ -180,7 +185,9 @@ public class RequestResponseTest { @Test public void fetchResponseVersionTest() { Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<>(); - responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10))); + + MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10)); + responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, records)); FetchResponse v0Response = new FetchResponse(responseData); FetchResponse v1Response = new FetchResponse(responseData, 10); @@ -193,6 +200,34 @@ public class RequestResponseTest { } @Test + public void verifyFetchResponseFullWrite() throws Exception { + FetchResponse fetchResponse = createFetchResponse(); + RequestHeader header = new RequestHeader(ApiKeys.FETCH.id, "client", 15); + + Send send = fetchResponse.toSend("1", header); + ByteBufferChannel channel = new ByteBufferChannel(send.size()); + send.writeTo(channel); + channel.close(); + + ByteBuffer buf = channel.buf; + + // read the size + int size = buf.getInt(); + assertTrue(size > 0); + + // read the header + ResponseHeader responseHeader = ResponseHeader.parse(channel.buf); + assertEquals(header.correlationId(), responseHeader.correlationId()); + + // read the body + Struct responseBody = ProtoUtils.responseSchema(ApiKeys.FETCH.id, header.apiVersion()).read(buf); + FetchResponse parsedResponse = new FetchResponse(responseBody); + assertEquals(parsedResponse, fetchResponse); + + assertEquals(size, responseHeader.sizeOf() + parsedResponse.sizeOf()); + } + + @Test public void testControlledShutdownResponse() { ControlledShutdownResponse response = createControlledShutdownResponse(); ByteBuffer buffer = ByteBuffer.allocate(response.sizeOf()); @@ -216,24 +251,24 @@ public class RequestResponseTest { assertEquals("", deserialized.clientId()); // null is defaulted to "" } - private AbstractRequestResponse createRequestHeader() { + private RequestHeader createRequestHeader() { return new RequestHeader((short) 10, (short) 1, "", 10); } - private AbstractRequestResponse createResponseHeader() { + private ResponseHeader createResponseHeader() { return new ResponseHeader(10); } - private AbstractRequest createGroupCoordinatorRequest() { + private GroupCoordinatorRequest createGroupCoordinatorRequest() { return new GroupCoordinatorRequest("test-group"); } - private AbstractRequestResponse createGroupCoordinatorResponse() { + private GroupCoordinatorResponse createGroupCoordinatorResponse() { return new GroupCoordinatorResponse(Errors.NONE.code(), new Node(10, "host1", 2014)); } @SuppressWarnings("deprecation") - private AbstractRequest createFetchRequest(int version) { + private FetchRequest createFetchRequest(int version) { LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetchData = new LinkedHashMap<>(); fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 1000000)); fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 1000000)); @@ -243,22 +278,23 @@ public class RequestResponseTest { return new FetchRequest(100, 1000, 1000000, fetchData); } - private AbstractRequestResponse createFetchResponse() { + private FetchResponse createFetchResponse() { Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<>(); - responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10))); - return new FetchResponse(responseData, 0); + MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10)); + responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, records)); + return new FetchResponse(responseData, 25); } - private AbstractRequest createHeartBeatRequest() { + private HeartbeatRequest createHeartBeatRequest() { return new HeartbeatRequest("group1", 1, "consumer1"); } - private AbstractRequestResponse createHeartBeatResponse() { + private HeartbeatResponse createHeartBeatResponse() { return new HeartbeatResponse(Errors.NONE.code()); } @SuppressWarnings("deprecation") - private AbstractRequest createJoinGroupRequest(int version) { + private JoinGroupRequest createJoinGroupRequest(int version) { ByteBuffer metadata = ByteBuffer.wrap(new byte[] {}); List<JoinGroupRequest.ProtocolMetadata> protocols = new ArrayList<>(); protocols.add(new JoinGroupRequest.ProtocolMetadata("consumer-range", metadata)); @@ -269,27 +305,27 @@ public class RequestResponseTest { } } - private AbstractRequestResponse createJoinGroupResponse() { + private JoinGroupResponse createJoinGroupResponse() { Map<String, ByteBuffer> members = new HashMap<>(); members.put("consumer1", ByteBuffer.wrap(new byte[]{})); members.put("consumer2", ByteBuffer.wrap(new byte[]{})); return new JoinGroupResponse(Errors.NONE.code(), 1, "range", "consumer1", "leader", members); } - private AbstractRequest createListGroupsRequest() { + private ListGroupsRequest createListGroupsRequest() { return new ListGroupsRequest(); } - private AbstractRequestResponse createListGroupsResponse() { + private ListGroupsResponse createListGroupsResponse() { List<ListGroupsResponse.Group> groups = Arrays.asList(new ListGroupsResponse.Group("test-group", "consumer")); return new ListGroupsResponse(Errors.NONE.code(), groups); } - private AbstractRequest createDescribeGroupRequest() { + private DescribeGroupsRequest createDescribeGroupRequest() { return new DescribeGroupsRequest(Collections.singletonList("test-group")); } - private AbstractRequestResponse createDescribeGroupResponse() { + private DescribeGroupsResponse createDescribeGroupResponse() { String clientId = "consumer-1"; String clientHost = "localhost"; ByteBuffer empty = ByteBuffer.allocate(0); @@ -300,16 +336,16 @@ public class RequestResponseTest { return new DescribeGroupsResponse(Collections.singletonMap("test-group", metadata)); } - private AbstractRequest createLeaveGroupRequest() { + private LeaveGroupRequest createLeaveGroupRequest() { return new LeaveGroupRequest("group1", "consumer1"); } - private AbstractRequestResponse createLeaveGroupResponse() { + private LeaveGroupResponse createLeaveGroupResponse() { return new LeaveGroupResponse(Errors.NONE.code()); } @SuppressWarnings("deprecation") - private AbstractRequest createListOffsetRequest(int version) { + private ListOffsetRequest createListOffsetRequest(int version) { if (version == 0) { Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = new HashMap<>(); offsetData.put(new TopicPartition("test", 0), new ListOffsetRequest.PartitionData(1000000L, 10)); @@ -324,7 +360,7 @@ public class RequestResponseTest { } @SuppressWarnings("deprecation") - private AbstractRequestResponse createListOffsetResponse(int version) { + private ListOffsetResponse createListOffsetResponse(int version) { if (version == 0) { Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE.code(), Arrays.asList(100L))); @@ -338,11 +374,11 @@ public class RequestResponseTest { } } - private AbstractRequest createMetadataRequest(List<String> topics) { + private MetadataRequest createMetadataRequest(List<String> topics) { return new MetadataRequest(topics); } - private AbstractRequestResponse createMetadataResponse(int version) { + private MetadataResponse createMetadataResponse(int version) { Node node = new Node(1, "host1", 1001); List<Node> replicas = Arrays.asList(node); List<Node> isr = Arrays.asList(node); @@ -357,7 +393,7 @@ public class RequestResponseTest { } @SuppressWarnings("deprecation") - private AbstractRequest createOffsetCommitRequest(int version) { + private OffsetCommitRequest createOffsetCommitRequest(int version) { Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<>(); commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, "")); commitData.put(new TopicPartition("test", 1), new OffsetCommitRequest.PartitionData(200, null)); @@ -371,47 +407,47 @@ public class RequestResponseTest { throw new IllegalArgumentException("Unknown offset commit request version " + version); } - private AbstractRequestResponse createOffsetCommitResponse() { + private OffsetCommitResponse createOffsetCommitResponse() { Map<TopicPartition, Short> responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), Errors.NONE.code()); return new OffsetCommitResponse(responseData); } - private AbstractRequest createOffsetFetchRequest() { + private OffsetFetchRequest createOffsetFetchRequest() { return new OffsetFetchRequest("group1", Arrays.asList(new TopicPartition("test11", 1))); } - private AbstractRequestResponse createOffsetFetchResponse() { + private OffsetFetchResponse createOffsetFetchResponse() { Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), new OffsetFetchResponse.PartitionData(100L, "", Errors.NONE.code())); responseData.put(new TopicPartition("test", 1), new OffsetFetchResponse.PartitionData(100L, null, Errors.NONE.code())); return new OffsetFetchResponse(responseData); } - private AbstractRequest createProduceRequest() { - Map<TopicPartition, ByteBuffer> produceData = new HashMap<>(); - produceData.put(new TopicPartition("test", 0), ByteBuffer.allocate(10)); + private ProduceRequest createProduceRequest() { + Map<TopicPartition, MemoryRecords> produceData = new HashMap<>(); + produceData.put(new TopicPartition("test", 0), MemoryRecords.readableRecords(ByteBuffer.allocate(10))); return new ProduceRequest((short) 1, 5000, produceData); } - private AbstractRequestResponse createProduceResponse() { + private ProduceResponse createProduceResponse() { Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000, Record.NO_TIMESTAMP)); return new ProduceResponse(responseData, 0); } - private AbstractRequest createStopReplicaRequest(boolean deletePartitions) { + private StopReplicaRequest createStopReplicaRequest(boolean deletePartitions) { Set<TopicPartition> partitions = new HashSet<>(Arrays.asList(new TopicPartition("test", 0))); return new StopReplicaRequest(0, 1, deletePartitions, partitions); } - private AbstractRequestResponse createStopReplicaResponse() { + private StopReplicaResponse createStopReplicaResponse() { Map<TopicPartition, Short> responses = new HashMap<>(); responses.put(new TopicPartition("test", 0), Errors.NONE.code()); return new StopReplicaResponse(Errors.NONE.code(), responses); } - private AbstractRequest createControlledShutdownRequest() { + private ControlledShutdownRequest createControlledShutdownRequest() { return new ControlledShutdownRequest(10); } @@ -423,7 +459,7 @@ public class RequestResponseTest { return new ControlledShutdownResponse(Errors.NONE.code(), topicPartitions); } - private AbstractRequest createLeaderAndIsrRequest() { + private LeaderAndIsrRequest createLeaderAndIsrRequest() { Map<TopicPartition, PartitionState> partitionStates = new HashMap<>(); List<Integer> isr = Arrays.asList(1, 2); List<Integer> replicas = Arrays.asList(1, 2, 3, 4); @@ -442,14 +478,14 @@ public class RequestResponseTest { return new LeaderAndIsrRequest(1, 10, partitionStates, leaders); } - private AbstractRequestResponse createLeaderAndIsrResponse() { + private LeaderAndIsrResponse createLeaderAndIsrResponse() { Map<TopicPartition, Short> responses = new HashMap<>(); responses.put(new TopicPartition("test", 0), Errors.NONE.code()); return new LeaderAndIsrResponse(Errors.NONE.code(), responses); } @SuppressWarnings("deprecation") - private AbstractRequest createUpdateMetadataRequest(int version, String rack) { + private UpdateMetadataRequest createUpdateMetadataRequest(int version, String rack) { Map<TopicPartition, PartitionState> partitionStates = new HashMap<>(); List<Integer> isr = Arrays.asList(1, 2); List<Integer> replicas = Arrays.asList(1, 2, 3, 4); @@ -482,28 +518,28 @@ public class RequestResponseTest { } } - private AbstractRequestResponse createUpdateMetadataResponse() { + private UpdateMetadataResponse createUpdateMetadataResponse() { return new UpdateMetadataResponse(Errors.NONE.code()); } - private AbstractRequest createSaslHandshakeRequest() { + private SaslHandshakeRequest createSaslHandshakeRequest() { return new SaslHandshakeRequest("PLAIN"); } - private AbstractRequestResponse createSaslHandshakeResponse() { + private SaslHandshakeResponse createSaslHandshakeResponse() { return new SaslHandshakeResponse(Errors.NONE.code(), Collections.singletonList("GSSAPI")); } - private AbstractRequest createApiVersionRequest() { + private ApiVersionsRequest createApiVersionRequest() { return new ApiVersionsRequest(); } - private AbstractRequestResponse createApiVersionResponse() { + private ApiVersionsResponse createApiVersionResponse() { List<ApiVersionsResponse.ApiVersion> apiVersions = Arrays.asList(new ApiVersionsResponse.ApiVersion((short) 0, (short) 0, (short) 2)); return new ApiVersionsResponse(Errors.NONE.code(), apiVersions); } - private AbstractRequest createCreateTopicRequest() { + private CreateTopicsRequest createCreateTopicRequest() { CreateTopicsRequest.TopicDetails request1 = new CreateTopicsRequest.TopicDetails(3, (short) 5); Map<Integer, List<Integer>> replicaAssignments = new HashMap<>(); @@ -521,21 +557,65 @@ public class RequestResponseTest { return new CreateTopicsRequest(request, 0); } - private AbstractRequestResponse createCreateTopicResponse() { + private CreateTopicsResponse createCreateTopicResponse() { Map<String, Errors> errors = new HashMap<>(); errors.put("t1", Errors.INVALID_TOPIC_EXCEPTION); errors.put("t2", Errors.LEADER_NOT_AVAILABLE); return new CreateTopicsResponse(errors); } - private AbstractRequest createDeleteTopicsRequest() { + private DeleteTopicsRequest createDeleteTopicsRequest() { return new DeleteTopicsRequest(new HashSet<>(Arrays.asList("my_t1", "my_t2")), 10000); } - private AbstractRequestResponse createDeleteTopicsResponse() { + private DeleteTopicsResponse createDeleteTopicsResponse() { Map<String, Errors> errors = new HashMap<>(); errors.put("t1", Errors.INVALID_TOPIC_EXCEPTION); errors.put("t2", Errors.TOPIC_AUTHORIZATION_FAILED); return new DeleteTopicsResponse(errors); } + + private static class ByteBufferChannel implements GatheringByteChannel { + private final ByteBuffer buf; + private boolean closed = false; + + private ByteBufferChannel(long size) { + this.buf = ByteBuffer.allocate(Long.valueOf(size).intValue()); + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + int position = buf.position(); + for (int i = 0; i < length; i++) { + ByteBuffer src = srcs[i].duplicate(); + if (i == 0) + src.position(offset); + buf.put(src); + } + return buf.position() - position; + } + + @Override + public long write(ByteBuffer[] srcs) throws IOException { + return write(srcs, 0, srcs.length); + } + + @Override + public int write(ByteBuffer src) throws IOException { + int position = buf.position(); + buf.put(src); + return buf.position() - position; + } + + @Override + public boolean isOpen() { + return !closed; + } + + @Override + public void close() throws IOException { + buf.flip(); + closed = true; + } + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index 97fe3d8..27c5695 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -12,18 +12,6 @@ */ package org.apache.kafka.common.security.authenticator; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.SaslConfigs; @@ -36,17 +24,17 @@ import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.NetworkTestUtils; import org.apache.kafka.common.network.NioEchoServer; import org.apache.kafka.common.network.Selector; +import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Protocol; import org.apache.kafka.common.protocol.SecurityProtocol; -import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.requests.AbstractRequestResponse; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.ApiVersionsRequest; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.RequestHeader; -import org.apache.kafka.common.requests.RequestSend; import org.apache.kafka.common.requests.ResponseHeader; import org.apache.kafka.common.requests.SaslHandshakeRequest; import org.apache.kafka.common.requests.SaslHandshakeResponse; @@ -55,6 +43,18 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + /** * Tests for the Sasl authenticator. These use a test harness that runs a simple socket server that echos back responses. */ @@ -261,7 +261,8 @@ public class SaslAuthenticatorTest { String node = "1"; createClientConnection(SecurityProtocol.PLAINTEXT, node); RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS.id, Short.MAX_VALUE, "someclient", 1); - selector.send(new NetworkSend(node, RequestSend.serialize(header, new ApiVersionsRequest().toStruct()))); + ApiVersionsRequest request = new ApiVersionsRequest(); + selector.send(request.toSend(node, header)); ByteBuffer responseBuffer = waitForResponse(); ResponseHeader.parse(responseBuffer); ApiVersionsResponse response = ApiVersionsResponse.parse(responseBuffer); @@ -291,8 +292,9 @@ public class SaslAuthenticatorTest { // Send ApiVersionsRequest and validate error response. String node1 = "invalid1"; createClientConnection(SecurityProtocol.PLAINTEXT, node1); + SaslHandshakeRequest request = new SaslHandshakeRequest("PLAIN"); RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, Short.MAX_VALUE, "someclient", 2); - selector.send(new NetworkSend(node1, RequestSend.serialize(header, new SaslHandshakeRequest("PLAIN").toStruct()))); + selector.send(request.toSend(node1, header)); NetworkTestUtils.waitForChannelClose(selector, node1); selector.close(); @@ -355,8 +357,9 @@ public class SaslAuthenticatorTest { createClientConnection(SecurityProtocol.PLAINTEXT, node1); sendHandshakeRequestReceiveResponse(node1); + ApiVersionsRequest request = new ApiVersionsRequest(); RequestHeader versionsHeader = new RequestHeader(ApiKeys.API_VERSIONS.id, "someclient", 2); - selector.send(new NetworkSend(node1, RequestSend.serialize(versionsHeader, new ApiVersionsRequest().toStruct()))); + selector.send(request.toSend(node1, versionsHeader)); NetworkTestUtils.waitForChannelClose(selector, node1); selector.close(); @@ -420,7 +423,7 @@ public class SaslAuthenticatorTest { createClientConnection(SecurityProtocol.PLAINTEXT, node1); RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA.id, "someclient", 1); MetadataRequest metadataRequest1 = new MetadataRequest(Collections.singletonList("sometopic")); - selector.send(new NetworkSend(node1, RequestSend.serialize(metadataRequestHeader1, metadataRequest1.toStruct()))); + selector.send(metadataRequest1.toSend(node1, metadataRequestHeader1)); NetworkTestUtils.waitForChannelClose(selector, node1); selector.close(); @@ -433,7 +436,7 @@ public class SaslAuthenticatorTest { sendHandshakeRequestReceiveResponse(node2); RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA.id, "someclient", 2); MetadataRequest metadataRequest2 = new MetadataRequest(Collections.singletonList("sometopic")); - selector.send(new NetworkSend(node2, RequestSend.serialize(metadataRequestHeader2, metadataRequest2.toStruct()))); + selector.send(metadataRequest2.toSend(node2, metadataRequestHeader2)); NetworkTestUtils.waitForChannelClose(selector, node2); selector.close(); @@ -581,25 +584,24 @@ public class SaslAuthenticatorTest { selector = null; } - private Struct sendKafkaRequestReceiveResponse(String node, ApiKeys apiKey, AbstractRequestResponse request) throws IOException { + private AbstractResponse sendKafkaRequestReceiveResponse(String node, ApiKeys apiKey, AbstractRequest request) throws IOException { RequestHeader header = new RequestHeader(apiKey.id, "someclient", 1); - selector.send(new NetworkSend(node, RequestSend.serialize(header, request.toStruct()))); + Send send = request.toSend(node, header); + selector.send(send); ByteBuffer responseBuffer = waitForResponse(); return NetworkClient.parseResponse(responseBuffer, header); } private SaslHandshakeResponse sendHandshakeRequestReceiveResponse(String node) throws Exception { SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest("PLAIN"); - Struct responseStruct = sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_HANDSHAKE, handshakeRequest); - SaslHandshakeResponse response = new SaslHandshakeResponse(responseStruct); + SaslHandshakeResponse response = (SaslHandshakeResponse) sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_HANDSHAKE, handshakeRequest); assertEquals(Errors.NONE.code(), response.errorCode()); return response; } private ApiVersionsResponse sendVersionRequestReceiveResponse(String node) throws Exception { ApiVersionsRequest handshakeRequest = new ApiVersionsRequest(); - Struct responseStruct = sendKafkaRequestReceiveResponse(node, ApiKeys.API_VERSIONS, handshakeRequest); - ApiVersionsResponse response = new ApiVersionsResponse(responseStruct); + ApiVersionsResponse response = (ApiVersionsResponse) sendKafkaRequestReceiveResponse(node, ApiKeys.API_VERSIONS, handshakeRequest); assertEquals(Errors.NONE.code(), response.errorCode()); return response; } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java index 3bfa83f..3aff8f2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java @@ -16,7 +16,6 @@ **/ package org.apache.kafka.connect.runtime.distributed; -import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; @@ -24,7 +23,7 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.GroupCoordinatorResponse; import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata; import org.apache.kafka.common.requests.JoinGroupResponse; @@ -172,7 +171,7 @@ public class WorkerCoordinatorTest { final String consumerId = "leader"; - client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); // normal join group @@ -182,8 +181,8 @@ public class WorkerCoordinatorTest { client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberConfigOffsets, Errors.NONE.code())); client.prepareResponse(new MockClient.RequestMatcher() { @Override - public boolean matches(ClientRequest request) { - SyncGroupRequest sync = new SyncGroupRequest(request.request().body()); + public boolean matches(AbstractRequest body) { + SyncGroupRequest sync = (SyncGroupRequest) body; return sync.memberId().equals(consumerId) && sync.generationId() == 1 && sync.groupAssignment().containsKey(consumerId); @@ -212,15 +211,15 @@ public class WorkerCoordinatorTest { final String memberId = "member"; - client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); // normal join group client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code())); client.prepareResponse(new MockClient.RequestMatcher() { @Override - public boolean matches(ClientRequest request) { - SyncGroupRequest sync = new SyncGroupRequest(request.request().body()); + public boolean matches(AbstractRequest body) { + SyncGroupRequest sync = (SyncGroupRequest) body; return sync.memberId().equals(memberId) && sync.generationId() == 1 && sync.groupAssignment().isEmpty(); @@ -253,15 +252,15 @@ public class WorkerCoordinatorTest { final String memberId = "member"; - client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); // config mismatch results in assignment error client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code())); MockClient.RequestMatcher matcher = new MockClient.RequestMatcher() { @Override - public boolean matches(ClientRequest request) { - SyncGroupRequest sync = new SyncGroupRequest(request.request().body()); + public boolean matches(AbstractRequest body) { + SyncGroupRequest sync = (SyncGroupRequest) body; return sync.memberId().equals(memberId) && sync.generationId() == 1 && sync.groupAssignment().isEmpty(); @@ -284,7 +283,7 @@ public class WorkerCoordinatorTest { PowerMock.replayAll(); - client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); // join the group once @@ -392,12 +391,11 @@ public class WorkerCoordinatorTest { } - private Struct groupMetadataResponse(Node node, short error) { - GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node); - return response.toStruct(); + private GroupCoordinatorResponse groupCoordinatorResponse(Node node, short error) { + return new GroupCoordinatorResponse(error, node); } - private Struct joinGroupLeaderResponse(int generationId, String memberId, + private JoinGroupResponse joinGroupLeaderResponse(int generationId, String memberId, Map<String, Long> configOffsets, short error) { Map<String, ByteBuffer> metadata = new HashMap<>(); for (Map.Entry<String, Long> configStateEntry : configOffsets.entrySet()) { @@ -407,22 +405,21 @@ public class WorkerCoordinatorTest { ByteBuffer buf = ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(memberUrl, configOffset)); metadata.put(configStateEntry.getKey(), buf); } - return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, memberId, metadata).toStruct(); + return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, memberId, metadata); } - private Struct joinGroupFollowerResponse(int generationId, String memberId, String leaderId, short error) { + private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, short error) { return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, leaderId, - Collections.<String, ByteBuffer>emptyMap()).toStruct(); + Collections.<String, ByteBuffer>emptyMap()); } - private Struct syncGroupResponse(short assignmentError, String leader, long configOffset, List<String> connectorIds, + private SyncGroupResponse syncGroupResponse(short assignmentError, String leader, long configOffset, List<String> connectorIds, List<ConnectorTaskId> taskIds, short error) { ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(assignmentError, leader, LEADER_URL, configOffset, connectorIds, taskIds); ByteBuffer buf = ConnectProtocol.serializeAssignment(assignment); - return new SyncGroupResponse(error, buf).toStruct(); + return new SyncGroupResponse(error, buf); } - private static class MockRebalanceListener implements WorkerRebalanceListener { public ConnectProtocol.Assignment assignment = null; http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/admin/AdminClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 1179557..592fecf 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -42,7 +42,7 @@ class AdminClient(val time: Time, private def send(target: Node, api: ApiKeys, - request: AbstractRequest): Struct = { + request: AbstractRequest): AbstractResponse = { var future: RequestFuture[ClientResponse] = null future = client.send(target, api, request) @@ -54,9 +54,8 @@ class AdminClient(val time: Time, throw future.exception() } - private def sendAnyNode(api: ApiKeys, request: AbstractRequest): Struct = { - bootstrapBrokers.foreach { - case broker => + private def sendAnyNode(api: ApiKeys, request: AbstractRequest): AbstractResponse = { + bootstrapBrokers.foreach { broker => try { return send(broker, api, request) } catch { @@ -69,23 +68,20 @@ class AdminClient(val time: Time, private def findCoordinator(groupId: String): Node = { val request = new GroupCoordinatorRequest(groupId) - val responseBody = sendAnyNode(ApiKeys.GROUP_COORDINATOR, request) - val response = new GroupCoordinatorResponse(responseBody) + val response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, request).asInstanceOf[GroupCoordinatorResponse] Errors.forCode(response.errorCode()).maybeThrow() response.node() } def listGroups(node: Node): List[GroupOverview] = { - val responseBody = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest()) - val response = new ListGroupsResponse(responseBody) + val response = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest()).asInstanceOf[ListGroupsResponse] Errors.forCode(response.errorCode()).maybeThrow() response.groups().map(group => GroupOverview(group.groupId(), group.protocolType())).toList } private def findAllBrokers(): List[Node] = { val request = new MetadataRequest(List[String]()) - val responseBody = sendAnyNode(ApiKeys.METADATA, request) - val response = new MetadataResponse(responseBody) + val response = sendAnyNode(ApiKeys.METADATA, request).asInstanceOf[MetadataResponse] val errors = response.errors() if (!errors.isEmpty) debug(s"Metadata request contained errors: $errors") @@ -140,7 +136,7 @@ class AdminClient(val time: Time, def describeConsumerGroup(groupId: String): ConsumerGroupSummary = { val coordinator = findCoordinator(groupId) val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS, new DescribeGroupsRequest(List(groupId).asJava)) - val response = new DescribeGroupsResponse(responseBody) + val response = responseBody.asInstanceOf[DescribeGroupsResponse] val metadata = response.groups.get(groupId) if (metadata == null) throw new KafkaException(s"Response from broker contained no metadata for group $groupId") http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/api/FetchRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index 57e99c1..00897db 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -199,8 +199,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, val fetchResponsePartitionData = requestInfo.map { case (topicAndPartition, _) => (topicAndPartition, FetchResponsePartitionData(Errors.forException(e).code, -1, MessageSet.Empty)) } - val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] - val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData, fetchRequest.versionId) + val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData, request.header.apiVersion) // Magic value does not matter here because the message set is empty requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, errorResponse))) } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/api/FetchResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala index d99bbcd..d31d4ba 100644 --- a/core/src/main/scala/kafka/api/FetchResponse.scala +++ b/core/src/main/scala/kafka/api/FetchResponse.scala @@ -76,7 +76,8 @@ class PartitionDataSend(val partitionId: Int, written += channel.write(buffer) if (!buffer.hasRemaining) { if (messagesSentSize < messageSize) { - val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize) + val records = partitionData.messages.asRecords + val bytesSent = records.writeTo(channel, messagesSentSize, messageSize - messagesSentSize).toInt messagesSentSize += bytesSent written += bytesSent } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala index cb5b95e..3783c29 100644 --- a/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala +++ b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala @@ -14,13 +14,14 @@ package kafka.api import java.nio.ByteBuffer -import org.apache.kafka.common.requests.AbstractRequestResponse + import kafka.api.ApiUtils._ +import org.apache.kafka.common.requests.AbstractResponse private[kafka] abstract class GenericRequestAndHeader(val versionId: Short, val correlationId: Int, val clientId: String, - val body: AbstractRequestResponse, + val body: AbstractResponse, val name: String, override val requestId: Option[Short] = None) extends RequestOrResponse(requestId) { http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala index 2835fb6..be0c080 100644 --- a/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala +++ b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala @@ -14,10 +14,11 @@ package kafka.api import java.nio.ByteBuffer -import org.apache.kafka.common.requests.AbstractRequestResponse + +import org.apache.kafka.common.requests.AbstractResponse private[kafka] abstract class GenericResponseAndHeader(val correlationId: Int, - val body: AbstractRequestResponse, + val body: AbstractResponse, val name: String, override val requestId: Option[Short] = None) extends RequestOrResponse(requestId) { http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/api/ProducerRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index 3ca7bd7..f6e4475 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -128,8 +128,8 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, describe(true) } - override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { - if(request.requestObj.asInstanceOf[ProducerRequest].requiredAcks == 0) { + override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { + if(request.body.asInstanceOf[org.apache.kafka.common.requests.ProduceRequest].acks == 0) { requestChannel.closeConnection(request.processor, request) } else { http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/api/RequestOrResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala index 65b37fd..d013047 100644 --- a/core/src/main/scala/kafka/api/RequestOrResponse.scala +++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala @@ -42,6 +42,6 @@ abstract class RequestOrResponse(val requestId: Option[Short] = None) extends Lo * This API has no meaning for a Response object. * @param details If this is false, omit the parts of the request description that are proportional to the number of * topics or partitions. This is mainly to control the amount of request logging. */ - def describe(details: Boolean):String + def describe(details: Boolean): String } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 03cd98c..1935ea2 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -42,7 +42,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf private val brokerLock = new Object this.logIdent = "[Channel manager on controller " + config.brokerId + "]: " - controllerContext.liveBrokers.foreach(addNewBroker(_)) + controllerContext.liveBrokers.foreach(addNewBroker) def startup() = { brokerLock synchronized { @@ -56,7 +56,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf } } - def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit = null) { + def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractResponse => Unit = null) { brokerLock synchronized { val stateInfoOpt = brokerStateInfo.get(brokerId) stateInfoOpt match { @@ -149,7 +149,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf } } -case class QueueItem(apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit) +case class QueueItem(apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractResponse => Unit) class RequestSendThread(val controllerId: Int, val controllerContext: ControllerContext, @@ -185,28 +185,27 @@ class RequestSendThread(val controllerId: Int, } else { val requestHeader = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _)) - val send = new RequestSend(brokerNode.idString, requestHeader, request.toStruct) - val clientRequest = new ClientRequest(time.milliseconds(), true, send, null) - clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time) + val clientRequest = new ClientRequest(brokerNode.idString, time.milliseconds(), true, requestHeader, request, null) + clientResponse = networkClient.blockingSendAndReceive(clientRequest, request)(time) isSendSuccessful = true } } catch { case e: Throwable => // if the send was not successful, reconnect to broker and resend the message warn(("Controller %d epoch %d fails to send request %s to broker %s. " + "Reconnecting to broker.").format(controllerId, controllerContext.epoch, - request.toString, brokerNode.toString()), e) + request.toString, brokerNode.toString), e) networkClient.close(brokerNode.idString) isSendSuccessful = false backoff() } } if (clientResponse != null) { - val response = ApiKeys.forId(clientResponse.request.request.header.apiKey) match { - case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrResponse(clientResponse.responseBody) - case ApiKeys.STOP_REPLICA => new StopReplicaResponse(clientResponse.responseBody) - case ApiKeys.UPDATE_METADATA_KEY => new UpdateMetadataResponse(clientResponse.responseBody) - case apiKey => throw new KafkaException(s"Unexpected apiKey received: $apiKey") - } + val api = ApiKeys.forId(clientResponse.requestHeader.apiKey) + if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA_KEY) + throw new KafkaException(s"Unexpected apiKey received: $apiKey") + + val response = clientResponse.responseBody + stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s" .format(controllerId, controllerContext.epoch, response.toString, brokerNode.toString)) @@ -217,7 +216,7 @@ class RequestSendThread(val controllerId: Int, } } catch { case e: Throwable => - error("Controller %d fails to send a request to broker %s".format(controllerId, brokerNode.toString()), e) + error("Controller %d fails to send a request to broker %s".format(controllerId, brokerNode.toString), e) // If there is any socket error (eg, socket timeout), the connection is no longer usable and needs to be recreated. networkClient.close(brokerNode.idString) } @@ -230,13 +229,13 @@ class RequestSendThread(val controllerId: Int, if (!networkClient.blockingReady(brokerNode, socketTimeoutMs)(time)) throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms") - info("Controller %d connected to %s for sending state change requests".format(controllerId, brokerNode.toString())) + info("Controller %d connected to %s for sending state change requests".format(controllerId, brokerNode.toString)) } true } catch { case e: Throwable => - warn("Controller %d's connection to broker %s was unsuccessful".format(controllerId, brokerNode.toString()), e) + warn("Controller %d's connection to broker %s was unsuccessful".format(controllerId, brokerNode.toString), e) networkClient.close(brokerNode.idString) false } @@ -273,7 +272,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, - replicas: Seq[Int], callback: AbstractRequestResponse => Unit = null) { + replicas: Seq[Int], callback: AbstractResponse => Unit = null) { val topicPartition = new TopicPartition(topic, partition) brokerIds.filter(_ >= 0).foreach { brokerId => @@ -286,13 +285,13 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging } def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean, - callback: (AbstractRequestResponse, Int) => Unit = null) { + callback: (AbstractResponse, Int) => Unit = null) { brokerIds.filter(b => b >= 0).foreach { brokerId => stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[StopReplicaRequestInfo]) val v = stopReplicaRequestMap(brokerId) if(callback != null) stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId), - deletePartition, (r: AbstractRequestResponse) => callback(r, brokerId)) + deletePartition, (r: AbstractResponse) => callback(r, brokerId)) else stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId), deletePartition) @@ -302,7 +301,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging /** Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted */ def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int], partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition], - callback: AbstractRequestResponse => Unit = null) { + callback: AbstractResponse => Unit = null) { def updateMetadataRequestMapFor(partition: TopicAndPartition, beingDeleted: Boolean) { val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition) leaderIsrAndControllerEpochOpt match { @@ -444,29 +443,29 @@ case class ControllerBrokerStateInfo(networkClient: NetworkClient, messageQueue: BlockingQueue[QueueItem], requestSendThread: RequestSendThread) -case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractRequestResponse => Unit = null) +case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractResponse => Unit = null) -class Callbacks private (var leaderAndIsrResponseCallback: AbstractRequestResponse => Unit = null, - var updateMetadataResponseCallback: AbstractRequestResponse => Unit = null, - var stopReplicaResponseCallback: (AbstractRequestResponse, Int) => Unit = null) +class Callbacks private (var leaderAndIsrResponseCallback: AbstractResponse => Unit = null, + var updateMetadataResponseCallback: AbstractResponse => Unit = null, + var stopReplicaResponseCallback: (AbstractResponse, Int) => Unit = null) object Callbacks { class CallbackBuilder { - var leaderAndIsrResponseCbk: AbstractRequestResponse => Unit = null - var updateMetadataResponseCbk: AbstractRequestResponse => Unit = null - var stopReplicaResponseCbk: (AbstractRequestResponse, Int) => Unit = null + var leaderAndIsrResponseCbk: AbstractResponse => Unit = null + var updateMetadataResponseCbk: AbstractResponse => Unit = null + var stopReplicaResponseCbk: (AbstractResponse, Int) => Unit = null - def leaderAndIsrCallback(cbk: AbstractRequestResponse => Unit): CallbackBuilder = { + def leaderAndIsrCallback(cbk: AbstractResponse => Unit): CallbackBuilder = { leaderAndIsrResponseCbk = cbk this } - def updateMetadataCallback(cbk: AbstractRequestResponse => Unit): CallbackBuilder = { + def updateMetadataCallback(cbk: AbstractResponse => Unit): CallbackBuilder = { updateMetadataResponseCbk = cbk this } - def stopReplicaCallback(cbk: (AbstractRequestResponse, Int) => Unit): CallbackBuilder = { + def stopReplicaCallback(cbk: (AbstractResponse, Int) => Unit): CallbackBuilder = { stopReplicaResponseCbk = cbk this } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 4a94aad..730f07c 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -20,7 +20,7 @@ import java.util import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException} import org.apache.kafka.common.protocol.ApiKeys -import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestResponse} +import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse} import scala.collection._ import com.yammer.metrics.core.{Gauge, Meter} @@ -39,7 +39,7 @@ import kafka.utils.CoreUtils._ import org.apache.zookeeper.Watcher.Event.KeeperState import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.utils.Time -import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection} +import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient} import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException} import java.util.concurrent.locks.ReentrantLock @@ -136,7 +136,7 @@ object KafkaController extends Logging { Json.parseFull(controllerInfoString) match { case Some(m) => val controllerInfo = m.asInstanceOf[Map[String, Any]] - controllerInfo.get("brokerid").get.asInstanceOf[Int] + controllerInfo("brokerid").asInstanceOf[Int] case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString)) } } catch { @@ -686,7 +686,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat onControllerResignation() } - def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit = null) = { + def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractResponse => Unit = null) = { controllerContext.controllerChannelManager.sendRequest(brokerId, apiKey, apiVersion, request, callback) } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/controller/TopicDeletionManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index 8e5f3a1..823f9b4 100755 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -19,13 +19,14 @@ package kafka.controller import kafka.server.ConfigType import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.{StopReplicaResponse, AbstractRequestResponse} +import org.apache.kafka.common.requests.{AbstractResponse, StopReplicaResponse} import collection.mutable import collection.JavaConverters._ -import kafka.utils.{ShutdownableThread, Logging} +import kafka.utils.{Logging, ShutdownableThread} import kafka.utils.CoreUtils._ import kafka.utils.ZkUtils._ + import collection.Set import kafka.common.TopicAndPartition import java.util.concurrent.locks.ReentrantLock @@ -378,7 +379,7 @@ class TopicDeletionManager(controller: KafkaController, startReplicaDeletion(replicasPerPartition) } - private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: AbstractRequestResponse, replicaId: Int) { + private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: AbstractResponse, replicaId: Int) { val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse] debug("Delete topic callback invoked for %s".format(stopReplicaResponse)) val responseMap = stopReplicaResponse.responses.asScala http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/log/FileMessageSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index fd9ec5f..c33e376 100755 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -26,9 +26,10 @@ import kafka.utils._ import kafka.message._ import kafka.common.KafkaException import java.util.concurrent.TimeUnit -import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} + +import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} import org.apache.kafka.common.errors.CorruptRecordException -import org.apache.kafka.common.network.TransportLayer +import org.apache.kafka.common.record.FileRecords import org.apache.kafka.common.utils.Utils import scala.collection.mutable.ArrayBuffer @@ -48,7 +49,6 @@ class FileMessageSet private[kafka](@volatile var file: File, private[log] val start: Int, private[log] val end: Int, isSlice: Boolean) extends MessageSet { - import FileMessageSet._ /* the size of the message set in bytes */ private val _size = if(isSlice) @@ -126,6 +126,8 @@ class FileMessageSet private[kafka](@volatile var file: File, }) } + override def asRecords: FileRecords = new FileRecords(file, channel, start, end, isSlice) + /** * Search forward for the file position of the last offset that is greater than or equal to the target offset * and return its physical position and the size of the message (including log overhead) at the returned offset. If @@ -206,31 +208,6 @@ class FileMessageSet private[kafka](@volatile var file: File, } /** - * Write some of this set to the given channel. - * @param destChannel The channel to write to. - * @param writePosition The position in the message set to begin writing from. - * @param size The maximum number of bytes to write - * @return The number of bytes actually written. - */ - def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = { - // Ensure that the underlying size has not changed. - val newSize = math.min(channel.size.toInt, end) - start - if (newSize < _size.get()) { - throw new KafkaException("Size of FileMessageSet %s has been truncated during write: old size %d, new size %d" - .format(file.getAbsolutePath, _size.get(), newSize)) - } - val position = start + writePosition - val count = math.min(size, sizeInBytes) - val bytesTransferred = (destChannel match { - case tl: TransportLayer => tl.transferFrom(channel, position, count) - case dc => channel.transferTo(position, count, dc) - }).toInt - trace("FileMessageSet " + file.getAbsolutePath + " : bytes transferred : " + bytesTransferred - + " bytes requested for transfer : " + math.min(size, sizeInBytes)) - bytesTransferred - } - - /** * This method is called before we write messages to the socket using zero-copy transfer. We need to * make sure all the messages in the message set have the expected magic value. * @@ -337,7 +314,7 @@ class FileMessageSet private[kafka](@volatile var file: File, // increment the location and return the item location += size + sizeOffsetLength - new MessageAndOffset(new Message(buffer), offset) + MessageAndOffset(new Message(buffer), offset) } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index a33bc4b..096344d 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -26,7 +26,7 @@ import java.util.ArrayDeque import kafka.message.ByteBufferMessageSet.FilterResult import org.apache.kafka.common.errors.InvalidTimestampException -import org.apache.kafka.common.record.TimestampType +import org.apache.kafka.common.record.{MemoryRecords, TimestampType} import org.apache.kafka.common.utils.Utils import scala.collection.mutable @@ -352,6 +352,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi def getBuffer = buffer + override def asRecords: MemoryRecords = MemoryRecords.readableRecords(buffer.duplicate()) + private def shallowValidBytes: Int = { if (shallowValidByteCount < 0) { this.shallowValidByteCount = this.internalIterator(isShallow = true).map { messageAndOffset => @@ -371,19 +373,6 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi written } - /** Write the messages in this set to the given channel starting at the given offset byte. - * Less than the complete amount may be written, but no more than maxSize can be. The number - * of bytes written is returned */ - def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int = { - if (offset > Int.MaxValue) - throw new IllegalArgumentException(s"offset should not be larger than Int.MaxValue: $offset") - val dup = buffer.duplicate() - val position = offset.toInt - dup.position(position) - dup.limit(math.min(buffer.limit, position + maxSize)) - channel.write(dup) - } - override def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = { for (messageAndOffset <- shallowIterator) { if (messageAndOffset.message.magic != expectedMagicValue) http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/message/MessageSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala index 14c455c..ffa27fa 100644 --- a/core/src/main/scala/kafka/message/MessageSet.scala +++ b/core/src/main/scala/kafka/message/MessageSet.scala @@ -20,6 +20,8 @@ package kafka.message import java.nio._ import java.nio.channels._ +import org.apache.kafka.common.record.Records + /** * Message set helper functions */ @@ -70,11 +72,6 @@ case class MagicAndTimestamp(magic: Byte, timestamp: Long) */ abstract class MessageSet extends Iterable[MessageAndOffset] { - /** Write the messages in this set to the given channel starting at the given offset byte. - * Less than the complete amount may be written, but no more than maxSize can be. The number - * of bytes written is returned */ - def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int - /** * Check if all the wrapper messages in the message set have the expected magic value */ @@ -91,6 +88,11 @@ abstract class MessageSet extends Iterable[MessageAndOffset] { def sizeInBytes: Int /** + * Get the client representation of the message set + */ + def asRecords: Records + + /** * Print this message set's contents. If the message set has more than 100 messages, just * print the first 100. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index dace782..0cece68 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -23,7 +23,7 @@ import java.util.HashMap import java.util.concurrent._ import com.yammer.metrics.core.Gauge -import kafka.api._ +import kafka.api.{ControlledShutdownRequest, RequestOrResponse} import kafka.metrics.KafkaMetricsGroup import kafka.server.QuotaId import kafka.utils.{Logging, SystemTime} @@ -31,19 +31,19 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.InvalidRequestException import org.apache.kafka.common.network.Send import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol} -import org.apache.kafka.common.requests.{AbstractRequest, ApiVersionsRequest, ProduceRequest, RequestHeader, RequestSend} +import org.apache.kafka.common.record.MemoryRecords +import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.log4j.Logger - object RequestChannel extends Logging { - val AllDone = new Request(processor = 1, connectionId = "2", new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost()), buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT) + val AllDone = Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost), buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT) private val requestLogger = Logger.getLogger("kafka.request.logger") def getShutdownReceive() = { val emptyRequestHeader = new RequestHeader(ApiKeys.PRODUCE.id, "", 0) - val emptyProduceRequest = new ProduceRequest(0, 0, new HashMap[TopicPartition, ByteBuffer]()) - RequestSend.serialize(emptyRequestHeader, emptyProduceRequest.toStruct) + val emptyProduceRequest = new ProduceRequest(0, 0, new HashMap[TopicPartition, MemoryRecords]()) + AbstractRequestResponse.serialize(emptyRequestHeader, emptyProduceRequest) } case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) { @@ -61,19 +61,12 @@ object RequestChannel extends Logging { val requestId = buffer.getShort() - // TODO: this will be removed once we migrated to client-side format - // for server-side request / response format - // NOTE: this map only includes the server-side request/response handlers. Newer - // request types should only use the client-side versions which are parsed with - // o.a.k.common.requests.AbstractRequest.getRequest() - private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse]= - Map(ApiKeys.FETCH.id -> FetchRequest.readFrom, - ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom - ) - - // TODO: this will be removed once we migrated to client-side format - val requestObj = - keyToNameAndDeserializerMap.get(requestId).map(readFrom => readFrom(buffer)).orNull + // TODO: this will be removed once we remove support for v0 of ControlledShutdownRequest (which + // depends on a non-standard request header) + val requestObj: RequestOrResponse = if (requestId == ApiKeys.CONTROLLED_SHUTDOWN_KEY.id) + ControlledShutdownRequest.readFrom(buffer) + else + null // if we failed to find a server-side mapping, then try using the // client-side request / response format @@ -108,7 +101,7 @@ object RequestChannel extends Logging { if (requestObj != null) requestObj.describe(details) else - header.toString + " -- " + body.toString + s"$header -- $body" } trace("Processor %d received request : %s".format(processor, requestDesc(true))) @@ -135,7 +128,7 @@ object RequestChannel extends Logging { val totalTime = endTimeMs - startTimeMs val fetchMetricNames = if (requestId == ApiKeys.FETCH.id) { - val isFromFollower = requestObj.asInstanceOf[FetchRequest].isFromFollower + val isFromFollower = body.asInstanceOf[FetchRequest].isFromFollower Seq( if (isFromFollower) RequestMetrics.followFetchMetricName else RequestMetrics.consumerFetchMetricName @@ -172,6 +165,9 @@ object RequestChannel extends Logging { def this(request: Request, send: Send) = this(request.processor, request, send) + + def this(request: Request, response: AbstractResponse) = + this(request, response.toSend(request.connectionId, request.header)) } trait ResponseAction @@ -221,14 +217,14 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe /** No operation to take for the request, need to read more over the network */ def noOperation(processor: Int, request: RequestChannel.Request) { - responseQueues(processor).put(new RequestChannel.Response(processor, request, null, RequestChannel.NoOpAction)) + responseQueues(processor).put(RequestChannel.Response(processor, request, null, RequestChannel.NoOpAction)) for(onResponse <- responseListeners) onResponse(processor) } /** Close the connection for the request */ def closeConnection(processor: Int, request: RequestChannel.Request) { - responseQueues(processor).put(new RequestChannel.Response(processor, request, null, RequestChannel.CloseConnectionAction)) + responseQueues(processor).put(RequestChannel.Response(processor, request, null, RequestChannel.CloseConnectionAction)) for(onResponse <- responseListeners) onResponse(processor) } @@ -254,7 +250,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe } def shutdown() { - requestQueue.clear + requestQueue.clear() } } @@ -283,4 +279,3 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup { val responseSendTimeHist = newHistogram("ResponseSendTimeMs", biased = true, tags) val totalTimeHist = newHistogram("TotalTimeMs", biased = true, tags) } - http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/server/DelayedFetch.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 4bf04e6..001051f 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -20,14 +20,15 @@ package kafka.server import java.util.concurrent.TimeUnit import kafka.api.FetchResponsePartitionData -import kafka.api.PartitionFetchInfo import kafka.common.TopicAndPartition import kafka.metrics.KafkaMetricsGroup +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{NotLeaderForPartitionException, UnknownTopicOrPartitionException} +import org.apache.kafka.common.requests.FetchRequest.PartitionData import scala.collection._ -case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionFetchInfo) { +case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionData) { override def toString = "[startOffsetMetadata: " + startOffsetMetadata + ", " + "fetchInfo: " + fetchInfo + "]" @@ -103,7 +104,7 @@ class DelayedFetch(delayMs: Long, return forceComplete() } else if (fetchOffset.messageOffset < endOffset.messageOffset) { // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition) - val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize) + val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes) if (quota.isThrottled(topicAndPartition)) accumulatedThrottledSize += bytesAvailable else @@ -146,7 +147,7 @@ class DelayedFetch(delayMs: Long, readOnlyCommitted = fetchMetadata.fetchOnlyCommitted, fetchMaxBytes = fetchMetadata.fetchMaxBytes, hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit, - readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo }, + readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => new TopicPartition(tp.topic, tp.partition) -> status.fetchInfo }, quota = quota )