KAFKA-4565; Separation of Internal and External traffic (KIP-103) Author: Ismael Juma <ism...@juma.me.uk>
Reviewers: Gwen Shapira <csh...@gmail.com>, Jason Gustafson <ja...@confluent.io> Closes #2354 from ijuma/kafka-4565-separation-of-internal-and-external-traffic Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d2567188 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d2567188 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d2567188 Branch: refs/heads/trunk Commit: d25671884bbbdf7843ada3e7797573a00ac7cd56 Parents: 80d6c64 Author: Ismael Juma <ism...@juma.me.uk> Authored: Fri Jan 13 10:00:06 2017 -0800 Committer: Jason Gustafson <ja...@confluent.io> Committed: Fri Jan 13 10:00:06 2017 -0800 ---------------------------------------------------------------------- .../org/apache/kafka/clients/ClientUtils.java | 2 +- .../kafka/common/network/ListenerName.java | 68 ++++++++++ .../apache/kafka/common/protocol/Protocol.java | 26 +++- .../common/requests/UpdateMetadataRequest.java | 61 +++++---- .../common/requests/RequestResponseTest.java | 32 +++-- config/server.properties | 5 +- .../src/main/scala/kafka/admin/AdminUtils.scala | 32 +++-- .../kafka/admin/ConsumerGroupCommand.scala | 3 +- core/src/main/scala/kafka/api/ApiVersion.scala | 12 +- .../main/scala/kafka/client/ClientUtils.scala | 67 ++++++---- core/src/main/scala/kafka/cluster/Broker.scala | 106 +++++++++++---- .../src/main/scala/kafka/cluster/EndPoint.scala | 59 ++++---- .../kafka/consumer/ConsumerFetcherManager.scala | 2 +- .../controller/ControllerChannelManager.scala | 32 +++-- .../kafka/controller/KafkaController.scala | 12 +- .../scala/kafka/network/RequestChannel.scala | 24 ++-- .../main/scala/kafka/network/SocketServer.scala | 35 +++-- .../src/main/scala/kafka/server/KafkaApis.scala | 15 ++- .../main/scala/kafka/server/KafkaConfig.scala | 133 ++++++++++++------- .../scala/kafka/server/KafkaHealthcheck.scala | 16 ++- .../main/scala/kafka/server/KafkaServer.scala | 15 ++- .../main/scala/kafka/server/MetadataCache.scala | 43 +++--- .../scala/kafka/server/ReplicaManager.scala | 2 +- .../kafka/tools/ConsumerOffsetChecker.scala | 3 +- .../scala/kafka/tools/UpdateOffsetsInZK.scala | 14 +- core/src/main/scala/kafka/utils/CoreUtils.scala | 24 +++- core/src/main/scala/kafka/utils/ZkUtils.scala | 42 ++---- .../kafka/api/AuthorizerIntegrationTest.scala | 7 +- .../kafka/api/ProducerBounceTest.scala | 2 +- .../kafka/api/ProducerCompressionTest.scala | 2 +- ...eListenersWithSameSecurityProtocolTest.scala | 124 +++++++++++++++++ .../unit/kafka/admin/AddPartitionsTest.scala | 27 ++-- .../admin/ReassignPartitionsCommandTest.scala | 6 +- .../api/RequestResponseSerializationTest.scala | 24 ++-- .../unit/kafka/cluster/BrokerEndPointTest.scala | 58 ++++---- .../integration/BaseTopicMetadataTest.scala | 41 +++--- .../unit/kafka/integration/FetcherTest.scala | 4 +- .../integration/KafkaServerTestHarness.scala | 7 + .../ProducerConsumerTestHarness.scala | 2 +- .../unit/kafka/network/SocketServerTest.scala | 12 +- .../unit/kafka/producer/AsyncProducerTest.scala | 3 +- .../unit/kafka/producer/ProducerTest.scala | 4 +- .../unit/kafka/producer/SyncProducerTest.scala | 18 +-- .../unit/kafka/server/AdvertiseBrokerTest.scala | 78 ++++++++--- .../unit/kafka/server/BaseRequestTest.scala | 3 +- .../ControlledShutdownLeaderSelectorTest.scala | 2 +- .../unit/kafka/server/EdgeCaseRequestTest.scala | 3 +- .../unit/kafka/server/KafkaConfigTest.scala | 131 +++++++++++++++--- .../unit/kafka/server/LeaderElectionTest.scala | 8 +- .../scala/unit/kafka/server/LogOffsetTest.scala | 2 +- .../unit/kafka/server/MetadataCacheTest.scala | 50 ++++--- .../unit/kafka/server/OffsetCommitTest.scala | 2 +- .../unit/kafka/server/ReplicaManagerTest.scala | 6 +- .../unit/kafka/server/ServerShutdownTest.scala | 2 +- .../unit/kafka/server/ServerStartupTest.scala | 2 +- .../server/SessionExpireListenerTest.scala | 2 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 27 +++- .../integration/utils/KafkaEmbedded.java | 3 +- 58 files changed, 1049 insertions(+), 498 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index a0f5fab..3d54515 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -35,7 +35,7 @@ public class ClientUtils { private static final Logger log = LoggerFactory.getLogger(ClientUtils.class); public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) { - List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>(); + List<InetSocketAddress> addresses = new ArrayList<>(); for (String url : urls) { if (url != null && !url.isEmpty()) { try { http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java b/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java new file mode 100644 index 0000000..b376514 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.network; + +import org.apache.kafka.common.protocol.SecurityProtocol; + +import java.util.Locale; +import java.util.Objects; + +public final class ListenerName { + + /** + * Create an instance with the security protocol name as the value. + */ + public static ListenerName forSecurityProtocol(SecurityProtocol securityProtocol) { + return new ListenerName(securityProtocol.name); + } + + /** + * Create an instance with the provided value converted to uppercase. + */ + public static ListenerName normalised(String value) { + return new ListenerName(value.toUpperCase(Locale.ROOT)); + } + + private final String value; + + public ListenerName(String value) { + Objects.requireNonNull("value should not be null"); + this.value = value; + } + + public String value() { + return value; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ListenerName)) + return false; + ListenerName that = (ListenerName) o; + return value.equals(that.value); + } + + @Override + public int hashCode() { + return value.hashCode(); + } + + @Override + public String toString() { + return "ListenerName(" + value + ")"; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 6c94f6f..4946960 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -835,9 +835,31 @@ public class Protocol { public static final Schema UPDATE_METADATA_RESPONSE_V2 = UPDATE_METADATA_RESPONSE_V1; + public static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V3 = UPDATE_METADATA_REQUEST_PARTITION_STATE_V2; - public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1, UPDATE_METADATA_REQUEST_V2}; - public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1, UPDATE_METADATA_RESPONSE_V2}; + public static final Schema UPDATE_METADATA_REQUEST_END_POINT_V3 = + new Schema(new Field("port", INT32, "The port on which the broker accepts requests."), + new Field("host", STRING, "The hostname of the broker."), + new Field("listener_name", STRING, "The listener name."), + new Field("security_protocol_type", INT16, "The security protocol type.")); + + public static final Schema UPDATE_METADATA_REQUEST_BROKER_V3 = + new Schema(new Field("id", INT32, "The broker id."), + new Field("end_points", new ArrayOf(UPDATE_METADATA_REQUEST_END_POINT_V3)), + new Field("rack", NULLABLE_STRING, "The rack")); + + public static final Schema UPDATE_METADATA_REQUEST_V3 = + new Schema(new Field("controller_id", INT32, "The controller id."), + new Field("controller_epoch", INT32, "The controller epoch."), + new Field("partition_states", new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V3)), + new Field("live_brokers", new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V3))); + + public static final Schema UPDATE_METADATA_RESPONSE_V3 = UPDATE_METADATA_RESPONSE_V2; + + public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1, + UPDATE_METADATA_REQUEST_V2, UPDATE_METADATA_REQUEST_V3}; + public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1, + UPDATE_METADATA_RESPONSE_V2, UPDATE_METADATA_RESPONSE_V3}; /* SASL handshake api */ public static final Schema SASL_HANDSHAKE_REQUEST_V0 = new Schema( http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java index 5fd682c..95e5683 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java @@ -15,6 +15,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; @@ -52,10 +53,8 @@ public class UpdateMetadataRequest extends AbstractRequest { short version = version(); if (version == 0) { for (Broker broker : liveBrokers) { - if ((broker.endPoints.get(SecurityProtocol.PLAINTEXT) == null) - || (broker.endPoints.size() != 1)) { - throw new UnsupportedVersionException("UpdateMetadataRequest v0 only " + - "handles PLAINTEXT endpoints"); + if (broker.endPoints.size() != 1 || broker.endPoints.get(0).securityProtocol != SecurityProtocol.PLAINTEXT) { + throw new UnsupportedVersionException("UpdateMetadataRequest v0 only handles PLAINTEXT endpoints"); } } } @@ -77,25 +76,20 @@ public class UpdateMetadataRequest extends AbstractRequest { public static final class Broker { public final int id; - public final Map<SecurityProtocol, EndPoint> endPoints; - public final String rack; + public final List<EndPoint> endPoints; + public final String rack; // introduced in V2 - public Broker(int id, Map<SecurityProtocol, EndPoint> endPoints, String rack) { + public Broker(int id, List<EndPoint> endPoints, String rack) { this.id = id; this.endPoints = endPoints; this.rack = rack; } - @Deprecated - public Broker(int id, Map<SecurityProtocol, EndPoint> endPoints) { - this(id, endPoints, null); - } - @Override public String toString() { StringBuilder bld = new StringBuilder(); bld.append("(id=").append(id); - bld.append(", endPoints=").append(Utils.mkString(endPoints)); + bld.append(", endPoints=").append(Utils.join(endPoints, ",")); bld.append(", rack=").append(rack); bld.append(")"); return bld.toString(); @@ -105,15 +99,20 @@ public class UpdateMetadataRequest extends AbstractRequest { public static final class EndPoint { public final String host; public final int port; + public final SecurityProtocol securityProtocol; + public final ListenerName listenerName; // introduced in V3 - public EndPoint(String host, int port) { + public EndPoint(String host, int port, SecurityProtocol securityProtocol, ListenerName listenerName) { this.host = host; this.port = port; + this.securityProtocol = securityProtocol; + this.listenerName = listenerName; } @Override public String toString() { - return "(host=" + host + ", port=" + port + ")"; + return "(host=" + host + ", port=" + port + ", listenerName=" + listenerName + + ", securityProtocol=" + securityProtocol + ")"; } } @@ -139,6 +138,7 @@ public class UpdateMetadataRequest extends AbstractRequest { // EndPoint key names private static final String HOST_KEY_NAME = "host"; private static final String PORT_KEY_NAME = "port"; + private static final String LISTENER_NAME_KEY_NAME = "listener_name"; private static final String SECURITY_PROTOCOL_TYPE_KEY_NAME = "security_protocol_type"; private final int controllerId; @@ -175,16 +175,18 @@ public class UpdateMetadataRequest extends AbstractRequest { brokerData.set(BROKER_ID_KEY_NAME, broker.id); if (version == 0) { - EndPoint endPoint = broker.endPoints.get(SecurityProtocol.PLAINTEXT); + EndPoint endPoint = broker.endPoints.get(0); brokerData.set(HOST_KEY_NAME, endPoint.host); brokerData.set(PORT_KEY_NAME, endPoint.port); } else { List<Struct> endPointsData = new ArrayList<>(broker.endPoints.size()); - for (Map.Entry<SecurityProtocol, EndPoint> entry : broker.endPoints.entrySet()) { + for (EndPoint endPoint : broker.endPoints) { Struct endPointData = brokerData.instance(ENDPOINTS_KEY_NAME); - endPointData.set(PORT_KEY_NAME, entry.getValue().port); - endPointData.set(HOST_KEY_NAME, entry.getValue().host); - endPointData.set(SECURITY_PROTOCOL_TYPE_KEY_NAME, entry.getKey().id); + endPointData.set(PORT_KEY_NAME, endPoint.port); + endPointData.set(HOST_KEY_NAME, endPoint.host); + endPointData.set(SECURITY_PROTOCOL_TYPE_KEY_NAME, endPoint.securityProtocol.id); + if (version >= 3) + endPointData.set(LISTENER_NAME_KEY_NAME, endPoint.listenerName.value()); endPointsData.add(endPointData); } @@ -242,17 +244,24 @@ public class UpdateMetadataRequest extends AbstractRequest { if (brokerData.hasField(HOST_KEY_NAME)) { String host = brokerData.getString(HOST_KEY_NAME); int port = brokerData.getInt(PORT_KEY_NAME); - Map<SecurityProtocol, EndPoint> endPoints = new HashMap<>(1); - endPoints.put(SecurityProtocol.PLAINTEXT, new EndPoint(host, port)); + List<EndPoint> endPoints = new ArrayList<>(1); + SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT; + endPoints.add(new EndPoint(host, port, securityProtocol, ListenerName.forSecurityProtocol(securityProtocol))); liveBrokers.add(new Broker(brokerId, endPoints, null)); - } else { // V1 or V2 - Map<SecurityProtocol, EndPoint> endPoints = new HashMap<>(); + } else { // V1, V2 or V3 + List<EndPoint> endPoints = new ArrayList<>(); for (Object endPointDataObj : brokerData.getArray(ENDPOINTS_KEY_NAME)) { Struct endPointData = (Struct) endPointDataObj; int port = endPointData.getInt(PORT_KEY_NAME); String host = endPointData.getString(HOST_KEY_NAME); short protocolTypeId = endPointData.getShort(SECURITY_PROTOCOL_TYPE_KEY_NAME); - endPoints.put(SecurityProtocol.forId(protocolTypeId), new EndPoint(host, port)); + SecurityProtocol securityProtocol = SecurityProtocol.forId(protocolTypeId); + String listenerName; + if (endPointData.hasField(LISTENER_NAME_KEY_NAME)) // V3 + listenerName = endPointData.getString(LISTENER_NAME_KEY_NAME); + else + listenerName = securityProtocol.name; + endPoints.add(new EndPoint(host, port, securityProtocol, new ListenerName(listenerName))); } String rack = null; if (brokerData.hasField(RACK_KEY_NAME)) { // V2 @@ -270,7 +279,7 @@ public class UpdateMetadataRequest extends AbstractRequest { @Override public AbstractResponse getErrorResponse(Throwable e) { short versionId = version(); - if (versionId <= 2) + if (versionId <= 3) return new UpdateMetadataResponse(Errors.forException(e).code()); else throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index f02133d..de676c7 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -16,6 +16,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -99,10 +100,6 @@ public class RequestResponseTest { checkSerialization(createStopReplicaRequest(false)); checkSerialization(createStopReplicaRequest(true).getErrorResponse(new UnknownServerException()), null); checkSerialization(createStopReplicaResponse(), null); - checkSerialization(createUpdateMetadataRequest(2, "rack1")); - checkSerialization(createUpdateMetadataRequest(2, null)); - checkSerialization(createUpdateMetadataRequest(2, "rack1").getErrorResponse(new UnknownServerException()), null); - checkSerialization(createUpdateMetadataResponse(), null); checkSerialization(createLeaderAndIsrRequest()); checkSerialization(createLeaderAndIsrRequest().getErrorResponse(new UnknownServerException()), null); checkSerialization(createLeaderAndIsrResponse(), null); @@ -132,6 +129,13 @@ public class RequestResponseTest { checkSerialization(createUpdateMetadataRequest(1, null), 1); checkSerialization(createUpdateMetadataRequest(1, "rack1"), 1); checkSerialization(createUpdateMetadataRequest(1, null).getErrorResponse(new UnknownServerException()), 1); + checkSerialization(createUpdateMetadataRequest(2, "rack1")); + checkSerialization(createUpdateMetadataRequest(2, null)); + checkSerialization(createUpdateMetadataRequest(2, "rack1").getErrorResponse(new UnknownServerException()), null); + checkSerialization(createUpdateMetadataRequest(3, "rack1")); + checkSerialization(createUpdateMetadataRequest(3, null)); + checkSerialization(createUpdateMetadataRequest(3, "rack1").getErrorResponse(new UnknownServerException()), null); + checkSerialization(createUpdateMetadataResponse(), null); checkSerialization(createListOffsetRequest(0), 0); checkSerialization(createListOffsetRequest(0).getErrorResponse(new UnknownServerException()), 0); checkSerialization(createListOffsetResponse(0), 0); @@ -498,16 +502,24 @@ public class RequestResponseTest { partitionStates.put(new TopicPartition("topic20", 1), new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas))); - Map<SecurityProtocol, UpdateMetadataRequest.EndPoint> endPoints1 = new HashMap<>(); - endPoints1.put(SecurityProtocol.PLAINTEXT, new UpdateMetadataRequest.EndPoint("host1", 1223)); + SecurityProtocol plaintext = SecurityProtocol.PLAINTEXT; + List<UpdateMetadataRequest.EndPoint> endPoints1 = new ArrayList<>(); + endPoints1.add(new UpdateMetadataRequest.EndPoint("host1", 1223, plaintext, + ListenerName.forSecurityProtocol(plaintext))); - Map<SecurityProtocol, UpdateMetadataRequest.EndPoint> endPoints2 = new HashMap<>(); - endPoints2.put(SecurityProtocol.PLAINTEXT, new UpdateMetadataRequest.EndPoint("host1", 1244)); + List<UpdateMetadataRequest.EndPoint> endPoints2 = new ArrayList<>(); + endPoints2.add(new UpdateMetadataRequest.EndPoint("host1", 1244, plaintext, + ListenerName.forSecurityProtocol(plaintext))); if (version > 0) { - endPoints2.put(SecurityProtocol.SSL, new UpdateMetadataRequest.EndPoint("host2", 1234)); + SecurityProtocol ssl = SecurityProtocol.SSL; + endPoints2.add(new UpdateMetadataRequest.EndPoint("host2", 1234, ssl, + ListenerName.forSecurityProtocol(ssl))); + endPoints2.add(new UpdateMetadataRequest.EndPoint("host2", 1334, ssl, + new ListenerName("CLIENT"))); } - Set<UpdateMetadataRequest.Broker> liveBrokers = new HashSet<>(Arrays.asList(new UpdateMetadataRequest.Broker(0, endPoints1, rack), + Set<UpdateMetadataRequest.Broker> liveBrokers = new HashSet<>(Arrays.asList( + new UpdateMetadataRequest.Broker(0, endPoints1, rack), new UpdateMetadataRequest.Broker(1, endPoints2, rack) )); return new UpdateMetadataRequest.Builder(1, 10, partitionStates, liveBrokers). http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/config/server.properties ---------------------------------------------------------------------- diff --git a/config/server.properties b/config/server.properties index 60479b2..506d0e7 100644 --- a/config/server.properties +++ b/config/server.properties @@ -28,7 +28,7 @@ broker.id=0 # The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: -# listeners = security_protocol://host_name:port +# listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #listeners=PLAINTEXT://:9092 @@ -38,6 +38,9 @@ broker.id=0 # returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://your.host.name:9092 +# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details +#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL + # The number of threads handling network requests num.network.threads=3 http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/admin/AdminUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 91cd426..5f5d20b 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -20,13 +20,15 @@ package kafka.admin import kafka.common._ import kafka.cluster.Broker import kafka.log.LogConfig -import kafka.server.{DynamicConfig, ConfigEntityName, ConfigType} +import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig} import kafka.utils._ import kafka.utils.ZkUtils._ import java.util.Random import java.util.Properties + import org.apache.kafka.common.Node -import org.apache.kafka.common.errors.{ReplicaNotAvailableException, UnknownTopicOrPartitionException, InvalidTopicException, LeaderNotAvailableException, InvalidPartitionsException, InvalidReplicationFactorException, TopicExistsException, InvalidReplicaAssignmentException} +import org.apache.kafka.common.errors.{InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidReplicationFactorException, InvalidTopicException, LeaderNotAvailableException, ReplicaNotAvailableException, TopicExistsException, UnknownTopicOrPartitionException} +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} import org.apache.kafka.common.requests.MetadataResponse @@ -622,22 +624,26 @@ object AdminUtils extends Logging with AdminUtilities { } def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): MetadataResponse.TopicMetadata = - fetchTopicMetadataFromZk(topic, zkUtils, new mutable.HashMap[Int, Broker]) + fetchTopicMetadataFromZk(topic, zkUtils, mutable.Map.empty[Int, Broker], + ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils): Set[MetadataResponse.TopicMetadata] = - fetchTopicMetadataFromZk(topics, zkUtils, SecurityProtocol.PLAINTEXT) + fetchTopicMetadataFromZk(topics, zkUtils, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) + + def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils, securityProtocol: SecurityProtocol): Set[MetadataResponse.TopicMetadata] = + fetchTopicMetadataFromZk(topics, zkUtils, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) - def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils, protocol: SecurityProtocol): Set[MetadataResponse.TopicMetadata] = { - val cachedBrokerInfo = new mutable.HashMap[Int, Broker]() - topics.map(topic => fetchTopicMetadataFromZk(topic, zkUtils, cachedBrokerInfo, protocol)) + def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils, listenerName: ListenerName): Set[MetadataResponse.TopicMetadata] = { + val cachedBrokerInfo = mutable.Map.empty[Int, Broker] + topics.map(topic => fetchTopicMetadataFromZk(topic, zkUtils, cachedBrokerInfo, listenerName)) } private def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils, - cachedBrokerInfo: mutable.HashMap[Int, Broker], - protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): MetadataResponse.TopicMetadata = { + cachedBrokerInfo: mutable.Map[Int, Broker], + listenerName: ListenerName): MetadataResponse.TopicMetadata = { if (zkUtils.pathExists(getTopicPath(topic))) { - val topicPartitionAssignment = zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic).get + val topicPartitionAssignment = zkUtils.getPartitionAssignmentForTopics(List(topic))(topic) val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1) val partitionMetadata = sortedPartitions.map { partitionMap => val partition = partitionMap._1 @@ -653,15 +659,15 @@ object AdminUtils extends Logging with AdminUtilities { leaderInfo = leader match { case Some(l) => try { - getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, List(l)).head.getNode(protocol) + getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, List(l)).head.getNode(listenerName) } catch { case e: Throwable => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e) } case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition) } try { - replicaInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, replicas).map(_.getNode(protocol)) - isrInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, inSyncReplicas).map(_.getNode(protocol)) + replicaInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, replicas).map(_.getNode(listenerName)) + isrInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, inSyncReplicas).map(_.getNode(listenerName)) } catch { case e: Throwable => throw new ReplicaNotAvailableException(e) } http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index b53856e..a832c38 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -30,6 +30,7 @@ import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.common.Node import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.serialization.StringDeserializer @@ -367,7 +368,7 @@ object ConsumerGroupCommand extends Logging { private def getZkConsumer(brokerId: Int): Option[SimpleConsumer] = { try { zkUtils.getBrokerInfo(brokerId) - .map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)) + .map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))) .map(endPoint => new SimpleConsumer(endPoint.host, endPoint.port, 10000, 100000, "ConsumerGroupCommand")) .orElse(throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))) } catch { http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/api/ApiVersion.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala index 4052639..4cd10f4 100644 --- a/core/src/main/scala/kafka/api/ApiVersion.scala +++ b/core/src/main/scala/kafka/api/ApiVersion.scala @@ -59,8 +59,10 @@ object ApiVersion { "0.10.1-IV1" -> KAFKA_0_10_1_IV1, // introduced ListOffsetRequest v1 in KIP-79 "0.10.1-IV2" -> KAFKA_0_10_1_IV2, - "0.10.1" -> KAFKA_0_10_1_IV2 - + "0.10.1" -> KAFKA_0_10_1_IV2, + // introduced UpdateMetadataRequest v3 in KIP-103 + "0.10.2-IV0" -> KAFKA_0_10_2_IV0, + "0.10.2" -> KAFKA_0_10_2_IV0 ) private val versionPattern = "\\.".r @@ -138,3 +140,9 @@ case object KAFKA_0_10_1_IV2 extends ApiVersion { val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1 val id: Int = 8 } + +case object KAFKA_0_10_2_IV0 extends ApiVersion { + val version: String = "0.10.2-IV0" + val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1 + val id: Int = 9 +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/client/ClientUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index 8893697..dbb8a76 100755 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -22,9 +22,10 @@ import scala.collection._ import kafka.cluster._ import kafka.api._ import kafka.producer._ -import kafka.common.KafkaException +import kafka.common.{BrokerEndPointNotAvailableException, KafkaException} import kafka.utils.{CoreUtils, Logging} import java.util.Properties + import util.Random import kafka.network.BlockingChannel import kafka.utils.ZkUtils @@ -105,34 +106,46 @@ object ClientUtils extends Logging{ } } + /** + * Creates a blocking channel to a random broker + */ + def channelToAnyBroker(zkUtils: ZkUtils, socketTimeoutMs: Int = 3000) : BlockingChannel = { + var channel: BlockingChannel = null + var connected = false + while (!connected) { + val allBrokers = getPlaintextBrokerEndPoints(zkUtils) + Random.shuffle(allBrokers).find { broker => + trace("Connecting to broker %s:%d.".format(broker.host, broker.port)) + try { + channel = new BlockingChannel(broker.host, broker.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, socketTimeoutMs) + channel.connect() + debug("Created channel to broker %s:%d.".format(channel.host, channel.port)) + true + } catch { + case _: Exception => + if (channel != null) channel.disconnect() + channel = null + info("Error while creating channel to %s:%d.".format(broker.host, broker.port)) + false + } + } + connected = channel != null + } + + channel + } + /** - * Creates a blocking channel to a random broker + * Returns the first end point from each broker with the PLAINTEXT security protocol. */ - def channelToAnyBroker(zkUtils: ZkUtils, socketTimeoutMs: Int = 3000) : BlockingChannel = { - var channel: BlockingChannel = null - var connected = false - while (!connected) { - val allBrokers = zkUtils.getAllBrokerEndPointsForChannel(SecurityProtocol.PLAINTEXT) - Random.shuffle(allBrokers).find { broker => - trace("Connecting to broker %s:%d.".format(broker.host, broker.port)) - try { - channel = new BlockingChannel(broker.host, broker.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, socketTimeoutMs) - channel.connect() - debug("Created channel to broker %s:%d.".format(channel.host, channel.port)) - true - } catch { - case _: Exception => - if (channel != null) channel.disconnect() - channel = null - info("Error while creating channel to %s:%d.".format(broker.host, broker.port)) - false - } - } - connected = channel != null - } - - channel - } + def getPlaintextBrokerEndPoints(zkUtils: ZkUtils): Seq[BrokerEndPoint] = { + zkUtils.getAllBrokersInCluster().map { broker => + broker.endPoints.collectFirst { + case endPoint if endPoint.securityProtocol == SecurityProtocol.PLAINTEXT => + new BrokerEndPoint(broker.id, endPoint.host, endPoint.port) + }.getOrElse(throw new BrokerEndPointNotAvailableException(s"End point with security protocol PLAINTEXT not found for broker ${broker.id}")) + } + } /** * Creates a blocking channel to the offset manager of the given group http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/cluster/Broker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala index 7116722..00b4078 100755 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -20,7 +20,9 @@ package kafka.cluster import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, KafkaException} import kafka.utils.Json import org.apache.kafka.common.Node +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.utils.Time /** * A Kafka broker. @@ -29,6 +31,15 @@ import org.apache.kafka.common.protocol.SecurityProtocol */ object Broker { + private val HostKey = "host" + private val PortKey = "port" + private val VersionKey = "version" + private val EndpointsKey = "endpoints" + private val RackKey = "rack" + private val JmxPortKey = "jmx_port" + private val ListenerSecurityProtocolMapKey = "listener_security_protocol_map" + private val TimestampKey = "timestamp" + /** * Create a broker object from id and JSON string. * @@ -54,7 +65,7 @@ object Broker { * "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"] * } * - * Version 3 (current) JSON schema for a broker is: + * Version 3 JSON schema for a broker is: * { * "version":3, * "host":"localhost", @@ -64,6 +75,18 @@ object Broker { * "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"], * "rack":"dc1" * } + * + * Version 4 (current) JSON schema for a broker is: + * { + * "version":4, + * "host":"localhost", + * "port":9092 + * "jmx_port":9999, + * "timestamp":"2233345666", + * "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"], + * "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"} + * "rack":"dc1" + * } */ def createBroker(id: Int, brokerInfoString: String): Broker = { if (brokerInfoString == null) @@ -72,24 +95,28 @@ object Broker { Json.parseFull(brokerInfoString) match { case Some(m) => val brokerInfo = m.asInstanceOf[Map[String, Any]] - val version = brokerInfo("version").asInstanceOf[Int] + val version = brokerInfo(VersionKey).asInstanceOf[Int] val endpoints = if (version < 1) throw new KafkaException(s"Unsupported version of broker registration: $brokerInfoString") else if (version == 1) { - val host = brokerInfo("host").asInstanceOf[String] - val port = brokerInfo("port").asInstanceOf[Int] - Map(SecurityProtocol.PLAINTEXT -> new EndPoint(host, port, SecurityProtocol.PLAINTEXT)) + val host = brokerInfo(HostKey).asInstanceOf[String] + val port = brokerInfo(PortKey).asInstanceOf[Int] + val securityProtocol = SecurityProtocol.PLAINTEXT + val endPoint = new EndPoint(host, port, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol) + Seq(endPoint) } else { - val listeners = brokerInfo("endpoints").asInstanceOf[List[String]] - listeners.map { listener => - val ep = EndPoint.createEndPoint(listener) - (ep.protocolType, ep) - }.toMap + val securityProtocolMap = brokerInfo.get(ListenerSecurityProtocolMapKey).map( + _.asInstanceOf[Map[String, String]].map { case (listenerName, securityProtocol) => + new ListenerName(listenerName) -> SecurityProtocol.forName(securityProtocol) + }) + val listeners = brokerInfo(EndpointsKey).asInstanceOf[List[String]] + listeners.map(EndPoint.createEndPoint(_, securityProtocolMap)) } - val rack = brokerInfo.get("rack").filter(_ != null).map(_.asInstanceOf[String]) - new Broker(id, endpoints, rack) + val rack = brokerInfo.get(RackKey).filter(_ != null).map(_.asInstanceOf[String]) + + Broker(id, endpoints, rack) case None => throw new BrokerNotAvailableException(s"Broker id $id does not exist") } @@ -98,34 +125,57 @@ object Broker { throw new KafkaException(s"Failed to parse the broker info from zookeeper: $brokerInfoString", t) } } -} -case class Broker(id: Int, endPoints: collection.Map[SecurityProtocol, EndPoint], rack: Option[String]) { + def toJson(version: Int, id: Int, host: String, port: Int, advertisedEndpoints: Seq[EndPoint], jmxPort: Int, + rack: Option[String]): String = { + val jsonMap = collection.mutable.Map(VersionKey -> version, + HostKey -> host, + PortKey -> port, + EndpointsKey -> advertisedEndpoints.map(_.connectionString).toArray, + JmxPortKey -> jmxPort, + TimestampKey -> Time.SYSTEM.milliseconds().toString + ) + rack.foreach(rack => if (version >= 3) jsonMap += (RackKey -> rack)) - override def toString: String = - s"$id : ${endPoints.values.mkString("(",",",")")} : ${rack.orNull}" + if (version >= 4) { + jsonMap += (ListenerSecurityProtocolMapKey -> advertisedEndpoints.map { endPoint => + endPoint.listenerName.value -> endPoint.securityProtocol.name + }.toMap) + } - def this(id: Int, endPoints: Map[SecurityProtocol, EndPoint]) = { - this(id, endPoints, None) + Json.encode(jsonMap) } +} + +case class Broker(id: Int, endPoints: Seq[EndPoint], rack: Option[String]) { + + private val endPointsMap = endPoints.map { endPoint => + endPoint.listenerName -> endPoint + }.toMap + + if (endPointsMap.size != endPoints.size) + throw new IllegalArgumentException(s"There is more than one end point with the same listener name: ${endPoints.mkString(",")}") + + override def toString: String = + s"$id : ${endPointsMap.values.mkString("(",",",")")} : ${rack.orNull}" - def this(id: Int, host: String, port: Int, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = { - this(id, Map(protocol -> EndPoint(host, port, protocol)), None) + def this(id: Int, host: String, port: Int, listenerName: ListenerName, protocol: SecurityProtocol) = { + this(id, Seq(EndPoint(host, port, listenerName, protocol)), None) } - def this(bep: BrokerEndPoint, protocol: SecurityProtocol) = { - this(bep.id, bep.host, bep.port, protocol) + def this(bep: BrokerEndPoint, listenerName: ListenerName, protocol: SecurityProtocol) = { + this(bep.id, bep.host, bep.port, listenerName, protocol) } - def getNode(protocolType: SecurityProtocol): Node = { - val endpoint = endPoints.getOrElse(protocolType, - throw new BrokerEndPointNotAvailableException(s"End point with security protocol $protocolType not found for broker $id")) + def getNode(listenerName: ListenerName): Node = { + val endpoint = endPointsMap.getOrElse(listenerName, + throw new BrokerEndPointNotAvailableException(s"End point with protocol label $listenerName not found for broker $id")) new Node(id, endpoint.host, endpoint.port, rack.orNull) } - def getBrokerEndPoint(protocolType: SecurityProtocol): BrokerEndPoint = { - val endpoint = endPoints.getOrElse(protocolType, - throw new BrokerEndPointNotAvailableException(s"End point with security protocol $protocolType not found for broker $id")) + def getBrokerEndPoint(listenerName: ListenerName): BrokerEndPoint = { + val endpoint = endPointsMap.getOrElse(listenerName, + throw new BrokerEndPointNotAvailableException(s"End point with security protocol $listenerName not found for broker $id")) new BrokerEndPoint(id, endpoint.host, endpoint.port) } http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/cluster/EndPoint.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/EndPoint.scala b/core/src/main/scala/kafka/cluster/EndPoint.scala index 720d819..b3fc748 100644 --- a/core/src/main/scala/kafka/cluster/EndPoint.scala +++ b/core/src/main/scala/kafka/cluster/EndPoint.scala @@ -17,37 +17,44 @@ package kafka.cluster -import java.nio.ByteBuffer - -import kafka.api.ApiUtils._ import kafka.common.KafkaException +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.utils.Utils +import scala.collection.Map + object EndPoint { private val uriParseExp = """^(.*)://\[?([0-9a-zA-Z\-%._:]*)\]?:(-?[0-9]+)""".r - def readFrom(buffer: ByteBuffer): EndPoint = { - val port = buffer.getInt() - val host = readShortString(buffer) - val protocol = buffer.getShort() - EndPoint(host, port, SecurityProtocol.forId(protocol)) - } + private[kafka] val DefaultSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = + SecurityProtocol.values.map(sp => ListenerName.forSecurityProtocol(sp) -> sp).toMap /** - * Create EndPoint object from connectionString - * @param connectionString the format is protocol://host:port or protocol://[ipv6 host]:port - * for example: PLAINTEXT://myhost:9092 or PLAINTEXT://[::1]:9092 + * Create EndPoint object from `connectionString` and optional `securityProtocolMap`. If the latter is not provided, + * we fallback to the default behaviour where listener names are the same as security protocols. + * + * @param connectionString the format is listener_name://host:port or listener_name://[ipv6 host]:port + * for example: PLAINTEXT://myhost:9092, CLIENT://myhost:9092 or REPLICATION://[::1]:9092 * Host can be empty (PLAINTEXT://:9092) in which case we'll bind to default interface * Negative ports are also accepted, since they are used in some unit tests - * @return */ - def createEndPoint(connectionString: String): EndPoint = { + def createEndPoint(connectionString: String, securityProtocolMap: Option[Map[ListenerName, SecurityProtocol]]): EndPoint = { + val protocolMap = securityProtocolMap.getOrElse(DefaultSecurityProtocolMap) + + def securityProtocol(listenerName: ListenerName): SecurityProtocol = + protocolMap.getOrElse(listenerName, + throw new IllegalArgumentException(s"No security protocol defined for listener ${listenerName.value}")) + connectionString match { - case uriParseExp(protocol, "", port) => new EndPoint(null, port.toInt, SecurityProtocol.forName(protocol)) - case uriParseExp(protocol, host, port) => new EndPoint(host, port.toInt, SecurityProtocol.forName(protocol)) - case _ => throw new KafkaException("Unable to parse " + connectionString + " to a broker endpoint") + case uriParseExp(listenerNameString, "", port) => + val listenerName = ListenerName.normalised(listenerNameString) + new EndPoint(null, port.toInt, listenerName, securityProtocol(listenerName)) + case uriParseExp(listenerNameString, host, port) => + val listenerName = ListenerName.normalised(listenerNameString) + new EndPoint(host, port.toInt, listenerName, securityProtocol(listenerName)) + case _ => throw new KafkaException(s"Unable to parse $connectionString to a broker endpoint") } } } @@ -55,25 +62,13 @@ object EndPoint { /** * Part of the broker definition - matching host/port pair to a protocol */ -case class EndPoint(host: String, port: Int, protocolType: SecurityProtocol) { - - def connectionString(): String = { +case class EndPoint(host: String, port: Int, listenerName: ListenerName, securityProtocol: SecurityProtocol) { + def connectionString: String = { val hostport = if (host == null) ":"+port else Utils.formatAddress(host, port) - protocolType + "://" + hostport - } - - def writeTo(buffer: ByteBuffer): Unit = { - buffer.putInt(port) - writeShortString(buffer, host) - buffer.putShort(protocolType.id) + listenerName.value + "://" + hostport } - - def sizeInBytes: Int = - 4 + /* port */ - shortStringLength(host) + - 2 /* protocol id */ } http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index dcdeb1e..3b054e4 100755 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -64,7 +64,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, } trace("Partitions without leader %s".format(noLeaderPartitionSet)) - val brokers = zkUtils.getAllBrokerEndPointsForChannel(SecurityProtocol.PLAINTEXT) + val brokers = ClientUtils.getPlaintextBrokerEndPoints(zkUtils) val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers, config.clientId, http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index e9ccf64..d928034 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -24,10 +24,10 @@ import kafka.cluster.Broker import kafka.common.{KafkaException, TopicAndPartition} import kafka.server.KafkaConfig import kafka.utils._ -import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUpdater, NetworkClient} +import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, NetworkClient} import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector} -import org.apache.kafka.common.protocol.{ApiKeys, ProtoUtils, SecurityProtocol} +import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, LoginType, NetworkReceive, Selectable, Selector} +import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} import org.apache.kafka.common.requests import org.apache.kafka.common.requests.{UpdateMetadataRequest, _} import org.apache.kafka.common.requests.UpdateMetadataRequest.EndPoint @@ -89,7 +89,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf private def addNewBroker(broker: Broker) { val messageQueue = new LinkedBlockingQueue[QueueItem] debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id)) - val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol) + val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName) val brokerNode = new Node(broker.id, brokerEndPoint.host, brokerEndPoint.port) val networkClient = { val channelBuilder = ChannelBuilders.clientChannelBuilder( @@ -353,7 +353,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging } val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map { - _.getNode(controller.config.interBrokerSecurityProtocol) + _.getNode(controller.config.interBrokerListenerName) } val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) => val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch @@ -379,29 +379,33 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging topicPartition -> partitionState } - val version: Short = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2: Short - else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1: Short - else 0: Short + val version: Short = + if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_2_IV0) 3 + else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2 + else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1 + else 0 val updateMetadataRequest = { val liveBrokers = if (version == 0) { // Version 0 of UpdateMetadataRequest only supports PLAINTEXT. controllerContext.liveOrShuttingDownBrokers.map { broker => - val node = broker.getNode(SecurityProtocol.PLAINTEXT) - val endPoints = Map(SecurityProtocol.PLAINTEXT -> new EndPoint(node.host(), node.port())) + val securityProtocol = SecurityProtocol.PLAINTEXT + val listenerName = ListenerName.forSecurityProtocol(securityProtocol) + val node = broker.getNode(listenerName) + val endPoints = Seq(new EndPoint(node.host, node.port, securityProtocol, listenerName)) new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull) } } else { controllerContext.liveOrShuttingDownBrokers.map { broker => - val endPoints = broker.endPoints.map { case (securityProtocol, endPoint) => - securityProtocol -> new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port) + val endPoints = broker.endPoints.map { endPoint => + new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port, endPoint.securityProtocol, endPoint.listenerName) } new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull) } } new UpdateMetadataRequest.Builder( - controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava). - setVersion(version) + controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava). + setVersion(version) } updateMetadataRequestBrokerSet.foreach { broker => http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 83c6d01..8ffa610 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -210,12 +210,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState } ) - def epoch = controllerContext.epoch + def epoch: Int = controllerContext.epoch - def clientId = { - val listeners = config.listeners - val controllerListener = listeners.get(config.interBrokerSecurityProtocol) - "id_%d-host_%s-port_%d".format(config.brokerId, controllerListener.get.host, controllerListener.get.port) + def clientId: String = { + val controllerListener = config.listeners.find(_.listenerName == config.interBrokerListenerName).getOrElse( + throw new IllegalArgumentException(s"No listener with name ${config.interBrokerListenerName} is configured.")) + "id_%d-host_%s-port_%d".format(config.brokerId, controllerListener.host, controllerListener.port) } /** @@ -226,7 +226,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState * @param id Id of the broker to shutdown. * @return The number of partitions that the broker still leads. */ - def shutdownBroker(id: Int) : Set[TopicAndPartition] = { + def shutdownBroker(id: Int): Set[TopicAndPartition] = { if (!isActive) { throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown") http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index a511440..c063801 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -29,7 +29,7 @@ import kafka.server.QuotaId import kafka.utils.Logging import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.InvalidRequestException -import org.apache.kafka.common.network.Send +import org.apache.kafka.common.network.{ListenerName, Send} import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol} import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.requests._ @@ -38,12 +38,14 @@ import org.apache.kafka.common.utils.Time import org.apache.log4j.Logger object RequestChannel extends Logging { - val AllDone = Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost), buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT) + val AllDone = Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost), + buffer = getShutdownReceive, startTimeMs = 0, listenerName = new ListenerName(""), + securityProtocol = SecurityProtocol.PLAINTEXT) private val requestLogger = Logger.getLogger("kafka.request.logger") - def getShutdownReceive() = { + private def getShutdownReceive = { val emptyProduceRequest = new ProduceRequest.Builder(0, 0, new HashMap[TopicPartition, MemoryRecords]()).build() - val emptyRequestHeader = new RequestHeader(ApiKeys.PRODUCE.id, emptyProduceRequest.version(), "", 0) + val emptyRequestHeader = new RequestHeader(ApiKeys.PRODUCE.id, emptyProduceRequest.version, "", 0) AbstractRequestResponse.serialize(emptyRequestHeader, emptyProduceRequest) } @@ -51,7 +53,8 @@ object RequestChannel extends Logging { val sanitizedUser = QuotaId.sanitize(principal.getName) } - case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) { + case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer, + startTimeMs: Long, listenerName: ListenerName, securityProtocol: SecurityProtocol) { // These need to be volatile because the readers are in the network thread and the writers are in the request // handler threads or the purgatory threads @volatile var requestDequeueTimeMs = -1L @@ -149,12 +152,11 @@ object RequestChannel extends Logging { m.totalTimeHist.update(totalTime) } - if (requestLogger.isTraceEnabled) - requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s" - .format(requestDesc(true), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal)) - else if (requestLogger.isDebugEnabled) - requestLogger.debug("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s" - .format(requestDesc(false), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal)) + if (requestLogger.isDebugEnabled) { + val detailsEnabled = requestLogger.isTraceEnabled + requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s,listener:%s" + .format(requestDesc(detailsEnabled), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal, listenerName.value)) + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index d8d0144..c0353d5 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -34,7 +34,7 @@ import kafka.server.KafkaConfig import kafka.utils._ import org.apache.kafka.common.errors.InvalidRequestException import org.apache.kafka.common.metrics._ -import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, Selectable, Selector => KSelector} +import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, LoginType, Mode, Selectable, Selector => KSelector} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.protocol.types.SchemaException @@ -52,7 +52,7 @@ import scala.util.control.{ControlThrowable, NonFatal} */ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup { - private val endpoints = config.listeners + private val endpoints = config.listeners.map(l => l.listenerName -> l).toMap private val numProcessorThreads = config.numNetworkThreads private val maxQueuedRequests = config.queuedMaxRequests private val totalProcessorThreads = numProcessorThreads * endpoints.size @@ -87,17 +87,18 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time val brokerId = config.brokerId var processorBeginIndex = 0 - endpoints.values.foreach { endpoint => - val protocol = endpoint.protocolType + config.listeners.foreach { endpoint => + val listenerName = endpoint.listenerName + val securityProtocol = endpoint.securityProtocol val processorEndIndex = processorBeginIndex + numProcessorThreads for (i <- processorBeginIndex until processorEndIndex) - processors(i) = newProcessor(i, connectionQuotas, protocol) + processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol) val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas) acceptors.put(endpoint, acceptor) - Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start() + Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start() acceptor.awaitStartup() processorBeginIndex = processorEndIndex @@ -130,23 +131,25 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time info("Shutdown completed") } - def boundPort(protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Int = { + def boundPort(listenerName: ListenerName): Int = { try { - acceptors(endpoints(protocol)).serverChannel.socket().getLocalPort + acceptors(endpoints(listenerName)).serverChannel.socket.getLocalPort } catch { case e: Exception => throw new KafkaException("Tried to check server's port before server was started or checked for port of non-existing protocol", e) } } /* `protected` for test usage */ - protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, protocol: SecurityProtocol): Processor = { + protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, + securityProtocol: SecurityProtocol): Processor = { new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, config.connectionsMaxIdleMs, - protocol, + listenerName, + securityProtocol, config.values, metrics, credentialProvider @@ -253,7 +256,8 @@ private[kafka] class Acceptor(val endPoint: EndPoint, this.synchronized { processors.foreach { processor => - Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start() + Utils.newThread(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}", + processor, false).start() } } @@ -373,7 +377,8 @@ private[kafka] class Processor(val id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, connectionsMaxIdleMs: Long, - protocol: SecurityProtocol, + listenerName: ListenerName, + securityProtocol: SecurityProtocol, channelConfigs: java.util.Map[String, _], metrics: Metrics, credentialProvider: CredentialProvider) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { @@ -414,7 +419,7 @@ private[kafka] class Processor(val id: Int, "socket-server", metricTags, false, - ChannelBuilders.serverChannelBuilder(protocol, channelConfigs, credentialProvider.credentialCache)) + ChannelBuilders.serverChannelBuilder(securityProtocol, channelConfigs, credentialProvider.credentialCache)) override def run() { startupComplete() @@ -505,7 +510,9 @@ private[kafka] class Processor(val id: Int, val channel = if (openChannel != null) openChannel else selector.closingChannel(receive.source) RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress) } - val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol) + val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, + buffer = receive.payload, startTimeMs = time.milliseconds, listenerName = listenerName, + securityProtocol = securityProtocol) requestChannel.sendRequest(req) selector.mute(receive.source) } catch { http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 794c23d..5c0201b 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -37,6 +37,7 @@ import kafka.security.auth.{Authorizer, ClusterAction, Create, Delete, Describe, import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils} import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderForPartitionException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedForMessageFormatException} import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol} import org.apache.kafka.common.record.{MemoryRecords, Record} import org.apache.kafka.common.requests._ @@ -785,13 +786,13 @@ class KafkaApis(val requestChannel: RequestChannel, offsetsTopicReplicationFactor, coordinator.offsetsTopicConfigs) } - private def getOrCreateGroupMetadataTopic(securityProtocol: SecurityProtocol): MetadataResponse.TopicMetadata = { - val topicMetadata = metadataCache.getTopicMetadata(Set(Topic.GroupMetadataTopicName), securityProtocol) + private def getOrCreateGroupMetadataTopic(listenerName: ListenerName): MetadataResponse.TopicMetadata = { + val topicMetadata = metadataCache.getTopicMetadata(Set(Topic.GroupMetadataTopicName), listenerName) topicMetadata.headOption.getOrElse(createGroupMetadataTopic()) } - private def getTopicMetadata(topics: Set[String], securityProtocol: SecurityProtocol, errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = { - val topicResponses = metadataCache.getTopicMetadata(topics, securityProtocol, errorUnavailableEndpoints) + private def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = { + val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, errorUnavailableEndpoints) if (topics.isEmpty || topicResponses.size == topics.size) { topicResponses } else { @@ -866,7 +867,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (authorizedTopics.isEmpty) Seq.empty[MetadataResponse.TopicMetadata] else - getTopicMetadata(authorizedTopics, request.securityProtocol, errorUnavailableEndpoints) + getTopicMetadata(authorizedTopics, request.listenerName, errorUnavailableEndpoints) val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata @@ -876,7 +877,7 @@ class KafkaApis(val requestChannel: RequestChannel, brokers.mkString(","), request.header.correlationId, request.header.clientId)) val responseBody = new MetadataResponse( - brokers.map(_.getNode(request.securityProtocol)).asJava, + brokers.map(_.getNode(request.listenerName)).asJava, clusterId, metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID), completeTopicMetadata.asJava, @@ -953,7 +954,7 @@ class KafkaApis(val requestChannel: RequestChannel, val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId) // get metadata (and create the topic if necessary) - val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.securityProtocol) + val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.listenerName) val responseBody = if (offsetsTopicMetadata.error != Errors.NONE) { new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)