This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9689a31 MINOR: Drop enable.metadata.quorum config (#9934)
9689a31 is described below
commit 9689a313f52a7b45f38faecddf018dc8ab02dc62
Author: Jason Gustafson <[email protected]>
AuthorDate: Thu Jan 21 15:16:15 2021 -0800
MINOR: Drop enable.metadata.quorum config (#9934)
The primary purpose of this patch is to remove the internal
`enable.metadata.quorum` configuration. Instead, we rely on `process.roles` to
determine if the self-managed quorum has been enabled. As a part of this, I've
done the following:
1. Replace the notion of "disabled" APIs with "controller-only" APIs. We
previously marked some APIs which were intended only for the KIP-500 as
"disabled" so that they would not be unintentionally exposed. For example, the
Raft quorum APIs were disabled. Marking them as "controller-only" carries the
same effect, but makes the intent that they should be only exposed by the
KIP-500 controller clearer.
2. Make `ForwardingManager` optional in `KafkaServer` and `KafkaApis`.
Previously we used `null` if forwarding was enabled and relied on the metadata
quorum check.
3. Make `zookeeper.connect` an optional configuration if `process.roles` is
defined.
4. Update raft README to remove reference to `zookeeper.conntect`
Reviewers: Colin Patrick McCabe <[email protected]>, Boyang Chen
<[email protected]>
---
.../org/apache/kafka/clients/NodeApiVersions.java | 4 +--
.../org/apache/kafka/common/protocol/ApiKeys.java | 30 +++++++++++-----------
.../org/apache/kafka/common/protocol/Protocol.java | 2 +-
.../kafka/common/requests/ApiVersionsResponse.java | 4 +--
.../apache/kafka/clients/NodeApiVersionsTest.java | 8 +++---
.../common/requests/ApiVersionsResponseTest.java | 6 ++---
core/src/main/scala/kafka/Kafka.scala | 5 ++--
.../main/scala/kafka/network/RequestChannel.scala | 8 +++---
.../main/scala/kafka/network/SocketServer.scala | 14 +++++-----
.../scala/kafka/server/ForwardingManager.scala | 4 +--
core/src/main/scala/kafka/server/KafkaApis.scala | 28 +++++++++-----------
core/src/main/scala/kafka/server/KafkaConfig.scala | 21 ++++++++-------
core/src/main/scala/kafka/server/KafkaServer.scala | 19 +++++++-------
.../main/scala/kafka/tools/TestRaftServer.scala | 7 ++++-
.../kafka/admin/BrokerApiVersionsCommandTest.scala | 2 +-
.../test/scala/unit/kafka/api/ApiVersionTest.scala | 2 +-
.../kafka/integration/KafkaServerTestHarness.scala | 11 ++++++--
.../server/AbstractApiVersionsRequestTest.scala | 2 +-
.../CreateTopicsRequestWithForwardingTest.scala | 6 +----
.../unit/kafka/server/ForwardingManagerTest.scala | 2 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 13 +++++++---
.../scala/unit/kafka/server/KafkaConfigTest.scala | 17 ++++++++++++
.../scala/unit/kafka/server/RequestQuotaTest.scala | 6 ++---
.../test/scala/unit/kafka/utils/TestUtils.scala | 6 ++++-
.../jmh/metadata/MetadataRequestBenchmark.java | 6 ++---
raft/README.md | 9 +------
26 files changed, 132 insertions(+), 110 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
index 7588dee..658d481 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
@@ -62,7 +62,7 @@ public class NodeApiVersions {
*/
public static NodeApiVersions create(Collection<ApiVersion> overrides) {
List<ApiVersion> apiVersions = new LinkedList<>(overrides);
- for (ApiKeys apiKey : ApiKeys.enabledApis()) {
+ for (ApiKeys apiKey : ApiKeys.brokerApis()) {
boolean exists = false;
for (ApiVersion apiVersion : apiVersions) {
if (apiVersion.apiKey() == apiKey.id) {
@@ -170,7 +170,7 @@ public class NodeApiVersions {
// Also handle the case where some apiKey types are not specified at
all in the given ApiVersions,
// which may happen when the remote is too old.
- for (ApiKeys apiKey : ApiKeys.enabledApis()) {
+ for (ApiKeys apiKey : ApiKeys.brokerApis()) {
if (!apiKeysText.containsKey(apiKey.id)) {
StringBuilder bld = new StringBuilder();
bld.append(apiKey.name).append("(").
diff --git
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index ca1f4b8..67586d5 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -90,15 +90,15 @@ public enum ApiKeys {
ALTER_CLIENT_QUOTAS(ApiMessageType.ALTER_CLIENT_QUOTAS, false, true),
DESCRIBE_USER_SCRAM_CREDENTIALS(ApiMessageType.DESCRIBE_USER_SCRAM_CREDENTIALS),
ALTER_USER_SCRAM_CREDENTIALS(ApiMessageType.ALTER_USER_SCRAM_CREDENTIALS,
false, true),
- VOTE(ApiMessageType.VOTE, true, RecordBatch.MAGIC_VALUE_V0, false, false),
- BEGIN_QUORUM_EPOCH(ApiMessageType.BEGIN_QUORUM_EPOCH, true,
RecordBatch.MAGIC_VALUE_V0, false, false),
- END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true,
RecordBatch.MAGIC_VALUE_V0, false, false),
- DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true,
RecordBatch.MAGIC_VALUE_V0, false, false),
+ VOTE(ApiMessageType.VOTE, true, RecordBatch.MAGIC_VALUE_V0, false, true),
+ BEGIN_QUORUM_EPOCH(ApiMessageType.BEGIN_QUORUM_EPOCH, true,
RecordBatch.MAGIC_VALUE_V0, false, true),
+ END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true,
RecordBatch.MAGIC_VALUE_V0, false, true),
+ DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true,
RecordBatch.MAGIC_VALUE_V0, false, true),
ALTER_ISR(ApiMessageType.ALTER_ISR, true),
UPDATE_FEATURES(ApiMessageType.UPDATE_FEATURES, false, true),
- ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false,
false),
- FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false,
RecordBatch.MAGIC_VALUE_V0, false, false),
- DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER, false,
RecordBatch.MAGIC_VALUE_V0, false, true);
+ ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false,
true),
+ FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false,
RecordBatch.MAGIC_VALUE_V0, false, true),
+ DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER);
// The generator ensures every `ApiMessageType` has a unique id
private static final Map<Integer, ApiKeys> ID_TO_TYPE =
Arrays.stream(ApiKeys.values())
@@ -116,8 +116,8 @@ public enum ApiKeys {
/** indicates the minimum required inter broker magic required to support
the API */
public final byte minRequiredInterBrokerMagic;
- /** indicates whether the API is enabled and should be exposed in
ApiVersions **/
- public final boolean isEnabled;
+ /** indicates whether this is an API which is only exposed by the KIP-500
controller **/
+ public final boolean isControllerOnlyApi;
/** indicates whether the API is enabled for forwarding **/
public final boolean forwardable;
@@ -139,7 +139,7 @@ public enum ApiKeys {
}
ApiKeys(ApiMessageType messageType, boolean clusterAction, byte
minRequiredInterBrokerMagic, boolean forwardable) {
- this(messageType, clusterAction, minRequiredInterBrokerMagic,
forwardable, true);
+ this(messageType, clusterAction, minRequiredInterBrokerMagic,
forwardable, false);
}
ApiKeys(
@@ -147,14 +147,14 @@ public enum ApiKeys {
boolean clusterAction,
byte minRequiredInterBrokerMagic,
boolean forwardable,
- boolean isEnabled
+ boolean isControllerOnlyApi
) {
this.messageType = messageType;
this.id = messageType.apiKey();
this.name = messageType.name;
this.clusterAction = clusterAction;
this.minRequiredInterBrokerMagic = minRequiredInterBrokerMagic;
- this.isEnabled = isEnabled;
+ this.isControllerOnlyApi = isControllerOnlyApi;
this.requiresDelayedAllocation = forwardable ||
shouldRetainsBufferReference(messageType.requestSchemas());
this.forwardable = forwardable;
@@ -210,7 +210,7 @@ public enum ApiKeys {
b.append("<th>Name</th>\n");
b.append("<th>Key</th>\n");
b.append("</tr>");
- for (ApiKeys key : ApiKeys.enabledApis()) {
+ for (ApiKeys key : ApiKeys.brokerApis()) {
b.append("<tr>\n");
b.append("<td>");
b.append("<a href=\"#The_Messages_" + key.name + "\">" + key.name
+ "</a>");
@@ -242,9 +242,9 @@ public enum ApiKeys {
return hasBuffer.get();
}
- public static List<ApiKeys> enabledApis() {
+ public static List<ApiKeys> brokerApis() {
return Arrays.stream(values())
- .filter(api -> api.isEnabled)
+ .filter(api -> !api.isControllerOnlyApi)
.collect(Collectors.toList());
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 0d573db..f31c613 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -133,7 +133,7 @@ public class Protocol {
b.append("</pre>\n");
schemaToFieldTableHtml(ResponseHeaderData.SCHEMAS[i], b);
}
- for (ApiKeys key : ApiKeys.enabledApis()) {
+ for (ApiKeys key : ApiKeys.brokerApis()) {
// Key
b.append("<h5>");
b.append("<a name=\"The_Messages_" + key.name + "\">");
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 3a6ff2f..2bf9360 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -119,7 +119,7 @@ public class ApiVersionsResponse extends AbstractResponse {
public static ApiVersionCollection defaultApiKeys(final byte minMagic) {
ApiVersionCollection apiKeys = new ApiVersionCollection();
- for (ApiKeys apiKey : ApiKeys.enabledApis()) {
+ for (ApiKeys apiKey : ApiKeys.brokerApis()) {
if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
apiKeys.add(ApiVersionsResponse.toApiVersion(apiKey));
}
@@ -137,7 +137,7 @@ public class ApiVersionsResponse extends AbstractResponse {
public static ApiVersionCollection intersectControllerApiVersions(final
byte minMagic,
final
Map<ApiKeys, ApiVersion> activeControllerApiVersions) {
ApiVersionCollection apiKeys = new ApiVersionCollection();
- for (ApiKeys apiKey : ApiKeys.enabledApis()) {
+ for (ApiKeys apiKey : ApiKeys.brokerApis()) {
if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
ApiVersion brokerApiVersion = toApiVersion(apiKey);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
index 01d8cf6..7c19d9f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -38,7 +38,7 @@ public class NodeApiVersionsTest {
NodeApiVersions versions = new NodeApiVersions(new
ApiVersionCollection());
StringBuilder bld = new StringBuilder();
String prefix = "(";
- for (ApiKeys apiKey : ApiKeys.enabledApis()) {
+ for (ApiKeys apiKey : ApiKeys.brokerApis()) {
bld.append(prefix).append(apiKey.name).
append("(").append(apiKey.id).append("): UNSUPPORTED");
prefix = ", ";
@@ -143,10 +143,10 @@ public class NodeApiVersionsTest {
.setMaxVersion((short) 1));
NodeApiVersions versions = new NodeApiVersions(versionList);
for (ApiKeys apiKey: ApiKeys.values()) {
- if (apiKey.isEnabled) {
- assertEquals(apiKey.latestVersion(),
versions.latestUsableVersion(apiKey));
- } else {
+ if (apiKey.isControllerOnlyApi) {
assertNull(versions.apiVersion(apiKey));
+ } else {
+ assertEquals(apiKey.latestVersion(),
versions.latestUsableVersion(apiKey));
}
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
index f387a06..38a5869 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
@@ -40,7 +40,7 @@ public class ApiVersionsResponseTest {
@Test
public void shouldCreateApiResponseThatHasAllApiKeysSupportedByBroker() {
-
assertEquals(apiKeysInResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE),
new HashSet<>(ApiKeys.enabledApis()));
+
assertEquals(apiKeysInResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE),
new HashSet<>(ApiKeys.brokerApis()));
assertTrue(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().supportedFeatures().isEmpty());
assertTrue(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().finalizedFeatures().isEmpty());
assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().finalizedFeaturesEpoch());
@@ -49,9 +49,9 @@ public class ApiVersionsResponseTest {
@Test
public void shouldHaveCorrectDefaultApiVersionsResponse() {
Collection<ApiVersion> apiVersions =
ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().apiKeys();
- assertEquals(apiVersions.size(), ApiKeys.enabledApis().size(), "API
versions for all API keys must be maintained.");
+ assertEquals(apiVersions.size(), ApiKeys.brokerApis().size(), "API
versions for all API keys must be maintained.");
- for (ApiKeys key : ApiKeys.enabledApis()) {
+ for (ApiKeys key : ApiKeys.brokerApis()) {
ApiVersion version =
ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.apiVersion(key.id);
assertNotNull(version, "Could not find ApiVersion for API " +
key.name);
assertEquals(version.minVersion(), key.oldestVersion(), "Incorrect
min version for Api " + key.name);
diff --git a/core/src/main/scala/kafka/Kafka.scala
b/core/src/main/scala/kafka/Kafka.scala
index 1e1f345..4e278c9 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -65,11 +65,12 @@ object Kafka extends Logging {
private def buildServer(props: Properties): Server = {
val config = KafkaConfig.fromProps(props, false)
- if (config.processRoles.isEmpty) {
+ if (config.requiresZookeeper) {
new KafkaServer(
config,
Time.SYSTEM,
- threadNamePrefix = None
+ threadNamePrefix = None,
+ enableForwarding = false
)
} else {
new KafkaRaftServer(
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala
b/core/src/main/scala/kafka/network/RequestChannel.scala
index 777c940..7d31125 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -59,11 +59,11 @@ object RequestChannel extends Logging {
val sanitizedUser: String = Sanitizer.sanitize(principal.getName)
}
- class Metrics(allowDisabledApis: Boolean = false) {
+ class Metrics(allowControllerOnlyApis: Boolean = false) {
private val metricsMap = mutable.Map[String, RequestMetrics]()
- (ApiKeys.values.toSeq.filter(_.isEnabled || allowDisabledApis).map(_.name)
++
+ (ApiKeys.values.toSeq.filter(!_.isControllerOnlyApi ||
allowControllerOnlyApis).map(_.name) ++
Seq(RequestMetrics.consumerFetchMetricName,
RequestMetrics.followFetchMetricName)).foreach { name =>
metricsMap.put(name, new RequestMetrics(name))
}
@@ -337,9 +337,9 @@ object RequestChannel extends Logging {
class RequestChannel(val queueSize: Int,
val metricNamePrefix: String,
time: Time,
- allowDisabledApis: Boolean = false) extends
KafkaMetricsGroup {
+ allowControllerOnlyApis: Boolean = false) extends
KafkaMetricsGroup {
import RequestChannel._
- val metrics = new RequestChannel.Metrics(allowDisabledApis)
+ val metrics = new RequestChannel.Metrics(allowControllerOnlyApis)
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
private val processors = new ConcurrentHashMap[Int, Processor]()
val requestQueueSizeMetricName =
metricNamePrefix.concat(RequestQueueSizeMetric)
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala
b/core/src/main/scala/kafka/network/SocketServer.scala
index 9345452..da02c5e 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -77,7 +77,7 @@ class SocketServer(val config: KafkaConfig,
val metrics: Metrics,
val time: Time,
val credentialProvider: CredentialProvider,
- val allowDisabledApis: Boolean = false)
+ val allowControllerOnlyApis: Boolean = false)
extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
private val maxQueuedRequests = config.queuedMaxRequests
@@ -93,12 +93,12 @@ class SocketServer(val config: KafkaConfig,
// data-plane
private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint,
Acceptor]()
- val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests,
DataPlaneMetricPrefix, time, allowDisabledApis)
+ val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests,
DataPlaneMetricPrefix, time, allowControllerOnlyApis)
// control-plane
private var controlPlaneProcessorOpt : Option[Processor] = None
private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
val controlPlaneRequestChannelOpt: Option[RequestChannel] =
config.controlPlaneListenerName.map(_ =>
- new RequestChannel(20, ControlPlaneMetricPrefix, time, allowDisabledApis))
+ new RequestChannel(20, ControlPlaneMetricPrefix, time,
allowControllerOnlyApis))
private var nextProcessorId = 0
val connectionQuotas = new ConnectionQuotas(config, time, metrics)
@@ -429,7 +429,7 @@ class SocketServer(val config: KafkaConfig,
memoryPool,
logContext,
isPrivilegedListener = isPrivilegedListener,
- allowDisabledApis = allowDisabledApis
+ allowControllerOnlyApis = allowControllerOnlyApis
)
}
@@ -790,7 +790,7 @@ private[kafka] class Processor(val id: Int,
logContext: LogContext,
connectionQueueSize: Int = ConnectionQueueSize,
isPrivilegedListener: Boolean = false,
- allowDisabledApis: Boolean = false) extends
AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+ allowControllerOnlyApis: Boolean = false)
extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
private object ConnectionId {
def fromString(s: String): Option[ConnectionId] = s.split("-") match {
@@ -981,10 +981,10 @@ private[kafka] class Processor(val id: Int,
protected def parseRequestHeader(buffer: ByteBuffer): RequestHeader = {
val header = RequestHeader.parse(buffer)
- if (header.apiKey.isEnabled || allowDisabledApis) {
+ if (!header.apiKey.isControllerOnlyApi || allowControllerOnlyApis) {
header
} else {
- throw new InvalidRequestException("Received request for disabled api key
" + header.apiKey)
+ throw new InvalidRequestException("Received request for KIP-500
controller-only api key " + header.apiKey)
}
}
diff --git a/core/src/main/scala/kafka/server/ForwardingManager.scala
b/core/src/main/scala/kafka/server/ForwardingManager.scala
index 8d285e8..20fda29 100644
--- a/core/src/main/scala/kafka/server/ForwardingManager.scala
+++ b/core/src/main/scala/kafka/server/ForwardingManager.scala
@@ -36,7 +36,7 @@ trait ForwardingManager {
responseCallback: Option[AbstractResponse] => Unit
): Unit
- def controllerApiVersions(): Option[NodeApiVersions]
+ def controllerApiVersions: Option[NodeApiVersions]
def start(): Unit = {}
@@ -140,7 +140,7 @@ class ForwardingManagerImpl(
channelManager.sendRequest(envelopeRequest, new ForwardingResponseHandler)
}
- override def controllerApiVersions(): Option[NodeApiVersions] =
+ override def controllerApiVersions: Option[NodeApiVersions] =
channelManager.controllerApiVersions()
private def parseResponse(
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 09c1ba4..9004cef 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -101,7 +101,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val groupCoordinator: GroupCoordinator,
val txnCoordinator: TransactionCoordinator,
val controller: KafkaController,
- val forwardingManager: ForwardingManager,
+ val forwardingManager: Option[ForwardingManager],
val zkClient: KafkaZkClient,
val brokerId: Int,
val config: KafkaConfig,
@@ -131,7 +131,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
private def isForwardingEnabled(request: RequestChannel.Request): Boolean = {
- config.metadataQuorumEnabled && request.context.principalSerde.isPresent
+ forwardingManager.isDefined && request.context.principalSerde.isPresent
}
private def maybeForwardToController(
@@ -149,12 +149,12 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- if (!request.isForwarded && !controller.isActive &&
isForwardingEnabled(request)) {
- forwardingManager.forwardRequest(request, responseCallback)
- } else {
- // When the KIP-500 mode is off or the principal serde is undefined,
forwarding is not supported,
- // therefore requests are handled directly.
- handler(request)
+ forwardingManager match {
+ case Some(mgr) if !request.isForwarded && !controller.isActive =>
+ mgr.forwardRequest(request, responseCallback)
+
+ case _ =>
+ handler(request)
}
}
@@ -1742,11 +1742,7 @@ class KafkaApis(val requestChannel: RequestChannel,
else {
val supportedFeatures = brokerFeatures.supportedFeatures
val finalizedFeaturesOpt = finalizedFeatureCache.get
- val controllerApiVersions = if (isForwardingEnabled(request)) {
- forwardingManager.controllerApiVersions()
- } else {
- None
- }
+ val controllerApiVersions =
forwardingManager.flatMap(_.controllerApiVersions)
val apiVersionsResponse =
finalizedFeaturesOpt match {
@@ -3251,9 +3247,9 @@ class KafkaApis(val requestChannel: RequestChannel,
// If forwarding is not yet enabled or this request has been received on
an invalid endpoint,
// then we treat the request as unparsable and close the connection.
- if (!config.metadataQuorumEnabled) {
+ if (!isForwardingEnabled(request)) {
info(s"Closing connection ${request.context.connectionId} because it
sent an `Envelope` " +
- s"request, which is not accepted without enabling the internal config
${KafkaConfig.EnableMetadataQuorumProp}")
+ "request even though forwarding has not been enabled")
requestHelper.closeConnection(request, Collections.emptyMap())
return
} else if (!request.context.fromPrivilegedListener) {
@@ -3279,7 +3275,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val forwardedRequestHeader =
parseForwardedRequestHeader(forwardedRequestBuffer)
val forwardedApi = forwardedRequestHeader.apiKey
- if (!forwardedApi.forwardable || !forwardedApi.isEnabled) {
+ if (!forwardedApi.forwardable) {
throw new InvalidRequestException(s"API $forwardedApi is not enabled or
is not eligible for forwarding")
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index fac691e..e933459 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -363,7 +363,6 @@ object KafkaConfig {
val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
val ConnectionSetupTimeoutMsProp =
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG
val ConnectionSetupTimeoutMaxMsProp =
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG
- val EnableMetadataQuorumProp = "enable.metadata.quorum"
val ProcessRolesProp = "process.roles"
/************* Authorizer Configuration ***********/
@@ -648,6 +647,9 @@ object KafkaConfig {
val RequestTimeoutMsDoc = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
val ConnectionSetupTimeoutMsDoc =
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC
val ConnectionSetupTimeoutMaxMsDoc =
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC
+ val ProcessRolesDoc = "The roles that this process plays: 'broker',
'controller', or 'broker,controller' if it is both. " +
+ "This configuration is only for clusters upgraded for KIP-500, which
replaces the dependence on Zookeeper with " +
+ "a self-managed Raft quorum. Leave this config undefined or empty for
Zookeeper clusters."
/************* Authorizer Configuration ***********/
val AuthorizerClassNameDoc = s"The fully qualified name of a class that
implements s${classOf[Authorizer].getName}" +
" interface, which is used by the broker for authorization. This config also
supports authorizers that implement the deprecated" +
@@ -997,9 +999,6 @@ object KafkaConfig {
val PasswordEncoderKeyLengthDoc = "The key length used for encoding
dynamically configured passwords."
val PasswordEncoderIterationsDoc = "The iteration count used for encoding
dynamically configured passwords."
- /** ********* Experimental metadata quorum configuration ***********/
- val ProcessRolesDoc = "This configuration determines what roles this process
should play: broker, controller, or both"
-
private val configDef = {
import ConfigDef.Importance._
import ConfigDef.Range._
@@ -1009,7 +1008,7 @@ object KafkaConfig {
new ConfigDef()
/** ********* Zookeeper Configuration ***********/
- .define(ZkConnectProp, STRING, HIGH, ZkConnectDoc)
+ .define(ZkConnectProp, STRING, null, HIGH, ZkConnectDoc)
.define(ZkSessionTimeoutMsProp, INT, Defaults.ZkSessionTimeoutMs, HIGH,
ZkSessionTimeoutMsDoc)
.define(ZkConnectionTimeoutMsProp, INT, null, HIGH,
ZkConnectionTimeoutMsDoc)
.define(ZkSyncTimeMsProp, INT, Defaults.ZkSyncTimeMs, LOW,
ZkSyncTimeMsDoc)
@@ -1044,9 +1043,6 @@ object KafkaConfig {
.define(RequestTimeoutMsProp, INT, Defaults.RequestTimeoutMs, HIGH,
RequestTimeoutMsDoc)
.define(ConnectionSetupTimeoutMsProp, LONG,
Defaults.ConnectionSetupTimeoutMs, MEDIUM, ConnectionSetupTimeoutMsDoc)
.define(ConnectionSetupTimeoutMaxMsProp, LONG,
Defaults.ConnectionSetupTimeoutMaxMs, MEDIUM, ConnectionSetupTimeoutMaxMsDoc)
-
- // Experimental flag to turn on APIs required for the internal metadata
quorum (KIP-500)
- .defineInternal(EnableMetadataQuorumProp, BOOLEAN, false, LOW)
.defineInternal(ProcessRolesProp, LIST, Collections.emptyList(),
ValidList.in("broker", "controller"), HIGH, ProcessRolesDoc)
/************* Authorizer Configuration ***********/
@@ -1480,6 +1476,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog:
Boolean, dynamicConfigO
var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
val processRoles = parseProcessRoles()
+ def requiresZookeeper: Boolean = processRoles.isEmpty
+
private def parseProcessRoles(): Set[ProcessRole] = {
val roles = getList(KafkaConfig.ProcessRolesProp).asScala.map {
case "broker" => BrokerRole
@@ -1619,9 +1617,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog:
Boolean, dynamicConfigO
/** ********* Feature configuration ***********/
def isFeatureVersioningSupported = interBrokerProtocolVersion >=
KAFKA_2_7_IV0
- /** ********* Experimental metadata quorum configuration ***********/
- def metadataQuorumEnabled = getBoolean(KafkaConfig.EnableMetadataQuorumProp)
-
/** ********* Group coordinator configuration ***********/
val groupMinSessionTimeoutMs =
getInt(KafkaConfig.GroupMinSessionTimeoutMsProp)
val groupMaxSessionTimeoutMs =
getInt(KafkaConfig.GroupMaxSessionTimeoutMsProp)
@@ -1910,5 +1905,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog:
Boolean, dynamicConfigO
s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs
should always be less than" +
s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to
prevent failed" +
s" authentication responses from timing out")
+
+ if (requiresZookeeper && zkConnect == null) {
+ throw new ConfigException(s"Missing required configuration
'${KafkaConfig.ZkConnectProp}' which has no default value.")
+ }
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index b1748ef..fabd019 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -87,6 +87,7 @@ class KafkaServer(
val config: KafkaConfig,
time: Time = Time.SYSTEM,
threadNamePrefix: Option[String] = None,
+ enableForwarding: Boolean = false
) extends Server with Logging with KafkaMetricsGroup {
private val startupComplete = new AtomicBoolean(false)
@@ -129,7 +130,7 @@ class KafkaServer(
var kafkaController: KafkaController = null
- var forwardingManager: ForwardingManager = null
+ var forwardingManager: Option[ForwardingManager] = None
var alterIsrManager: AlterIsrManager = null
@@ -254,10 +255,10 @@ class KafkaServer(
// Delay starting processors until the end of the initialization
sequence to ensure
// that credentials have been loaded before processing authentications.
//
- // Note that we allow the use of disabled APIs when experimental
support for
- // the internal metadata quorum has been enabled
+ // Note that we allow the use of KIP-500 controller APIs when
forwarding is enabled
+ // so that the Envelope request is exposed. This is only used in
testing currently.
socketServer = new SocketServer(config, metrics, time,
credentialProvider,
- allowDisabledApis = config.metadataQuorumEnabled)
+ allowControllerOnlyApis = enableForwarding)
socketServer.startup(startProcessingRequests = false)
/* start replica manager */
@@ -293,15 +294,15 @@ class KafkaServer(
kafkaController = new KafkaController(config, zkClient, time, metrics,
brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache,
threadNamePrefix)
kafkaController.startup()
- if (config.metadataQuorumEnabled) {
- forwardingManager = ForwardingManager(
+ if (enableForwarding) {
+ this.forwardingManager = Some(ForwardingManager(
config,
metadataCache,
time,
metrics,
threadNamePrefix
- )
- forwardingManager.start()
+ ))
+ forwardingManager.foreach(_.start())
}
adminManager = new ZkAdminManager(config, metrics, metadataCache,
zkClient)
@@ -685,7 +686,7 @@ class KafkaServer(
CoreUtils.swallow(alterIsrManager.shutdown(), this)
if (forwardingManager != null)
- CoreUtils.swallow(forwardingManager.shutdown(), this)
+ CoreUtils.swallow(forwardingManager.foreach(_.shutdown()), this)
if (logManager != null)
CoreUtils.swallow(logManager.shutdown(), this)
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala
b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 9ac3f0f..68b2d06 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -66,7 +66,7 @@ class TestRaftServer(
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames,
tokenCache)
- socketServer = new SocketServer(config, metrics, time, credentialProvider,
allowDisabledApis = true)
+ socketServer = new SocketServer(config, metrics, time, credentialProvider,
allowControllerOnlyApis = true)
socketServer.startup(startProcessingRequests = false)
raftManager = new KafkaRaftManager[Array[Byte]](
@@ -413,6 +413,11 @@ object TestRaftServer extends Logging {
val configFile = opts.options.valueOf(opts.configOpt)
val serverProps = Utils.loadProps(configFile)
+
+ // KafkaConfig requires either `process.roles` or `zookeeper.connect`.
Neither are
+ // actually used by the test server, so we fill in `process.roles` with
an arbitrary value.
+ serverProps.put(KafkaConfig.ProcessRolesProp, "controller")
+
val config = KafkaConfig.fromProps(serverProps, doLog = false)
val throughput = opts.options.valueOf(opts.throughputOpt)
val recordSize = opts.options.valueOf(opts.recordSizeOpt)
diff --git
a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
index 5dc6ba4..6224591 100644
---
a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
+++
b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
@@ -55,7 +55,7 @@ class BrokerApiVersionsCommandTest extends
KafkaServerTestHarness {
assertTrue(lineIter.hasNext)
assertEquals(s"$brokerList (id: 0 rack: null) -> (", lineIter.next())
val nodeApiVersions = NodeApiVersions.create
- val enabledApis = ApiKeys.enabledApis.asScala
+ val enabledApis = ApiKeys.brokerApis.asScala
for (apiKey <- enabledApis) {
val apiVersion = nodeApiVersions.apiVersion(apiKey)
assertNotNull(apiVersion)
diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
index f3c16fc..1e86876 100644
--- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
@@ -232,7 +232,7 @@ class ApiVersionTest {
Features.emptySupportedFeatures,
None
)
- assertEquals(new util.HashSet[ApiKeys](ApiKeys.enabledApis),
apiKeysInResponse(response))
+ assertEquals(new util.HashSet[ApiKeys](ApiKeys.brokerApis),
apiKeysInResponse(response))
assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME,
response.throttleTimeMs)
assertTrue(response.data.supportedFeatures.isEmpty)
assertTrue(response.data.finalizedFeatures.isEmpty)
diff --git
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 8eae07e..63ba976 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -85,6 +85,7 @@ abstract class KafkaServerTestHarness extends
ZooKeeperTestHarness {
protected def serverSaslProperties: Option[Properties] = None
protected def clientSaslProperties: Option[Properties] = None
protected def brokerTime(brokerId: Int): Time = Time.SYSTEM
+ protected def enableForwarding: Boolean = false
@BeforeEach
override def setUp(): Unit = {
@@ -98,8 +99,14 @@ abstract class KafkaServerTestHarness extends
ZooKeeperTestHarness {
// Add each broker to `servers` buffer as soon as it is created to ensure
that brokers
// are shutdown cleanly in tearDown even if a subsequent broker fails to
start
- for (config <- configs)
- servers += TestUtils.createServer(config, time =
brokerTime(config.brokerId))
+ for (config <- configs) {
+ servers += TestUtils.createServer(
+ config,
+ time = brokerTime(config.brokerId),
+ threadNamePrefix = None,
+ enableForwarding
+ )
+ }
brokerList = TestUtils.bootstrapServers(servers, listenerName)
alive = new Array[Boolean](servers.length)
Arrays.fill(alive, true)
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index fd599c6..d71764e 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -47,7 +47,7 @@ abstract class AbstractApiVersionsRequestTest extends
BaseRequestTest {
}
def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse,
listenerName: ListenerName = interBrokerListenerName): Unit = {
- val expectedApis = ApiKeys.enabledApis()
+ val expectedApis = ApiKeys.brokerApis()
if (listenerName == controlPlaneListenerName) {
expectedApis.add(ApiKeys.ENVELOPE)
}
diff --git
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala
index b35286d..a538ceb 100644
---
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala
+++
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala
@@ -17,8 +17,6 @@
package kafka.server
-import java.util.Properties
-
import org.apache.kafka.common.protocol.Errors
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
@@ -27,9 +25,7 @@ import scala.jdk.CollectionConverters._
class CreateTopicsRequestWithForwardingTest extends
AbstractCreateTopicsRequestTest {
- override def brokerPropertyOverrides(properties: Properties): Unit = {
- properties.put(KafkaConfig.EnableMetadataQuorumProp, true.toString)
- }
+ override def enableForwarding: Boolean = true
@Test
def testForwardToController(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
index d94d065..200635f 100644
--- a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
@@ -191,7 +191,7 @@ class ForwardingManagerTest {
startTimeNanos = time.nanoseconds(),
memoryPool = MemoryPool.NONE,
buffer = requestBuffer,
- metrics = new RequestChannel.Metrics(allowDisabledApis = true),
+ metrics = new RequestChannel.Metrics(allowControllerOnlyApis = true),
envelope = None
)
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 22a97ce..909ecc6 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -124,14 +124,19 @@ class KafkaApisTest {
val properties = TestUtils.createBrokerConfig(brokerId, "zk")
properties.put(KafkaConfig.InterBrokerProtocolVersionProp,
interBrokerProtocolVersion.toString)
properties.put(KafkaConfig.LogMessageFormatVersionProp,
interBrokerProtocolVersion.toString)
- properties.put(KafkaConfig.EnableMetadataQuorumProp,
enableForwarding.toString)
+
+ val forwardingManagerOpt = if (enableForwarding)
+ Some(this.forwardingManager)
+ else
+ None
+
new KafkaApis(requestChannel,
replicaManager,
adminManager,
groupCoordinator,
txnCoordinator,
controller,
- forwardingManager,
+ forwardingManagerOpt,
zkClient,
brokerId,
new KafkaConfig(properties),
@@ -601,7 +606,7 @@ class KafkaApisTest {
val requestHeader = new RequestHeader(ApiKeys.API_VERSIONS,
ApiKeys.API_VERSIONS.latestVersion, clientId, 0)
val permittedVersion: Short = 0
- EasyMock.expect(forwardingManager.controllerApiVersions()).andReturn(
+ EasyMock.expect(forwardingManager.controllerApiVersions).andReturn(
Some(NodeApiVersions.create(ApiKeys.ALTER_CONFIGS.id, permittedVersion,
permittedVersion)))
val capturedResponse = expectNoThrottling()
@@ -637,7 +642,7 @@ class KafkaApisTest {
val requestHeader = new RequestHeader(ApiKeys.API_VERSIONS,
ApiKeys.API_VERSIONS.latestVersion, clientId, 0)
- EasyMock.expect(forwardingManager.controllerApiVersions()).andReturn(None)
+ EasyMock.expect(forwardingManager.controllerApiVersions).andReturn(None)
val capturedResponse = expectNoThrottling()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 863dfd8..6cbf04a 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -995,4 +995,21 @@ class KafkaConfigTest {
val raftConfig = new RaftConfig(KafkaConfig.fromProps(props))
assertEquals(expectedVoters, raftConfig.quorumVoterConnections())
}
+
+ @Test
+ def testZookeeperConnectRequiredIfEmptyProcessRoles(): Unit = {
+ val props = new Properties()
+ props.put(KafkaConfig.ProcessRolesProp, "")
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
+ assertFalse(isValidKafkaConfig(props))
+ }
+
+ @Test
+ def testZookeeperConnectNotRequiredIfNonEmptyProcessRoles(): Unit = {
+ val props = new Properties()
+ props.put(KafkaConfig.ProcessRolesProp, "broker")
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
+ assertTrue(isValidKafkaConfig(props))
+ }
+
}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index ba074b8..698ecef 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -171,7 +171,7 @@ class RequestQuotaTest extends BaseRequestTest {
def testUnauthorizedThrottle(): Unit = {
RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal
- for (apiKey <- ApiKeys.enabledApis.asScala) {
+ for (apiKey <- ApiKeys.brokerApis.asScala) {
submitTest(apiKey, () => checkUnauthorizedRequestThrottle(apiKey))
}
@@ -739,9 +739,9 @@ class RequestQuotaTest extends BaseRequestTest {
}
object RequestQuotaTest {
- val ClusterActions =
ApiKeys.enabledApis.asScala.filter(_.clusterAction).toSet
+ val ClusterActions = ApiKeys.brokerApis.asScala.filter(_.clusterAction).toSet
val SaslActions = Set(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE)
- val ClientActions = ApiKeys.enabledApis.asScala.toSet -- ClusterActions --
SaslActions
+ val ClientActions = ApiKeys.brokerApis.asScala.toSet -- ClusterActions --
SaslActions
val UnauthorizedPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
"Unauthorized")
// Principal used for all client connections. This is modified by tests which
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ee4c9f7..f786240 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -156,7 +156,11 @@ object TestUtils extends Logging {
}
def createServer(config: KafkaConfig, time: Time, threadNamePrefix:
Option[String]): KafkaServer = {
- val server = new KafkaServer(config, time, threadNamePrefix =
threadNamePrefix)
+ createServer(config, time, threadNamePrefix, enableForwarding = false)
+ }
+
+ def createServer(config: KafkaConfig, time: Time, threadNamePrefix:
Option[String], enableForwarding: Boolean): KafkaServer = {
+ val server = new KafkaServer(config, time, threadNamePrefix,
enableForwarding)
server.startup()
server
}
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 32cb417..2089739 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -22,7 +22,6 @@ import kafka.coordinator.group.GroupCoordinator;
import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
import kafka.network.RequestConvertToJson;
-import kafka.server.ZkAdminManager;
import kafka.server.BrokerFeatures;
import kafka.server.BrokerTopicStats;
import kafka.server.ClientQuotaManager;
@@ -30,7 +29,6 @@ import kafka.server.ClientRequestQuotaManager;
import kafka.server.ControllerMutationQuotaManager;
import kafka.server.FetchManager;
import kafka.server.FinalizedFeatureCache;
-import kafka.server.ForwardingManager;
import kafka.server.KafkaApis;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
@@ -38,6 +36,7 @@ import kafka.server.MetadataCache;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.ReplicationQuotaManager;
+import kafka.server.ZkAdminManager;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.memory.MemoryPool;
import
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataBroker;
@@ -99,7 +98,6 @@ public class MetadataRequestBenchmark {
private ZkAdminManager adminManager = Mockito.mock(ZkAdminManager.class);
private TransactionCoordinator transactionCoordinator =
Mockito.mock(TransactionCoordinator.class);
private KafkaController kafkaController =
Mockito.mock(KafkaController.class);
- private ForwardingManager forwardingManager =
Mockito.mock(ForwardingManager.class);
private KafkaZkClient kafkaZkClient = Mockito.mock(KafkaZkClient.class);
private Metrics metrics = new Metrics();
private int brokerId = 1;
@@ -176,7 +174,7 @@ public class MetadataRequestBenchmark {
groupCoordinator,
transactionCoordinator,
kafkaController,
- forwardingManager,
+ Option.empty(),
kafkaZkClient,
brokerId,
new KafkaConfig(kafkaProps),
diff --git a/raft/README.md b/raft/README.md
index 383470b..6b41879 100644
--- a/raft/README.md
+++ b/raft/README.md
@@ -12,8 +12,7 @@ Below we describe the details to set this up.
bin/test-raft-server-start.sh config/raft.properties
### Run Multi Node Quorum ###
-Create 3 separate raft quorum properties as the following
-(note that the `zookeeper.connect` config is required, but unused):
+Create 3 separate raft quorum properties as the following:
`cat << EOF >> config/raft-quorum-1.properties`
@@ -21,8 +20,6 @@ Create 3 separate raft quorum properties as the following
listeners=PLAINTEXT://localhost:9092
controller.quorum.voters=1@localhost:9092,2@localhost:9093,3@localhost:9094
log.dirs=/tmp/raft-logs-1
-
- zookeeper.connect=localhost:2181
EOF
`cat << EOF >> config/raft-quorum-2.properties`
@@ -31,8 +28,6 @@ Create 3 separate raft quorum properties as the following
listeners=PLAINTEXT://localhost:9093
controller.quorum.voters=1@localhost:9092,2@localhost:9093,3@localhost:9094
log.dirs=/tmp/raft-logs-2
-
- zookeeper.connect=localhost:2181
EOF
`cat << EOF >> config/raft-quorum-3.properties`
@@ -41,8 +36,6 @@ Create 3 separate raft quorum properties as the following
listeners=PLAINTEXT://localhost:9094
controller.quorum.voters=1@localhost:9092,2@localhost:9093,3@localhost:9094
log.dirs=/tmp/raft-logs-3
-
- zookeeper.connect=localhost:2181
EOF
Open up 3 separate terminals, and run individual commands: