http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java new file mode 100644 index 0000000..f9f76be --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.*; + +/** Request built from a controller id, a controller epoch, a map of partition states and a set of live brokers. The public constructors build it for version 0 or version 1 of the schema; it can also be rebuilt from a parsed Struct. */ public class UpdateMetadataRequest extends AbstractRequest { + + /** Holder for the per-partition values that are written to, and read back from, each partition_states entry. The isr and replicas collections are stored by reference, without copying. */ public static final class PartitionState { + public final int controllerEpoch; + public final int leader; + public final int leaderEpoch; + public final List<Integer> isr; + public final int zkVersion; + public final Set<Integer> replicas; + + public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, Set<Integer> replicas) { + this.controllerEpoch = controllerEpoch; + this.leader = leader; + this.leaderEpoch = leaderEpoch; + this.isr = isr; + this.zkVersion = zkVersion; + this.replicas = replicas; + } + + } + + /** A broker id plus its endpoints keyed by SecurityProtocol; the map is stored by reference. */ public static final class Broker { + public final int id; + public final Map<SecurityProtocol, EndPoint> endPoints; + + public Broker(int id, Map<SecurityProtocol, EndPoint> endPoints) { + this.id = id; + this.endPoints = endPoints; + } + } + + /** Host and port of one broker endpoint. */ public static final class EndPoint { + public final String host; + public final int port; + + public EndPoint(String host, int port) { + this.host = host; + this.port = port; + } + } + + /** Broker description with a single host and port, accepted by the deprecated version 0 constructor. */ @Deprecated + public static final class BrokerEndPoint { + public final int id; + public final String host; + public final int port; + + public BrokerEndPoint(int id, String host, int port) { + this.id = id; + this.host = host; + this.port = port; + } + } + + /** Schema obtained from ProtoUtils.currentRequestSchema for UPDATE_METADATA_KEY; parse(ByteBuffer) reads with it. */ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.UPDATE_METADATA_KEY.id); + + private static final String CONTROLLER_ID_KEY_NAME = "controller_id"; + private static 
final String CONTROLLER_EPOCH_KEY_NAME = "controller_epoch"; + private static final String PARTITION_STATES_KEY_NAME = "partition_states"; + private static final String LIVE_BROKERS_KEY_NAME = "live_brokers"; + + // PartitionState key names + private static final String TOPIC_KEY_NAME = "topic"; + private static final String PARTITION_KEY_NAME = "partition"; + private static final String LEADER_KEY_NAME = "leader"; + private static final String LEADER_EPOCH_KEY_NAME = "leader_epoch"; + private static final String ISR_KEY_NAME = "isr"; + private static final String ZK_VERSION_KEY_NAME = "zk_version"; + private static final String REPLICAS_KEY_NAME = "replicas"; + + // Broker key names + private static final String BROKER_ID_KEY_NAME = "id"; + private static final String ENDPOINTS_KEY_NAME = "end_points"; + + // EndPoint key names + private static final String HOST_KEY_NAME = "host"; + private static final String PORT_KEY_NAME = "port"; + private static final String SECURITY_PROTOCOL_TYPE_KEY_NAME = "security_protocol_type"; + + private final int controllerId; + private final int controllerEpoch; + private final Map<TopicPartition, PartitionState> partitionStates; + private final Set<Broker> liveBrokers; + + /** + * Constructor for version 0. Each BrokerEndPoint is first converted into a Broker with a single PLAINTEXT endpoint. 
+ */ + @Deprecated + public UpdateMetadataRequest(int controllerId, int controllerEpoch, Set<BrokerEndPoint> liveBrokers, + Map<TopicPartition, PartitionState> partitionStates) { + this(0, controllerId, controllerEpoch, partitionStates, + brokerEndPointsToBrokers(liveBrokers)); + } + + /** Converts each BrokerEndPoint into a Broker whose endpoint map holds only its host and port under SecurityProtocol.PLAINTEXT. */ private static Set<Broker> brokerEndPointsToBrokers(Set<BrokerEndPoint> brokerEndPoints) { + Set<Broker> brokers = new HashSet<>(brokerEndPoints.size()); + for (BrokerEndPoint brokerEndPoint : brokerEndPoints) { + Map<SecurityProtocol, EndPoint> endPoints = Collections.singletonMap(SecurityProtocol.PLAINTEXT, + new EndPoint(brokerEndPoint.host, brokerEndPoint.port)); + brokers.add(new Broker(brokerEndPoint.id, endPoints)); + } + return brokers; + } + + /** + * Constructor for version 1. Every endpoint of every broker is written together with its security protocol id. + */ + public UpdateMetadataRequest(int controllerId, int controllerEpoch, Map<TopicPartition, + PartitionState> partitionStates, Set<Broker> liveBrokers) { + this(1, controllerId, controllerEpoch, partitionStates, liveBrokers); + } + + /** Fills a Struct of the requested schema version and keeps the given map and set by reference. Only the live_brokers layout depends on the version: version 0 writes host and port, any other version writes an end_points array. */ private UpdateMetadataRequest(int version, int controllerId, int controllerEpoch, Map<TopicPartition, + PartitionState> partitionStates, Set<Broker> liveBrokers) { + super(new Struct(ProtoUtils.requestSchema(ApiKeys.UPDATE_METADATA_KEY.id, version))); + struct.set(CONTROLLER_ID_KEY_NAME, controllerId); + struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch); + + List<Struct> partitionStatesData = new ArrayList<>(partitionStates.size()); + for (Map.Entry<TopicPartition, PartitionState> entry : partitionStates.entrySet()) { + Struct partitionStateData = struct.instance(PARTITION_STATES_KEY_NAME); + TopicPartition topicPartition = entry.getKey(); + partitionStateData.set(TOPIC_KEY_NAME, topicPartition.topic()); + partitionStateData.set(PARTITION_KEY_NAME, topicPartition.partition()); + PartitionState partitionState = entry.getValue(); + partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.controllerEpoch); + partitionStateData.set(LEADER_KEY_NAME, 
partitionState.leader); + partitionStateData.set(LEADER_EPOCH_KEY_NAME, partitionState.leaderEpoch); + partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray()); + partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.zkVersion); + partitionStateData.set(REPLICAS_KEY_NAME, partitionState.replicas.toArray()); + partitionStatesData.add(partitionStateData); + } + struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray()); + + List<Struct> brokersData = new ArrayList<>(liveBrokers.size()); + for (Broker broker : liveBrokers) { + Struct brokerData = struct.instance(LIVE_BROKERS_KEY_NAME); + brokerData.set(BROKER_ID_KEY_NAME, broker.id); + + if (version == 0) { /* the PLAINTEXT lookup result is dereferenced without a null check; the only caller passing version 0 builds every Broker with a PLAINTEXT endpoint */ + EndPoint endPoint = broker.endPoints.get(SecurityProtocol.PLAINTEXT); + brokerData.set(HOST_KEY_NAME, endPoint.host); + brokerData.set(PORT_KEY_NAME, endPoint.port); + } else { + List<Struct> endPointsData = new ArrayList<>(broker.endPoints.size()); + for (Map.Entry<SecurityProtocol, EndPoint> entry : broker.endPoints.entrySet()) { + Struct endPointData = brokerData.instance(ENDPOINTS_KEY_NAME); + endPointData.set(PORT_KEY_NAME, entry.getValue().port); + endPointData.set(HOST_KEY_NAME, entry.getValue().host); + endPointData.set(SECURITY_PROTOCOL_TYPE_KEY_NAME, entry.getKey().id); + endPointsData.add(endPointData); + + } + brokerData.set(ENDPOINTS_KEY_NAME, endPointsData.toArray()); + } + + brokersData.add(brokerData); + } + struct.set(LIVE_BROKERS_KEY_NAME, brokersData.toArray()); + + this.controllerId = controllerId; + this.controllerEpoch = controllerEpoch; + this.partitionStates = partitionStates; + this.liveBrokers = liveBrokers; + } + + /** Reads all fields back from the given Struct. A live_brokers entry that has a host field is treated as version 0 and yields a single PLAINTEXT endpoint; otherwise its end_points array is read. */ public UpdateMetadataRequest(Struct struct) { + super(struct); + + Map<TopicPartition, PartitionState> partitionStates = new HashMap<>(); + for (Object partitionStateDataObj : struct.getArray(PARTITION_STATES_KEY_NAME)) { + Struct partitionStateData = (Struct) partitionStateDataObj; + String topic = partitionStateData.getString(TOPIC_KEY_NAME); + int 
partition = partitionStateData.getInt(PARTITION_KEY_NAME); + int controllerEpoch = partitionStateData.getInt(CONTROLLER_EPOCH_KEY_NAME); + int leader = partitionStateData.getInt(LEADER_KEY_NAME); + int leaderEpoch = partitionStateData.getInt(LEADER_EPOCH_KEY_NAME); + + Object[] isrArray = partitionStateData.getArray(ISR_KEY_NAME); + List<Integer> isr = new ArrayList<>(isrArray.length); + for (Object r : isrArray) + isr.add((Integer) r); + + int zkVersion = partitionStateData.getInt(ZK_VERSION_KEY_NAME); + + Object[] replicasArray = partitionStateData.getArray(REPLICAS_KEY_NAME); + Set<Integer> replicas = new HashSet<>(replicasArray.length); + for (Object r : replicasArray) + replicas.add((Integer) r); + + PartitionState partitionState = new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas); + partitionStates.put(new TopicPartition(topic, partition), partitionState); + + } + + Set<Broker> liveBrokers = new HashSet<>(); + + for (Object brokerDataObj : struct.getArray(LIVE_BROKERS_KEY_NAME)) { + Struct brokerData = (Struct) brokerDataObj; + int brokerId = brokerData.getInt(BROKER_ID_KEY_NAME); + + // V0 + if (brokerData.hasField(HOST_KEY_NAME)) { + String host = brokerData.getString(HOST_KEY_NAME); + int port = brokerData.getInt(PORT_KEY_NAME); + Map<SecurityProtocol, EndPoint> endPoints = new HashMap<>(1); + endPoints.put(SecurityProtocol.PLAINTEXT, new EndPoint(host, port)); + liveBrokers.add(new Broker(brokerId, endPoints)); + } else { // V1 + Map<SecurityProtocol, EndPoint> endPoints = new HashMap<>(); + for (Object endPointDataObj : brokerData.getArray(ENDPOINTS_KEY_NAME)) { + Struct endPointData = (Struct) endPointDataObj; + int port = endPointData.getInt(PORT_KEY_NAME); + String host = endPointData.getString(HOST_KEY_NAME); + short protocolTypeId = endPointData.getShort(SECURITY_PROTOCOL_TYPE_KEY_NAME); + endPoints.put(SecurityProtocol.forId(protocolTypeId), new EndPoint(host, port)); + } + liveBrokers.add(new Broker(brokerId, 
endPoints)); + } + + } + + controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME); + controllerEpoch = struct.getInt(CONTROLLER_EPOCH_KEY_NAME); + this.partitionStates = partitionStates; + this.liveBrokers = liveBrokers; + } + + /** Returns an UpdateMetadataResponse whose error code is Errors.forException(e).code() when versionId is 0 or 1; any other versionId throws IllegalArgumentException. */ @Override + public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { + switch (versionId) { + case 0: + case 1: + return new UpdateMetadataResponse(Errors.forException(e).code()); + default: + throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", + versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id))); + } + } + + public int controllerId() { + return controllerId; + } + + public int controllerEpoch() { + return controllerEpoch; + } + + /** Returns the map held by this request, not a copy. */ public Map<TopicPartition, PartitionState> partitionStates() { + return partitionStates; + } + + /** Returns the set held by this request, not a copy. */ public Set<Broker> liveBrokers() { + return liveBrokers; + } + + /** Builds a request from the Struct returned by ProtoUtils.parseRequest for the given versionId. */ public static UpdateMetadataRequest parse(ByteBuffer buffer, int versionId) { + return new UpdateMetadataRequest(ProtoUtils.parseRequest(ApiKeys.UPDATE_METADATA_KEY.id, versionId, buffer)); + } + + /** Builds a request by reading the buffer with CURRENT_SCHEMA. */ public static UpdateMetadataRequest parse(ByteBuffer buffer) { + return new UpdateMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer)); + } +}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java new file mode 100644 index 0000000..5bec437 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; + +/** Response object whose only field read or written here is a short error_code; UpdateMetadataRequest.getErrorResponse returns instances of it. */ public class UpdateMetadataResponse extends AbstractRequestResponse { + + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.UPDATE_METADATA_KEY.id); + + private static final String ERROR_CODE_KEY_NAME = "error_code"; + + /** + * Possible error code: + * + * STALE_CONTROLLER_EPOCH (11) + */ + private final short errorCode; + + /** Creates a Struct on CURRENT_SCHEMA and stores errorCode in its error_code field. */ public UpdateMetadataResponse(short errorCode) { + super(new Struct(CURRENT_SCHEMA)); + struct.set(ERROR_CODE_KEY_NAME, errorCode); + this.errorCode = errorCode; + } + + /** Wraps the given Struct and reads error_code from it. */ public UpdateMetadataResponse(Struct struct) { + super(struct); + errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + } + + public short errorCode() { + return errorCode; + } + + /** Builds a response by reading the buffer with CURRENT_SCHEMA. */ public static UpdateMetadataResponse parse(ByteBuffer buffer) { + return new UpdateMetadataResponse((Struct) CURRENT_SCHEMA.read(buffer)); + } + + /** Builds a response from the Struct returned by ProtoUtils.parseResponse for the given version. */ public static UpdateMetadataResponse parse(ByteBuffer buffer, int version) { + return new UpdateMetadataResponse(ProtoUtils.parseResponse(ApiKeys.UPDATE_METADATA_KEY.id, version, buffer)); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/test/java/org/apache/kafka/clients/MockClient.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 9133d85..e5815f5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -201,6 +201,11 @@ public class MockClient implements KafkaClient { } @Override + public RequestHeader nextRequestHeader(ApiKeys key, 
short version) { + return new RequestHeader(key.id, version, "mock", correlation++); + } + + @Override public void wakeup() { } @@ -209,6 +214,11 @@ public class MockClient implements KafkaClient { } @Override + public void close(String nodeId) { + ready.remove(Integer.valueOf(nodeId)); + } + + @Override public Node leastLoadedNode(long now) { return this.node; } http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 43238ce..ce6328a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -52,6 +52,8 @@ public class NetworkClientTest { private Cluster cluster = TestUtils.singletonCluster("test", nodeId); private Node node = cluster.nodes().get(0); private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024); + private NetworkClient clientWithStaticNodes = new NetworkClient(selector, new ManualMetadataUpdater(Arrays.asList(node)), + "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024); @Before public void setup() { @@ -84,15 +86,24 @@ public class NetworkClientTest { @Test public void testSimpleRequestResponse() { + checkSimpleRequestResponse(client); + } + + @Test + public void testSimpleRequestResponseWithStaticNodes() { + checkSimpleRequestResponse(clientWithStaticNodes); + } + + private void checkSimpleRequestResponse(NetworkClient networkClient) { ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, ByteBuffer>emptyMap()); - RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE); + RequestHeader reqHeader = 
networkClient.nextRequestHeader(ApiKeys.PRODUCE); RequestSend send = new RequestSend(node.idString(), reqHeader, produceRequest.toStruct()); TestCallbackHandler handler = new TestCallbackHandler(); ClientRequest request = new ClientRequest(time.milliseconds(), true, send, handler); - awaitReady(client, node); - client.send(request); - client.poll(1, time.milliseconds()); - assertEquals(1, client.inFlightRequestCount()); + awaitReady(networkClient, node); + networkClient.send(request); + networkClient.poll(1, time.milliseconds()); + assertEquals(1, networkClient.inFlightRequestCount()); ResponseHeader respHeader = new ResponseHeader(reqHeader.correlationId()); Struct resp = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id)); resp.set("responses", new Object[0]); @@ -102,7 +113,7 @@ public class NetworkClientTest { resp.writeTo(buffer); buffer.flip(); selector.completeReceive(new NetworkReceive(node.idString(), buffer)); - List<ClientResponse> responses = client.poll(1, time.milliseconds()); + List<ClientResponse> responses = networkClient.poll(1, time.milliseconds()); assertEquals(1, responses.size()); assertTrue("The handler should have executed.", handler.executed); assertTrue("Should have a response body.", handler.response.hasResponse()); http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 353d621..b668013 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -21,10 +21,12 @@ import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.protocol.ApiKeys; 
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.SecurityProtocol; import org.junit.Test; import java.lang.reflect.Method; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -44,6 +46,9 @@ public class RequestResponseTest { createConsumerMetadataRequest(), createConsumerMetadataRequest().getErrorResponse(0, new UnknownServerException()), createConsumerMetadataResponse(), + createControlledShutdownRequest(), + createControlledShutdownResponse(), + createControlledShutdownRequest().getErrorResponse(1, new UnknownServerException()), createFetchRequest(), createFetchRequest().getErrorResponse(0, new UnknownServerException()), createFetchResponse(), @@ -70,18 +75,37 @@ public class RequestResponseTest { createProduceResponse(), createStopReplicaRequest(), createStopReplicaRequest().getErrorResponse(0, new UnknownServerException()), - createStopReplicaResponse()); + createStopReplicaResponse(), + createUpdateMetadataRequest(1), + createUpdateMetadataRequest(1).getErrorResponse(1, new UnknownServerException()), + createUpdateMetadataResponse(), + createLeaderAndIsrRequest(), + createLeaderAndIsrRequest().getErrorResponse(0, new UnknownServerException()), + createLeaderAndIsrResponse() + ); + + for (AbstractRequestResponse req : requestResponseList) + checkSerialization(req, null); + + checkSerialization(createUpdateMetadataRequest(0), 0); + checkSerialization(createUpdateMetadataRequest(0).getErrorResponse(0, new UnknownServerException()), 0); + } - for (AbstractRequestResponse req: requestResponseList) { - ByteBuffer buffer = ByteBuffer.allocate(req.sizeOf()); - req.writeTo(buffer); - buffer.rewind(); + private void checkSerialization(AbstractRequestResponse req, Integer version) throws Exception { + ByteBuffer buffer = ByteBuffer.allocate(req.sizeOf()); + req.writeTo(buffer); + buffer.rewind(); + 
AbstractRequestResponse deserialized; + if (version == null) { Method deserializer = req.getClass().getDeclaredMethod("parse", ByteBuffer.class); - AbstractRequestResponse deserialized = (AbstractRequestResponse) deserializer.invoke(null, buffer); - assertEquals("The original and deserialized of " + req.getClass().getSimpleName() + " should be the same.", req, deserialized); - assertEquals("The original and deserialized of " + req.getClass().getSimpleName() + " should have the same hashcode.", - req.hashCode(), deserialized.hashCode()); + deserialized = (AbstractRequestResponse) deserializer.invoke(null, buffer); + } else { + Method deserializer = req.getClass().getDeclaredMethod("parse", ByteBuffer.class, Integer.TYPE); + deserialized = (AbstractRequestResponse) deserializer.invoke(null, buffer, version); } + assertEquals("The original and deserialized of " + req.getClass().getSimpleName() + " should be the same.", req, deserialized); + assertEquals("The original and deserialized of " + req.getClass().getSimpleName() + " should have the same hashcode.", + req.hashCode(), deserialized.hashCode()); } @Test @@ -233,4 +257,81 @@ public class RequestResponseTest { responses.put(new TopicPartition("test", 0), Errors.NONE.code()); return new StopReplicaResponse(Errors.NONE.code(), responses); } + + private AbstractRequest createControlledShutdownRequest() { + return new ControlledShutdownRequest(10); + } + + private AbstractRequestResponse createControlledShutdownResponse() { + HashSet<TopicPartition> topicPartitions = new HashSet<>(Arrays.asList( + new TopicPartition("test2", 5), + new TopicPartition("test1", 10) + )); + return new ControlledShutdownResponse(Errors.NONE.code(), topicPartitions); + } + + private AbstractRequest createLeaderAndIsrRequest() { + Map<TopicPartition, LeaderAndIsrRequest.PartitionState> partitionStates = new HashMap<>(); + List<Integer> isr = Arrays.asList(1, 2); + List<Integer> replicas = Arrays.asList(1, 2, 3, 4); + partitionStates.put(new 
TopicPartition("topic5", 105), + new LeaderAndIsrRequest.PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas))); + partitionStates.put(new TopicPartition("topic5", 1), + new LeaderAndIsrRequest.PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas))); + partitionStates.put(new TopicPartition("topic20", 1), + new LeaderAndIsrRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas))); + + Set<LeaderAndIsrRequest.EndPoint> leaders = new HashSet<>(Arrays.asList( + new LeaderAndIsrRequest.EndPoint(0, "test0", 1223), + new LeaderAndIsrRequest.EndPoint(1, "test1", 1223) + )); + + return new LeaderAndIsrRequest(1, 10, partitionStates, leaders); + } + + private AbstractRequestResponse createLeaderAndIsrResponse() { + Map<TopicPartition, Short> responses = new HashMap<>(); + responses.put(new TopicPartition("test", 0), Errors.NONE.code()); + return new LeaderAndIsrResponse(Errors.NONE.code(), responses); + } + + private AbstractRequest createUpdateMetadataRequest(int version) { + Map<TopicPartition, UpdateMetadataRequest.PartitionState> partitionStates = new HashMap<>(); + List<Integer> isr = Arrays.asList(1, 2); + List<Integer> replicas = Arrays.asList(1, 2, 3, 4); + partitionStates.put(new TopicPartition("topic5", 105), + new UpdateMetadataRequest.PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas))); + partitionStates.put(new TopicPartition("topic5", 1), + new UpdateMetadataRequest.PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas))); + partitionStates.put(new TopicPartition("topic20", 1), + new UpdateMetadataRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas))); + + if (version == 0) { + Set<UpdateMetadataRequest.BrokerEndPoint> liveBrokers = new HashSet<>(Arrays.asList( + new UpdateMetadataRequest.BrokerEndPoint(0, "host1", 1223), + new UpdateMetadataRequest.BrokerEndPoint(1, "host2", 1234) + )); + + return new UpdateMetadataRequest(1, 
10, liveBrokers, partitionStates); + } else { + Map<SecurityProtocol, UpdateMetadataRequest.EndPoint> endPoints1 = new HashMap<>(); + endPoints1.put(SecurityProtocol.PLAINTEXT, new UpdateMetadataRequest.EndPoint("host1", 1223)); + + Map<SecurityProtocol, UpdateMetadataRequest.EndPoint> endPoints2 = new HashMap<>(); + endPoints2.put(SecurityProtocol.PLAINTEXT, new UpdateMetadataRequest.EndPoint("host1", 1244)); + endPoints2.put(SecurityProtocol.SSL, new UpdateMetadataRequest.EndPoint("host2", 1234)); + + Set<UpdateMetadataRequest.Broker> liveBrokers = new HashSet<>(Arrays.asList(new UpdateMetadataRequest.Broker(0, endPoints1), + new UpdateMetadataRequest.Broker(1, endPoints2) + )); + + return new UpdateMetadataRequest(1, 10, partitionStates, liveBrokers); + } + } + + private AbstractRequestResponse createUpdateMetadataResponse() { + return new UpdateMetadataResponse(Errors.NONE.code()); + } + + } http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/test/java/org/apache/kafka/test/MockSelector.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java index 7257cad..f83fd9b 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java +++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java @@ -57,6 +57,10 @@ public class MockSelector implements Selectable { public void close() { } + @Override + public void close(String id) { + } + public void clear() { this.completedSends.clear(); this.completedReceives.clear(); http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala index 8092007..33c107f 100644 --- 
a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala +++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala @@ -20,39 +20,45 @@ package kafka.api import java.nio.ByteBuffer import kafka.common.{ErrorMapping, TopicAndPartition} +import kafka.api.ApiUtils._ import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response import kafka.utils.Logging object ControlledShutdownRequest extends Logging { - val CurrentVersion = 0.shortValue + val CurrentVersion = 1.shortValue val DefaultClientId = "" def readFrom(buffer: ByteBuffer): ControlledShutdownRequest = { val versionId = buffer.getShort val correlationId = buffer.getInt + val clientId = if (versionId > 0) Some(readShortString(buffer)) else None val brokerId = buffer.getInt - new ControlledShutdownRequest(versionId, correlationId, brokerId) + new ControlledShutdownRequest(versionId, correlationId, clientId, brokerId) } + } case class ControlledShutdownRequest(versionId: Short, correlationId: Int, + clientId: Option[String], brokerId: Int) extends RequestOrResponse(Some(RequestKeys.ControlledShutdownKey)){ - def this(correlationId: Int, brokerId: Int) = - this(ControlledShutdownRequest.CurrentVersion, correlationId, brokerId) + if (versionId > 0 && clientId.isEmpty) + throw new IllegalArgumentException("`clientId` must be defined if `versionId` > 0") def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) buffer.putInt(correlationId) + clientId.foreach(writeShortString(buffer, _)) buffer.putInt(brokerId) } - def sizeInBytes(): Int = { - 2 + /* version id */ + def sizeInBytes: Int = { + 2 + /* version id */ 4 + /* correlation id */ + clientId.fold(0)(shortStringLength) 4 /* broker id */ } @@ -70,6 +76,7 @@ case class ControlledShutdownRequest(versionId: Short, controlledShutdownRequest.append("Name: " + this.getClass.getSimpleName) controlledShutdownRequest.append("; Version: " + versionId) controlledShutdownRequest.append("; CorrelationId: " + 
correlationId) + controlledShutdownRequest.append(";ClientId:" + clientId) controlledShutdownRequest.append("; BrokerId: " + brokerId) controlledShutdownRequest.toString() } http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 4396b6e..da1cff0 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -16,20 +16,27 @@ */ package kafka.controller -import kafka.network.BlockingChannel -import kafka.utils.{CoreUtils, Logging, ShutdownableThread} -import org.apache.kafka.common.network.NetworkReceive +import kafka.api.{LeaderAndIsr, KAFKA_083, PartitionStateInfo} +import kafka.utils._ +import org.apache.kafka.clients.{ClientResponse, ClientRequest, ManualMetadataUpdater, NetworkClient} +import org.apache.kafka.common.{TopicPartition, Node} +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.network.{Selectable, ChannelBuilders, Selector, NetworkReceive} +import org.apache.kafka.common.protocol.{SecurityProtocol, ApiKeys} +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.security.ssl.SSLFactory +import org.apache.kafka.common.utils.Time import collection.mutable.HashMap import kafka.cluster.Broker +import java.net.{SocketTimeoutException} import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue} import kafka.server.KafkaConfig import collection.mutable -import kafka.api._ -import kafka.common.TopicAndPartition -import kafka.api.RequestOrResponse +import kafka.common.{KafkaException, TopicAndPartition} import collection.Set +import collection.JavaConverters._ -class ControllerChannelManager (private val 
controllerContext: ControllerContext, config: KafkaConfig) extends Logging { +class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics) extends Logging { protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo] private val brokerLock = new Object this.logIdent = "[Channel manager on controller " + config.brokerId + "]: " @@ -44,16 +51,16 @@ class ControllerChannelManager (private val controllerContext: ControllerContext def shutdown() = { brokerLock synchronized { - brokerStateInfo.foreach(brokerState => removeExistingBroker(brokerState._1)) + brokerStateInfo.values.foreach(removeExistingBroker) } } - def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) { + def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit = null) { brokerLock synchronized { val stateInfoOpt = brokerStateInfo.get(brokerId) stateInfoOpt match { case Some(stateInfo) => - stateInfo.messageQueue.put((request, callback)) + stateInfo.messageQueue.put(QueueItem(apiKey, apiVersion, request, callback)) case None => warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId)) } @@ -72,30 +79,48 @@ class ControllerChannelManager (private val controllerContext: ControllerContext def removeBroker(brokerId: Int) { brokerLock synchronized { - removeExistingBroker(brokerId) + removeExistingBroker(brokerStateInfo(brokerId)) } } private def addNewBroker(broker: Broker) { - val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)]() - debug("Controller %d trying to connect to broker %d".format(config.brokerId,broker.id)) + val messageQueue = new LinkedBlockingQueue[QueueItem] + debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id)) val brokerEndPoint = 
broker.getBrokerEndPoint(config.interBrokerSecurityProtocol) - val channel = new BlockingChannel(brokerEndPoint.host, brokerEndPoint.port, - BlockingChannel.UseDefaultBufferSize, - BlockingChannel.UseDefaultBufferSize, - config.controllerSocketTimeoutMs) - val requestThread = new RequestSendThread(config.brokerId, controllerContext, broker, messageQueue, channel) + val brokerNode = new Node(broker.id, brokerEndPoint.host, brokerEndPoint.port) + val networkClient = controllerContext.networkClientMap.getOrElseUpdate(broker.id, { + val selector = new Selector( + NetworkReceive.UNLIMITED, + config.connectionsMaxIdleMs, + metrics, + time, + "controller-channel", + Map("broker-id" -> broker.id.toString).asJava, + false, + ChannelBuilders.create(config.interBrokerSecurityProtocol, SSLFactory.Mode.CLIENT, config.channelConfigs) + ) + new NetworkClient( + selector, + new ManualMetadataUpdater(Seq(brokerNode).asJava), + config.brokerId.toString, + 1, + 0, + Selectable.USE_DEFAULT_BUFFER_SIZE, + Selectable.USE_DEFAULT_BUFFER_SIZE + ) + }) + val requestThread = new RequestSendThread(config.brokerId, controllerContext, broker, messageQueue, networkClient, brokerNode, config, time) requestThread.setDaemon(false) - brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(channel, broker, messageQueue, requestThread)) + brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, broker, messageQueue, requestThread)) } - private def removeExistingBroker(brokerId: Int) { + private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) { try { - brokerStateInfo(brokerId).channel.disconnect() - brokerStateInfo(brokerId).messageQueue.clear() - brokerStateInfo(brokerId).requestSendThread.shutdown() - brokerStateInfo.remove(brokerId) - }catch { + brokerState.networkClient.close(brokerState.brokerNode.idString) + brokerState.messageQueue.clear() + brokerState.requestSendThread.shutdown() + brokerStateInfo.remove(brokerState.broker.id) + } catch { 
case e: Throwable => error("Error while removing broker by the controller", e) } } @@ -107,21 +132,29 @@ class ControllerChannelManager (private val controllerContext: ControllerContext } } +case class QueueItem(apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit) + class RequestSendThread(val controllerId: Int, val controllerContext: ControllerContext, val toBroker: Broker, - val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)], - val channel: BlockingChannel) + val queue: BlockingQueue[QueueItem], + val networkClient: NetworkClient, + val brokerNode: Node, + val config: KafkaConfig, + val time: Time) extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBroker.id)) { + private val lock = new Object() private val stateChangeLogger = KafkaController.stateChangeLogger - connectToBroker(toBroker, channel) + private val socketTimeoutMs = config.controllerSocketTimeoutMs override def doWork(): Unit = { - val queueItem = queue.take() - val request = queueItem._1 - val callback = queueItem._2 - var receive: NetworkReceive = null + + def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(300)) + + val QueueItem(apiKey, apiVersion, request, callback) = queue.take() + import NetworkClientBlockingOps._ + var clientResponse: ClientResponse = null try { lock synchronized { var isSendSuccessful = false @@ -129,30 +162,35 @@ class RequestSendThread(val controllerId: Int, // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying. 
try { - channel.send(request) - receive = channel.receive() - isSendSuccessful = true + if (!brokerReady()) { + isSendSuccessful = false + backoff() + } + else { + val requestHeader = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _)) + val send = new RequestSend(brokerNode.idString, requestHeader, request.toStruct) + val clientRequest = new ClientRequest(time.milliseconds(), true, send, null) + clientResponse = networkClient.blockingSendAndReceive(clientRequest, socketTimeoutMs)(time).getOrElse { + throw new SocketTimeoutException(s"No response received within $socketTimeoutMs ms") + } + isSendSuccessful = true + } } catch { case e: Throwable => // if the send was not successful, reconnect to broker and resend the message warn(("Controller %d epoch %d fails to send request %s to broker %s. " + "Reconnecting to broker.").format(controllerId, controllerContext.epoch, request.toString, toBroker.toString()), e) - channel.disconnect() - connectToBroker(toBroker, channel) + networkClient.close(brokerNode.idString) isSendSuccessful = false - // backoff before retrying the connection and send - CoreUtils.swallowTrace(Thread.sleep(300)) + backoff() } } - if (receive != null) { - var response: RequestOrResponse = null - request.requestId.get match { - case RequestKeys.LeaderAndIsrKey => - response = LeaderAndIsrResponse.readFrom(receive.payload()) - case RequestKeys.StopReplicaKey => - response = StopReplicaResponse.readFrom(receive.payload()) - case RequestKeys.UpdateMetadataKey => - response = UpdateMetadataResponse.readFrom(receive.payload()) + if (clientResponse != null) { + val response = ApiKeys.forId(clientResponse.request.request.header.apiKey) match { + case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrResponse(clientResponse.responseBody) + case ApiKeys.STOP_REPLICA => new StopReplicaResponse(clientResponse.responseBody) + case ApiKeys.UPDATE_METADATA_KEY => new UpdateMetadataResponse(clientResponse.responseBody) + 
case apiKey => throw new KafkaException(s"Unexpected apiKey received: $apiKey") } stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s" .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString)) @@ -165,70 +203,79 @@ class RequestSendThread(val controllerId: Int, } catch { case e: Throwable => error("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e) - // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated. - channel.disconnect() + // If there is any socket error (eg, socket timeout), the connection is no longer usable and needs to be recreated. + networkClient.close(brokerNode.idString) } } - private def connectToBroker(broker: Broker, channel: BlockingChannel) { + private def brokerReady(): Boolean = { + import NetworkClientBlockingOps._ try { - channel.connect() - info("Controller %d connected to %s for sending state change requests".format(controllerId, broker.toString())) - } catch { - case e: Throwable => { - channel.disconnect() - error("Controller %d's connection to broker %s was unsuccessful".format(controllerId, broker.toString()), e) + + if (networkClient.isReady(brokerNode, time.milliseconds())) + true + else { + val ready = networkClient.blockingReady(brokerNode, socketTimeoutMs)(time) + + if (!ready) + throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms") + + info("Controller %d connected to %s for sending state change requests".format(controllerId, toBroker.toString())) + true } + } catch { + case e: Throwable => + error("Controller %d's connection to broker %s was unsuccessful".format(controllerId, toBroker.toString()), e) + networkClient.close(brokerNode.idString) + false } } + } class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging { val controllerContext = controller.controllerContext val controllerId: Int = 
controller.config.brokerId - val clientId: String = controller.clientId - val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]] - val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[StopReplicaRequestInfo]] - val updateMetadataRequestMap = new mutable.HashMap[Int, mutable.HashMap[TopicAndPartition, PartitionStateInfo]] + val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]] + val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaRequestInfo]] + val updateMetadataRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]] private val stateChangeLogger = KafkaController.stateChangeLogger def newBatch() { // raise error if the previous batch is not empty - if(leaderAndIsrRequestMap.size > 0) + if (leaderAndIsrRequestMap.size > 0) throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " + "a new one. Some LeaderAndIsr state changes %s might be lost ".format(leaderAndIsrRequestMap.toString())) - if(stopReplicaRequestMap.size > 0) + if (stopReplicaRequestMap.size > 0) throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " + "new one. Some StopReplica state changes %s might be lost ".format(stopReplicaRequestMap.toString())) - if(updateMetadataRequestMap.size > 0) + if (updateMetadataRequestMap.size > 0) throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " + "new one. 
Some UpdateMetadata state changes %s might be lost ".format(updateMetadataRequestMap.toString())) } def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, - replicas: Seq[Int], callback: (RequestOrResponse) => Unit = null) { - val topicAndPartition: TopicAndPartition = TopicAndPartition(topic, partition) - - brokerIds.filter(b => b >= 0).foreach { - brokerId => - leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionStateInfo]) - leaderAndIsrRequestMap(brokerId).put((topic, partition), - PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet)) + replicas: Seq[Int], callback: AbstractRequestResponse => Unit = null) { + val topicPartition = new TopicPartition(topic, partition) + + brokerIds.filter(_ >= 0).foreach { brokerId => + val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty) + result.put(topicPartition, PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet)) } addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, - Set(topicAndPartition)) + Set(TopicAndPartition(topic, partition))) } def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean, - callback: (RequestOrResponse, Int) => Unit = null) { + callback: (AbstractRequestResponse, Int) => Unit = null) { brokerIds.filter(b => b >= 0).foreach { brokerId => stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[StopReplicaRequestInfo]) val v = stopReplicaRequestMap(brokerId) if(callback != null) stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId), - deletePartition, (r: RequestOrResponse) => { callback(r, brokerId) }) + deletePartition, (r: AbstractRequestResponse) => callback(r, brokerId)) else stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, 
partition, brokerId), deletePartition) @@ -238,7 +285,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging /** Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted */ def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int], partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition], - callback: (RequestOrResponse) => Unit = null) { + callback: AbstractRequestResponse => Unit = null) { def updateMetadataRequestMapFor(partition: TopicAndPartition, beingDeleted: Boolean) { val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition) leaderIsrAndControllerEpochOpt match { @@ -251,8 +298,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging PartitionStateInfo(leaderIsrAndControllerEpoch, replicas) } brokerIds.filter(b => b >= 0).foreach { brokerId => - updateMetadataRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo]) - updateMetadataRequestMap(brokerId).put(partition, partitionStateInfo) + updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo]) + updateMetadataRequestMap(brokerId).put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo) } case None => info("Leader not yet assigned for partition %s. 
Skip sending UpdateMetadataRequest.".format(partition)) @@ -269,9 +316,9 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging else givenPartitions -- controller.deleteTopicManager.partitionsToBeDeleted } - if(filteredPartitions.isEmpty) + if (filteredPartitions.isEmpty) brokerIds.filter(b => b >= 0).foreach { brokerId => - updateMetadataRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo]) + updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo]) } else filteredPartitions.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = false)) @@ -279,38 +326,71 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = true)) } - def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int) { + def sendRequestsToBrokers(controllerEpoch: Int) { try { - leaderAndIsrRequestMap.foreach { m => - val broker = m._1 - val partitionStateInfos = m._2.toMap - val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet - val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map(b => b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol)) - val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId) - for (p <- partitionStateInfos) { - val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower" - stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " + + leaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) => + partitionStateInfos.foreach { case (topicPartition, state) => + val 
typeOfRequest = if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower" + stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " + "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest, - p._2.leaderIsrAndControllerEpoch, correlationId, broker, - p._1._1, p._1._2)) + state.leaderIsrAndControllerEpoch, broker, + topicPartition.topic, topicPartition.partition)) + } + val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet + val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map { b => + val brokerEndPoint = b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol) + new LeaderAndIsrRequest.EndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port) + } + val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) => + val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch + val partitionState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leaderIsr.leader, + leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion, + partitionStateInfo.allReplicas.map(Integer.valueOf).asJava + ) + topicPartition -> partitionState } - controller.sendRequest(broker, leaderAndIsrRequest, null) + val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava) + controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest, null) } leaderAndIsrRequestMap.clear() - updateMetadataRequestMap.foreach { m => - val broker = m._1 - val partitionStateInfos = m._2.toMap - - val versionId = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) 1 else 0 - val updateMetadataRequest = new UpdateMetadataRequest(versionId = versionId.toShort, controllerId = 
controllerId, controllerEpoch = controllerEpoch, - correlationId = correlationId, clientId = clientId, partitionStateInfos = partitionStateInfos, aliveBrokers = controllerContext.liveOrShuttingDownBrokers) - partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " + - "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch, - correlationId, broker, p._1))) - controller.sendRequest(broker, updateMetadataRequest, null) + updateMetadataRequestMap.foreach { case (broker, partitionStateInfos) => + + partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " + + "to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch, + broker, p._1))) + val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) => + val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch + val partitionState = new UpdateMetadataRequest.PartitionState(controllerEpoch, leaderIsr.leader, + leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion, + partitionStateInfo.allReplicas.map(Integer.valueOf).asJava + ) + topicPartition -> partitionState + } + + val version = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) (1: Short) else (0: Short) + + val updateMetadataRequest = + if (version == 0) { + val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map { broker => + val brokerEndPoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT) + new UpdateMetadataRequest.BrokerEndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port) + } + new UpdateMetadataRequest(controllerId, controllerEpoch, liveBrokers.asJava, partitionStates.asJava) + } + else { + val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map { 
broker => + val endPoints = broker.endPoints.map { case (securityProtocol, endPoint) => + securityProtocol -> new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port) + } + new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava) + } + new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava) + } + + controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, Some(version), updateMetadataRequest, null) } updateMetadataRequestMap.clear() - stopReplicaRequestMap foreach { case(broker, replicaInfoList) => + stopReplicaRequestMap.foreach { case (broker, replicaInfoList) => val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet debug("The stop replica request (delete = true) sent to broker %d is %s" @@ -318,23 +398,23 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging debug("The stop replica request (delete = false) sent to broker %d is %s" .format(broker, stopReplicaWithoutDelete.mkString(","))) replicaInfoList.foreach { r => - val stopReplicaRequest = new StopReplicaRequest(r.deletePartition, - Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId) - controller.sendRequest(broker, stopReplicaRequest, r.callback) + val stopReplicaRequest = new StopReplicaRequest(controllerId, controllerEpoch, r.deletePartition, + Set(new TopicPartition(r.replica.topic, r.replica.partition)).asJava) + controller.sendRequest(broker, ApiKeys.STOP_REPLICA, None, stopReplicaRequest, r.callback) } } stopReplicaRequestMap.clear() } catch { case e : Throwable => { - if(leaderAndIsrRequestMap.size > 0) { + if (leaderAndIsrRequestMap.size > 0) { error("Haven't been able to send leader and isr requests, current state of " + s"the map is $leaderAndIsrRequestMap") } - if(updateMetadataRequestMap.size > 0) { + if 
(updateMetadataRequestMap.size > 0) { error("Haven't been able to send metadata update requests, current state of " + s"the map is $updateMetadataRequestMap") } - if(stopReplicaRequestMap.size > 0) { + if (stopReplicaRequestMap.size > 0) { error("Haven't been able to send stop replica requests, current state of " + s"the map is $stopReplicaRequestMap") } @@ -344,34 +424,35 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging } } -case class ControllerBrokerStateInfo(channel: BlockingChannel, +case class ControllerBrokerStateInfo(networkClient: NetworkClient, + brokerNode: Node, broker: Broker, - messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)], + messageQueue: BlockingQueue[QueueItem], requestSendThread: RequestSendThread) -case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: (RequestOrResponse) => Unit = null) +case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractRequestResponse => Unit = null) -class Callbacks private (var leaderAndIsrResponseCallback:(RequestOrResponse) => Unit = null, - var updateMetadataResponseCallback:(RequestOrResponse) => Unit = null, - var stopReplicaResponseCallback:(RequestOrResponse, Int) => Unit = null) +class Callbacks private (var leaderAndIsrResponseCallback: AbstractRequestResponse => Unit = null, + var updateMetadataResponseCallback: AbstractRequestResponse => Unit = null, + var stopReplicaResponseCallback: (AbstractRequestResponse, Int) => Unit = null) object Callbacks { class CallbackBuilder { - var leaderAndIsrResponseCbk:(RequestOrResponse) => Unit = null - var updateMetadataResponseCbk:(RequestOrResponse) => Unit = null - var stopReplicaResponseCbk:(RequestOrResponse, Int) => Unit = null + var leaderAndIsrResponseCbk: AbstractRequestResponse => Unit = null + var updateMetadataResponseCbk: AbstractRequestResponse => Unit = null + var stopReplicaResponseCbk: 
(AbstractRequestResponse, Int) => Unit = null - def leaderAndIsrCallback(cbk: (RequestOrResponse) => Unit): CallbackBuilder = { + def leaderAndIsrCallback(cbk: AbstractRequestResponse => Unit): CallbackBuilder = { leaderAndIsrResponseCbk = cbk this } - def updateMetadataCallback(cbk: (RequestOrResponse) => Unit): CallbackBuilder = { + def updateMetadataCallback(cbk: AbstractRequestResponse => Unit): CallbackBuilder = { updateMetadataResponseCbk = cbk this } - def stopReplicaCallback(cbk: (RequestOrResponse, Int) => Unit): CallbackBuilder = { + def stopReplicaCallback(cbk: (AbstractRequestResponse, Int) => Unit): CallbackBuilder = { stopReplicaResponseCbk = cbk this } http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 4c37616..2d0845d 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -18,6 +18,10 @@ package kafka.controller import java.util +import org.apache.kafka.clients.NetworkClient +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestResponse} + import scala.collection._ import com.yammer.metrics.core.Gauge import java.util.concurrent.TimeUnit @@ -32,9 +36,10 @@ import kafka.utils.ZkUtils._ import kafka.utils._ import kafka.utils.CoreUtils._ import org.apache.zookeeper.Watcher.Event.KeeperState +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.utils.Time import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient} import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException} -import java.util.concurrent.atomic.AtomicInteger import 
java.util.concurrent.locks.ReentrantLock import kafka.server._ import kafka.common.TopicAndPartition @@ -47,12 +52,24 @@ class ControllerContext(val zkClient: ZkClient, val brokerShutdownLock: Object = new Object var epoch: Int = KafkaController.InitialControllerEpoch - 1 var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1 - val correlationId: AtomicInteger = new AtomicInteger(0) var allTopics: Set[String] = Set.empty var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty - var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap - var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet + val partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap + val partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet + + /** + * This map is used to ensure the following invariant: at most one `NetworkClient`/`Selector` instance should be + * created per broker during the lifetime of the `metrics` parameter received by `KafkaController` (which has the same + * lifetime as `KafkaController` since they are both shut down during `KafkaServer.shutdown()`). + * + * If we break this invariant, an exception is thrown during the instantiation of `Selector` due to the usage of + * two equal `MetricName` instances for two `Selector` instantiations. This way also helps to maintain the metrics sane. + * + * In the future, we should consider redesigning `ControllerChannelManager` so that we can use a single + * `NetworkClient`/`Selector` for multiple broker connections as that is the intended usage and it may help simplify this code. 
+ */ + private[controller] val networkClientMap = mutable.Map[Int, NetworkClient]() private var liveBrokersUnderlying: Set[Broker] = Set.empty private var liveBrokerIdsUnderlying: Set[Int] = Set.empty @@ -117,6 +134,12 @@ class ControllerContext(val zkClient: ZkClient, partitionReplicaAssignment = partitionReplicaAssignment.filter{ case (topicAndPartition, _) => topicAndPartition.topic != topic } allTopics -= topic } + + private[controller] def closeNetworkClients(): Unit = { + networkClientMap.values.foreach(_.close()) + networkClientMap.clear() + } + } @@ -149,7 +172,7 @@ object KafkaController extends Logging { } } -class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup { +class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState, time: Time, metrics: Metrics) extends Logging with KafkaMetricsGroup { this.logIdent = "[Controller " + config.brokerId + "]: " private var isRunning = true private val stateChangeLogger = KafkaController.stateChangeLogger @@ -267,7 +290,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt brokerRequestBatch.newBatch() brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic, topicAndPartition.partition, deletePartition = false) - brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) + brokerRequestBatch.sendRequestsToBrokers(epoch) } catch { case e : IllegalStateException => { // Resign if the controller is in an illegal state @@ -688,10 +711,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt isRunning = false } onControllerResignation() + controllerContext.closeNetworkClients() } - def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = { - controllerContext.controllerChannelManager.sendRequest(brokerId, request, callback) + 
def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit = null) = { + controllerContext.controllerChannelManager.sendRequest(brokerId, apiKey, apiVersion, request, callback) } def incrementControllerEpoch(zkClient: ZkClient) = { @@ -811,7 +835,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt } private def startChannelManager() { - controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext, config) + controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics) controllerContext.controllerChannelManager.startup() } @@ -901,7 +925,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt try { brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic, topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas) - brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch, controllerContext.correlationId.getAndIncrement) + brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch) } catch { case e : IllegalStateException => { // Resign if the controller is in an illegal state @@ -1020,7 +1044,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt try { brokerRequestBatch.newBatch() brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions) - brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) + brokerRequestBatch.sendRequestsToBrokers(epoch) } catch { case e : IllegalStateException => { // Resign if the controller is in an illegal state http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/main/scala/kafka/controller/PartitionStateMachine.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 5b616f3..675a807 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -119,7 +119,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector, (new CallbackBuilder).build) } - brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement) + brokerRequestBatch.sendRequestsToBrokers(controller.epoch) } catch { case e: Throwable => error("Error while moving some partitions to the online state", e) // TODO: It is not enough to bail out and log an error, it is important to trigger leader election for those partitions @@ -144,7 +144,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { partitions.foreach { topicAndPartition => handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks) } - brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement) + brokerRequestBatch.sendRequestsToBrokers(controller.epoch) }catch { case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e) // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 3a44fdc..acad83a 100755 --- 
a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -112,7 +112,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { try { brokerRequestBatch.newBatch() replicas.foreach(r => handleStateChange(r, targetState, callbacks)) - brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement) + brokerRequestBatch.sendRequestsToBrokers(controller.epoch) }catch { case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e) } http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/main/scala/kafka/controller/TopicDeletionManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index 64b11df..9e39dd5 100755 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -18,13 +18,15 @@ package kafka.controller import kafka.server.ConfigType +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{StopReplicaResponse, AbstractRequestResponse} import collection.mutable +import collection.JavaConverters._ import kafka.utils.{ShutdownableThread, Logging, ZkUtils} import kafka.utils.CoreUtils._ import collection.Set -import kafka.common.{ErrorMapping, TopicAndPartition} -import kafka.api.{StopReplicaResponse, RequestOrResponse} +import kafka.common.TopicAndPartition import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.atomic.AtomicBoolean @@ -363,20 +365,20 @@ class TopicDeletionManager(controller: KafkaController, startReplicaDeletion(replicasPerPartition) } - private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: RequestOrResponse, replicaId: Int) { + private def 
deleteTopicStopReplicaCallback(stopReplicaResponseObj: AbstractRequestResponse, replicaId: Int) { val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse] debug("Delete topic callback invoked for %s".format(stopReplicaResponse)) - val partitionsInError = if(stopReplicaResponse.errorCode != ErrorMapping.NoError) { - stopReplicaResponse.responseMap.keySet - } else - stopReplicaResponse.responseMap.filter(p => p._2 != ErrorMapping.NoError).map(_._1).toSet + val responseMap = stopReplicaResponse.responses.asScala + val partitionsInError = + if (stopReplicaResponse.errorCode != Errors.NONE.code) responseMap.keySet + else responseMap.filter { case (_, error) => error != Errors.NONE.code }.map(_._1).toSet val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)) inLock(controllerContext.controllerLock) { // move all the failed replicas to ReplicaDeletionIneligible failReplicaDeletion(replicasInError) - if(replicasInError.size != stopReplicaResponse.responseMap.size) { + if (replicasInError.size != responseMap.size) { // some replicas could have been successfully deleted - val deletedReplicas = stopReplicaResponse.responseMap.keySet -- partitionsInError + val deletedReplicas = responseMap.keySet -- partitionsInError completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/main/scala/kafka/network/BlockingChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala index 1197259..5408e0d 100644 --- a/core/src/main/scala/kafka/network/BlockingChannel.scala +++ b/core/src/main/scala/kafka/network/BlockingChannel.scala @@ -117,11 +117,17 @@ class BlockingChannel( val host: String, if(!connected) throw new ClosedChannelException() - val 
response = new NetworkReceive() - response.readCompletely(readChannel) + val response = readCompletely(readChannel) response.payload().rewind() response } + private def readCompletely(channel: ReadableByteChannel): NetworkReceive = { + val response = new NetworkReceive + while (!response.complete()) + response.readFromReadableChannel(channel) + response + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index af02c4e..d46603b 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -32,8 +32,7 @@ import kafka.server.KafkaConfig import kafka.utils._ import org.apache.kafka.common.MetricName import org.apache.kafka.common.metrics._ -import org.apache.kafka.common.network.{InvalidReceiveException, ChannelBuilder, - PlaintextChannelBuilder, SSLChannelBuilder} +import org.apache.kafka.common.network.{ChannelBuilders, InvalidReceiveException, ChannelBuilder, PlaintextChannelBuilder, SSLChannelBuilder} import org.apache.kafka.common.security.ssl.SSLFactory import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.protocol.types.SchemaException @@ -368,7 +367,7 @@ private[kafka] class Processor(val id: Int, private val newConnections = new ConcurrentLinkedQueue[SocketChannel]() private val inflightResponses = mutable.Map[String, RequestChannel.Response]() - private val channelBuilder = createChannelBuilder + private val channelBuilder = ChannelBuilders.create(protocol, SSLFactory.Mode.SERVER, channelConfigs) private val metricTags = new util.HashMap[String, String]() metricTags.put("networkProcessor", id.toString) @@ -514,14 +513,6 @@ private[kafka] class Processor(val id: Int, } } - private def 
createChannelBuilder(): ChannelBuilder = { - val channelBuilder: ChannelBuilder = if (protocol == SecurityProtocol.SSL) new SSLChannelBuilder(SSLFactory.Mode.SERVER) - else new PlaintextChannelBuilder() - - channelBuilder.configure(channelConfigs) - channelBuilder - } - /** * Close all open connections */
