This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new faaef2c MINOR: Support Raft-based metadata quorums in system tests
(#10093)
faaef2c is described below
commit faaef2c2dff3208fe3ed23f4b075260475543957
Author: Ron Dagostino <[email protected]>
AuthorDate: Thu Feb 11 12:44:17 2021 -0500
MINOR: Support Raft-based metadata quorums in system tests (#10093)
We need to be able to run system tests with Raft-based metadata quorums --
both
co-located brokers and controllers as well as remote controllers -- in
addition to the
ZooKeeper-based mode we run today. This PR adds this capability to
KafkaService in a
backwards-compatible manner as follows.
If no changes are made to existing system tests then they function as they
always do --
they instantiate ZooKeeper, and Kafka will use ZooKeeper. On the other
hand, if we want
to use a Raft-based metadata quorum we can do so by introducing a
metadata_quorum
argument to the test method and using @matrix to set it to the quorums we
want to use for
the various runs of the test. We then also have to skip creating a
ZooKeeperService when
the quorum is Raft-based.
This PR does not update any tests -- those will come later after all the
KIP-500 code is
merged.
Reviewers: Colin P. McCabe <[email protected]>
---
config/raft-broker.properties | 125 ++++++
config/raft-combined.properties | 125 ++++++
config/raft-controller.properties | 124 ++++++
tests/kafkatest/services/kafka/config.py | 3 -
tests/kafkatest/services/kafka/config_property.py | 5 +
tests/kafkatest/services/kafka/kafka.py | 436 ++++++++++++++++++---
tests/kafkatest/services/kafka/quorum.py | 144 +++++++
.../services/kafka/templates/kafka.properties | 32 +-
8 files changed, 930 insertions(+), 64 deletions(-)
diff --git a/config/raft-broker.properties b/config/raft-broker.properties
new file mode 100644
index 0000000..b93ad1c
--- /dev/null
+++ b/config/raft-broker.properties
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The role of this server. Setting this puts us in kip-500 mode
+process.roles=broker
+
+# The node id associated with this instance's roles
+node.id=1
+
+# The connect string for the controller quorum
+controller.quorum.voters=1@localhost:9093
+
+############################# Socket Server Settings
#############################
+
+# The address the socket server listens on. It will get the value returned from
+# java.net.InetAddress.getCanonicalHostName() if not configured.
+# FORMAT:
+# listeners = listener_name://host_name:port
+# EXAMPLE:
+# listeners = PLAINTEXT://your.host.name:9092
+listeners=PLAINTEXT://localhost:9092
+inter.broker.listener.name=PLAINTEXT
+
+# Hostname and port the broker will advertise to producers and consumers. If
not set,
+# it uses the value for "listeners" if configured. Otherwise, it will use the
value
+# returned from java.net.InetAddress.getCanonicalHostName().
+advertised.listeners=PLAINTEXT://localhost:9092
+
+# Listener, host name, and port for the controller to advertise to the
brokers. If
+# this server is a controller, this listener must be configured.
+controller.listener.names=CONTROLLER
+
+# Maps listener names to security protocols, the default is for them to be the
same. See the config documentation for more details
+listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+
+# The number of threads that the server uses for receiving requests from the
network and sending responses to the network
+num.network.threads=3
+
+# The number of threads that the server uses for processing requests, which
may include disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection
against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=/tmp/raft-broker-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at
startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs
located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Internal Topic Settings
#############################
+# The replication factor for the group metadata internal topics
"__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 is
recommended to ensure availability such as 3.
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only
fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data
to disk.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data may be lost if you are not using
replication.
+# 2. Latency: Very large flush intervals may lead to latency spikes when
the flush does occur as there will be a lot of data to flush.
+# 3. Throughput: The flush is generally the most expensive operation, and a
small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data
after a period of time or
+# every N messages (or both). This can be done globally and overridden on a
per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy
#############################
+
+# The following configurations control the disposal of log segments. The
policy can
+# be set to delete segments after a period of time, or after a given size has
accumulated.
+# A segment will be deleted whenever *either* of these criteria are met.
Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion due to age
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log
unless the remaining
+# segments drop below log.retention.bytes. Functions independently of
log.retention.hours.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log
segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted
according
+# to the retention policies
+log.retention.check.interval.ms=300000
diff --git a/config/raft-combined.properties b/config/raft-combined.properties
new file mode 100644
index 0000000..1d71b2f
--- /dev/null
+++ b/config/raft-combined.properties
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The role of this server. Setting this puts us in kip-500 mode
+process.roles=broker,controller
+
+# The node id associated with this instance's roles
+node.id=1
+
+# The connect string for the controller quorum
+controller.quorum.voters=1@localhost:9093
+
+############################# Socket Server Settings
#############################
+
+# The address the socket server listens on. It will get the value returned from
+# java.net.InetAddress.getCanonicalHostName() if not configured.
+# FORMAT:
+# listeners = listener_name://host_name:port
+# EXAMPLE:
+# listeners = PLAINTEXT://your.host.name:9092
+listeners=PLAINTEXT://:9092,CONTROLLER://:9093
+inter.broker.listener.name=PLAINTEXT
+
+# Hostname and port the broker will advertise to producers and consumers. If
not set,
+# it uses the value for "listeners" if configured. Otherwise, it will use the
value
+# returned from java.net.InetAddress.getCanonicalHostName().
+advertised.listeners=PLAINTEXT://localhost:9092
+
+# Listener, host name, and port for the controller to advertise to the
brokers. If
+# this server is a controller, this listener must be configured.
+controller.listener.names=CONTROLLER
+
+# Maps listener names to security protocols, the default is for them to be the
same. See the config documentation for more details
+listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+
+# The number of threads that the server uses for receiving requests from the
network and sending responses to the network
+num.network.threads=3
+
+# The number of threads that the server uses for processing requests, which
may include disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection
against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=/tmp/raft-combined-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at
startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs
located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Internal Topic Settings
#############################
+# The replication factor for the group metadata internal topics
"__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 is
recommended to ensure availability such as 3.
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only
fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data
to disk.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data may be lost if you are not using
replication.
+# 2. Latency: Very large flush intervals may lead to latency spikes when
the flush does occur as there will be a lot of data to flush.
+# 3. Throughput: The flush is generally the most expensive operation, and a
small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data
after a period of time or
+# every N messages (or both). This can be done globally and overridden on a
per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy
#############################
+
+# The following configurations control the disposal of log segments. The
policy can
+# be set to delete segments after a period of time, or after a given size has
accumulated.
+# A segment will be deleted whenever *either* of these criteria are met.
Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion due to age
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log
unless the remaining
+# segments drop below log.retention.bytes. Functions independently of
log.retention.hours.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log
segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted
according
+# to the retention policies
+log.retention.check.interval.ms=300000
diff --git a/config/raft-controller.properties
b/config/raft-controller.properties
new file mode 100644
index 0000000..a8fbf92
--- /dev/null
+++ b/config/raft-controller.properties
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The role of this server. Setting this puts us in kip-500 mode
+process.roles=controller
+
+# The node id associated with this instance's roles
+node.id=1
+
+# The connect string for the controller quorum
+controller.quorum.voters=1@localhost:9093
+
+############################# Socket Server Settings
#############################
+
+# The address the socket server listens on. It will get the value returned from
+# java.net.InetAddress.getCanonicalHostName() if not configured.
+# FORMAT:
+# listeners = listener_name://host_name:port
+# EXAMPLE:
+# listeners = PLAINTEXT://your.host.name:9092
+listeners=PLAINTEXT://:9093
+
+# Hostname and port the broker will advertise to producers and consumers. If
not set,
+# it uses the value for "listeners" if configured. Otherwise, it will use the
value
+# returned from java.net.InetAddress.getCanonicalHostName().
+#advertised.listeners=PLAINTEXT://your.host.name:9092
+
+# Listener, host name, and port for the controller to advertise to the
brokers. If
+# this server is a controller, this listener must be configured.
+controller.listener.names=PLAINTEXT
+
+# Maps listener names to security protocols, the default is for them to be the
same. See the config documentation for more details
+#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+
+# The number of threads that the server uses for receiving requests from the
network and sending responses to the network
+num.network.threads=3
+
+# The number of threads that the server uses for processing requests, which
may include disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection
against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=/tmp/raft-controller-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at
startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs
located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Internal Topic Settings
#############################
+# The replication factor for the group metadata internal topics
"__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 is
recommended to ensure availability such as 3.
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only
fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data
to disk.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data may be lost if you are not using
replication.
+# 2. Latency: Very large flush intervals may lead to latency spikes when
the flush does occur as there will be a lot of data to flush.
+# 3. Throughput: The flush is generally the most expensive operation, and a
small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data
after a period of time or
+# every N messages (or both). This can be done globally and overridden on a
per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy
#############################
+
+# The following configurations control the disposal of log segments. The
policy can
+# be set to delete segments after a period of time, or after a given size has
accumulated.
+# A segment will be deleted whenever *either* of these criteria are met.
Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion due to age
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log
unless the remaining
+# segments drop below log.retention.bytes. Functions independently of
log.retention.hours.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log
segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted
according
+# to the retention policies
+log.retention.check.interval.ms=300000
diff --git a/tests/kafkatest/services/kafka/config.py
b/tests/kafkatest/services/kafka/config.py
index 64e96e3..d440fcf 100644
--- a/tests/kafkatest/services/kafka/config.py
+++ b/tests/kafkatest/services/kafka/config.py
@@ -22,11 +22,8 @@ class KafkaConfig(dict):
"""
DEFAULTS = {
- config_property.PORT: 9092,
config_property.SOCKET_RECEIVE_BUFFER_BYTES: 65536,
config_property.LOG_DIRS:
"/mnt/kafka/kafka-data-logs-1,/mnt/kafka/kafka-data-logs-2",
- config_property.ZOOKEEPER_CONNECTION_TIMEOUT_MS: 18000,
- config_property.ZOOKEEPER_SESSION_TIMEOUT_MS: 18000
}
def __init__(self, **kwargs):
diff --git a/tests/kafkatest/services/kafka/config_property.py
b/tests/kafkatest/services/kafka/config_property.py
index 6749c3b..2222c16 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -18,6 +18,11 @@ Define Kafka configuration property names here.
"""
BROKER_ID = "broker.id"
+NODE_ID = "node.id"
+FIRST_BROKER_PORT = 9092
+FIRST_CONTROLLER_PORT = FIRST_BROKER_PORT + 500
+FIRST_CONTROLLER_ID = 3001
+CLUSTER_ID = "6bd37820-6745-4790-ae98-620300e1f61b"
PORT = "port"
ADVERTISED_HOSTNAME = "advertised.host.name"
diff --git a/tests/kafkatest/services/kafka/kafka.py
b/tests/kafkatest/services/kafka/kafka.py
index 4da2ab2..2073252 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -24,9 +24,8 @@ from ducktape.utils.util import wait_until
from ducktape.cluster.remoteaccount import RemoteCommandError
from .config import KafkaConfig
-from kafkatest.version import KafkaVersion
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
-from kafkatest.services.kafka import config_property
+from kafkatest.services.kafka import config_property, quorum
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.security.minikdc import MiniKdc
from kafkatest.services.security.listener_security_config import
ListenerSecurityConfig
@@ -53,6 +52,94 @@ class KafkaListener:
return "%s:%s" % (self.name, self.security_protocol)
class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
+ """
+ Ducktape system test service for Brokers and Raft-based Controllers
+
+ Metadata Quorums
+ ----------------
+ Kafka can use either ZooKeeper or a Raft Controller quorum for its
+ metadata. See the kafkatest.services.kafka.quorum.ServiceQuorumInfo
+ class for details.
+
+ Attributes
+ ----------
+
+ quorum_info : kafkatest.services.kafka.quorum.ServiceQuorumInfo
+ Information about the service and its metadata quorum
+ num_nodes_broker_role : int
+ The number of nodes in the service that include 'broker'
+ in process.roles (0 when using Zookeeper)
+ num_nodes_controller_role : int
+ The number of nodes in the service that include 'controller'
+ in process.roles (0 when using Zookeeper)
+ controller_quorum : KafkaService
+ None when using ZooKeeper, otherwise the Kafka service for the
+ co-located case or the remote controller quorum service
+ instance for the remote case
+ remote_controller_quorum : KafkaService
+ None for the co-located case or when using ZooKeeper, otherwise
+ the remote controller quorum service instance
+
+ Kafka Security Protocols
+ ------------------------
+ The security protocol advertised to clients and the inter-broker
+ security protocol can be set in the constructor and can be changed
+ afterwards as well. Set these attributes to make changes; they
+ take effect when starting each node:
+
+ security_protocol : str
+ default PLAINTEXT
+ client_sasl_mechanism : str
+ default GSSAPI, ignored unless using SASL_PLAINTEXT or SASL_SSL
+ interbroker_security_protocol : str
+ default PLAINTEXT
+ interbroker_sasl_mechanism : str
+ default GSSAPI, ignored unless using SASL_PLAINTEXT or SASL_SSL
+
+ ZooKeeper
+ ---------
+ Create an instance of ZookeeperService when metadata_quorum is ZK
+ (ZK is the default if metadata_quorum is not a test parameter).
+
+ Raft Quorums
+ ------------
+ Set metadata_quorum accordingly (to COLOCATED_RAFT or REMOTE_RAFT).
+ Do not instantiate a ZookeeperService instance.
+
+ Starting Kafka will cause any remote controller quorum to
+ automatically start first. Explicitly stopping Kafka does not stop
+ any remote controller quorum, but Ducktape will stop both when
+ tearing down the test (it will stop Kafka first).
+
+ Raft Security Protocols
+ --------------------------------
+ The broker-to-controller and inter-controller security protocols
+ will both initially be set to the inter-broker security protocol.
+ The broker-to-controller and inter-controller security protocols
+ must be identical for the co-located case (an exception will be
+ thrown when trying to start the service if they are not identical).
+ The broker-to-controller and inter-controller security protocols
+ can differ in the remote case.
+
+ Set these attributes for the co-located case. Changes take effect
+ when starting each node:
+
+ controller_security_protocol : str
+ default PLAINTEXT
+ controller_sasl_mechanism : str
+ default GSSAPI, ignored unless using SASL_PLAINTEXT or SASL_SSL
+ intercontroller_security_protocol : str
+ default PLAINTEXT
+ intercontroller_sasl_mechanism : str
+ default GSSAPI, ignored unless using SASL_PLAINTEXT or SASL_SSL
+
+ Set the same attributes for the remote case (changes take effect
+ when starting each quorum node), but you must first obtain the
+ service instance for the remote quorum via one of the
+ 'controller_quorum' or 'remote_controller_quorum' attributes as
+ defined above.
+
+ """
PERSISTENT_ROOT = "/mnt/kafka"
STDOUT_STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT,
"server-start-stdout-stderr.log")
LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "kafka-log4j.properties")
@@ -74,6 +161,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
JAAS_CONF_PROPERTY =
"java.security.auth.login.config=/mnt/security/jaas.conf"
ADMIN_CLIENT_AS_BROKER_JAAS_CONF_PROPERTY =
"java.security.auth.login.config=/mnt/security/admin_client_as_broker_jaas.conf"
KRB5_CONF = "java.security.krb5.conf=/mnt/security/krb5.conf"
+ SECURITY_PROTOCOLS = [SecurityConfig.PLAINTEXT, SecurityConfig.SSL,
SecurityConfig.SASL_PLAINTEXT, SecurityConfig.SASL_SSL]
logs = {
"kafka_server_start_stdout_stderr": {
@@ -103,14 +191,45 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
jmx_attributes=None, zk_connect_timeout=18000,
zk_session_timeout=18000, server_prop_overides=None, zk_chroot=None,
zk_client_secure=False,
listener_security_config=ListenerSecurityConfig(),
per_node_server_prop_overrides=None,
- extra_kafka_opts="", tls_version=None):
+ extra_kafka_opts="", tls_version=None,
+ remote_kafka=None,
+ controller_num_nodes_override=0,
+ ):
"""
:param context: test context
+ :param int num_nodes: the number of nodes in the service. There are 4
possibilities:
+ 1) Zookeeper quorum:
+ The number of brokers is defined by this parameter.
+ 2) Co-located Raft quorum:
+ The number of nodes having a broker role is defined by this
parameter.
+ The number of nodes having a controller role will by default
be 1, 3, or 5 depending on num_nodes
+ (1 if num_nodes < 3, otherwise 3 if num_nodes < 5, otherwise
5). This calculation
+ can be overridden via controller_num_nodes_override, which
must be between 1 and num_nodes,
+ inclusive, when non-zero. Here are some possibilities:
+ num_nodes = 1:
+ node 0: process.roles=broker,controller
+ num_nodes = 2:
+ node 0: process.roles=broker,controller
+ node 1: process.roles=broker
+ num_nodes = 3:
+ node 0: process.roles=broker,controller
+ node 1: process.roles=broker,controller
+ node 2: process.roles=broker,controller
+ num_nodes = 3, controller_num_nodes_override = 1
+ node 0: process.roles=broker,controller
+ node 1: process.roles=broker
+ node 2: process.roles=broker
+ 3) Remote Raft quorum when instantiating the broker service:
+ The number of nodes, all of which will have
process.roles=broker, is defined by this parameter.
+ 4) Remote Raft quorum when instantiating the controller service:
+ The number of nodes, all of which will have
process.roles=controller, is defined by this parameter.
+ The value passed in is determined by the broker service when
that is instantiated, and it uses the
+ same algorithm as described above: 1, 3, or 5 unless
controller_num_nodes_override is provided.
:param ZookeeperService zk:
:param dict topics: which topics to create automatically
:param str security_protocol: security protocol for clients to use
:param str tls_version: version of the TLS protocol.
- :param str interbroker_security_protocol: security protocol to use for
broker-to-broker communication
+ :param str interbroker_security_protocol: security protocol to use for
broker-to-broker (and Raft controller-to-controller) communication
:param str client_sasl_mechanism: sasl mechanism for clients to use
:param str interbroker_sasl_mechanism: sasl mechanism to use for
broker-to-broker communication
:param str authorizer_class_name: which authorizer class to use
@@ -120,18 +239,75 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
:param int zk_connect_timeout:
:param int zk_session_timeout:
:param dict server_prop_overides: overrides for kafka.properties file
- :param zk_chroot:
+ :param str zk_chroot:
:param bool zk_client_secure: connect to Zookeeper over secure client
port (TLS) when True
:param ListenerSecurityConfig listener_security_config: listener
config to use
- :param dict per_node_server_prop_overrides:
+ :param dict per_node_server_prop_overrides: overrides for
kafka.properties file keyed by 0-based node number
:param str extra_kafka_opts: jvm args to add to KAFKA_OPTS variable
+ :param str tls_version: TLS version to use
+ :param KafkaService remote_kafka: process.roles=controller for this
cluster when not None; ignored when using ZooKeeper
+ :param int controller_num_nodes_override: the number of controller nodes to use
in the cluster, instead of 5, 3, or 1 based on num_nodes, if positive, not
using ZooKeeper, and remote_kafka is None; ignored otherwise
+
"""
+
+ self.zk = zk
+ self.remote_kafka = remote_kafka
+ self.quorum_info = quorum.ServiceQuorumInfo(self, context)
+ self.controller_quorum = None # will define below if necessary
+ self.remote_controller_quorum = None # will define below if necessary
+
+ if num_nodes < 1:
+ raise Exception("Must set a positive number of nodes: %i" %
num_nodes)
+ self.num_nodes_broker_role = 0
+ self.num_nodes_controller_role = 0
+
+ if self.quorum_info.using_raft:
+ if self.quorum_info.has_brokers:
+ num_nodes_broker_role = num_nodes
+ if self.quorum_info.has_controllers:
+ self.num_nodes_controller_role =
self.num_raft_controllers(num_nodes_broker_role, controller_num_nodes_override)
+ if self.remote_kafka:
+ raise Exception("Must not specify remote Kafka service
with co-located Controller quorum")
+ else:
+ self.num_nodes_controller_role = num_nodes
+ if not self.remote_kafka:
+ raise Exception("Must specify remote Kafka service when
instantiating remote Controller service (should not happen)")
+
+ # Initially use the inter-broker security protocol for both
+ # broker-to-controller and inter-controller communication. Both
can be explicitly changed later if desired.
+ # Note, however, that the two must be the same if the controller
quorum is co-located with the
+ # brokers. Different security protocols for the two are only
supported with a remote controller quorum.
+ self.controller_security_protocol = interbroker_security_protocol
+ self.controller_sasl_mechanism = interbroker_sasl_mechanism
+ self.intercontroller_security_protocol =
interbroker_security_protocol
+ self.intercontroller_sasl_mechanism = interbroker_sasl_mechanism
+
+ # Ducktape tears down services in the reverse order in which they
are created,
+ # so create a service for the remote controller quorum (if we need
one) first, before
+ # invoking Service.__init__(), so that Ducktape will tear down the
quorum last; otherwise
+ # Ducktape will tear down the controller quorum first, which could
lead to problems in
+ # Kafka and delays in tearing it down (and who knows what else --
it's simply better
+ # to correctly tear down Kafka first, before tearing down the
remote controller).
+ if self.quorum_info.has_controllers:
+ self.controller_quorum = self
+ else:
+ num_remote_controller_nodes =
self.num_raft_controllers(num_nodes, controller_num_nodes_override)
+ self.remote_controller_quorum = KafkaService(
+ context, num_remote_controller_nodes, None,
security_protocol=self.controller_security_protocol,
+
interbroker_security_protocol=self.intercontroller_security_protocol,
+ client_sasl_mechanism=self.controller_sasl_mechanism,
interbroker_sasl_mechanism=self.intercontroller_sasl_mechanism,
+ authorizer_class_name=authorizer_class_name,
version=version, jmx_object_names=jmx_object_names,
+ jmx_attributes=jmx_attributes,
+ listener_security_config=listener_security_config,
+ extra_kafka_opts=extra_kafka_opts, tls_version=tls_version,
+ remote_kafka=self,
+ )
+ self.controller_quorum = self.remote_controller_quorum
+
Service.__init__(self, context, num_nodes)
JmxMixin.__init__(self, num_nodes=num_nodes,
jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
root=KafkaService.PERSISTENT_ROOT)
- self.zk = zk
-
self.security_protocol = security_protocol
self.tls_version = tls_version
self.client_sasl_mechanism = client_sasl_mechanism
@@ -171,35 +347,79 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
# e.g. brokers to deregister after a hard kill.
self.zk_session_timeout = zk_session_timeout
- self.port_mappings = {
- 'PLAINTEXT': KafkaListener('PLAINTEXT', 9092, 'PLAINTEXT', False),
- 'SSL': KafkaListener('SSL', 9093, 'SSL', False),
- 'SASL_PLAINTEXT': KafkaListener('SASL_PLAINTEXT', 9094,
'SASL_PLAINTEXT', False),
- 'SASL_SSL': KafkaListener('SASL_SSL', 9095, 'SASL_SSL', False),
+ broker_only_port_mappings = {
KafkaService.INTERBROKER_LISTENER_NAME:
- KafkaListener(KafkaService.INTERBROKER_LISTENER_NAME, 9099,
None, False)
+ KafkaListener(KafkaService.INTERBROKER_LISTENER_NAME,
config_property.FIRST_BROKER_PORT + 7, None, False)
}
+ controller_only_port_mappings = {}
+ for idx, sec_protocol in enumerate(KafkaService.SECURITY_PROTOCOLS):
+ name_for_controller = self.controller_listener_name(sec_protocol)
+ broker_only_port_mappings[sec_protocol] =
KafkaListener(sec_protocol, config_property.FIRST_BROKER_PORT + idx,
sec_protocol, False)
+ controller_only_port_mappings[name_for_controller] =
KafkaListener(name_for_controller, config_property.FIRST_CONTROLLER_PORT + idx,
sec_protocol, False)
+
+ if self.quorum_info.using_zk or self.quorum_info.has_brokers and not
self.quorum_info.has_controllers: # ZK or Raft broker-only
+ self.port_mappings = broker_only_port_mappings
+ elif self.quorum_info.has_brokers_and_controllers: # Raft
broker+controller
+ self.port_mappings = broker_only_port_mappings.copy()
+ self.port_mappings.update(controller_only_port_mappings)
+ else: # Raft controller-only
+ self.port_mappings = controller_only_port_mappings
self.interbroker_listener = None
- self.setup_interbroker_listener(interbroker_security_protocol,
self.listener_security_config.use_separate_interbroker_listener)
+ if self.quorum_info.using_zk or self.quorum_info.has_brokers:
+ self.setup_interbroker_listener(interbroker_security_protocol,
self.listener_security_config.use_separate_interbroker_listener)
self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
self._security_config = None
for node in self.nodes:
+ node_quorum_info = quorum.NodeQuorumInfo(self.quorum_info, node)
+
node.version = version
- node.config = KafkaConfig(**{
+ raft_broker_configs = {
+ config_property.PORT: config_property.FIRST_BROKER_PORT,
+ config_property.NODE_ID: self.idx(node),
+ }
+ zk_broker_configs = {
+ config_property.PORT: config_property.FIRST_BROKER_PORT,
config_property.BROKER_ID: self.idx(node),
config_property.ZOOKEEPER_CONNECTION_TIMEOUT_MS:
zk_connect_timeout,
config_property.ZOOKEEPER_SESSION_TIMEOUT_MS:
zk_session_timeout
- })
+ }
+ controller_only_configs = {
+ config_property.NODE_ID: self.idx(node) +
config_property.FIRST_CONTROLLER_ID - 1,
+ }
+ if node_quorum_info.service_quorum_info.using_zk:
+ node.config = KafkaConfig(**zk_broker_configs)
+ elif not node_quorum_info.has_broker_role: # Raft controller-only
role
+ node.config = KafkaConfig(**controller_only_configs)
+ else: # Raft broker-only role or combined broker+controller roles
+ node.config = KafkaConfig(**raft_broker_configs)
+
+ def num_raft_controllers(self, num_nodes_broker_role,
controller_num_nodes_override):
+ if controller_num_nodes_override < 0:
+ raise Exception("controller_num_nodes_override must not be
negative: %i" % controller_num_nodes_override)
+ if controller_num_nodes_override > num_nodes_broker_role and
self.quorum_info.quorum_type == quorum.colocated_raft:
+ raise Exception("controller_num_nodes_override must not exceed the
service's node count in the co-located case: %i > %i" %
+ (controller_num_nodes_override,
num_nodes_broker_role))
+ if controller_num_nodes_override:
+ return controller_num_nodes_override
+ if num_nodes_broker_role < 3:
+ return 1
+ if num_nodes_broker_role < 5:
+ return 3
+ return 5
def set_version(self, version):
for node in self.nodes:
node.version = version
+ def controller_listener_name(self, security_protocol_name):
+ return "CONTROLLER_%s" % security_protocol_name
+
@property
def interbroker_security_protocol(self):
- return self.interbroker_listener.security_protocol
+ # TODO: disentangle interbroker and intercontroller protocol
information
+ return self.interbroker_listener.security_protocol if
self.quorum_info.using_zk or self.quorum_info.has_brokers else
self.intercontroller_security_protocol
# this is required for backwards compatibility - there are a lot of tests
that set this property explicitly
# meaning 'use one of the existing listeners that match given security
protocol, do not use custom listener'
@@ -222,21 +442,24 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
@property
def security_config(self):
if not self._security_config:
- self._security_config = SecurityConfig(self.context,
self.security_protocol, self.interbroker_listener.security_protocol,
- zk_sasl=self.zk.zk_sasl,
zk_tls=self.zk_client_secure,
-
client_sasl_mechanism=self.client_sasl_mechanism,
-
interbroker_sasl_mechanism=self.interbroker_sasl_mechanism,
-
listener_security_config=self.listener_security_config,
- tls_version=self.tls_version)
+ client_sasl_mechanism_to_use = self.client_sasl_mechanism if
self.quorum_info.using_zk or self.quorum_info.has_brokers else
self.controller_sasl_mechanism
+ interbroker_sasl_mechanism_to_use =
self.interbroker_sasl_mechanism if self.quorum_info.using_zk or
self.quorum_info.has_brokers else self.intercontroller_sasl_mechanism
+ self._security_config = SecurityConfig(self.context,
self.security_protocol, self.interbroker_security_protocol,
+ zk_sasl=self.zk.zk_sasl if
self.quorum_info.using_zk else False, zk_tls=self.zk_client_secure,
+
client_sasl_mechanism=client_sasl_mechanism_to_use,
+
interbroker_sasl_mechanism=interbroker_sasl_mechanism_to_use,
+
listener_security_config=self.listener_security_config,
+
tls_version=self.tls_version)
for port in self.port_mappings.values():
if port.open:
self._security_config.enable_security_protocol(port.security_protocol)
- if self.zk.zk_sasl:
- self._security_config.enable_sasl()
- self._security_config.zk_sasl = self.zk.zk_sasl
- if self.zk_client_secure:
- self._security_config.enable_ssl()
- self._security_config.zk_tls = self.zk_client_secure
+ if self.quorum_info.using_zk:
+ if self.zk.zk_sasl:
+ self._security_config.enable_sasl()
+ self._security_config.zk_sasl = self.zk.zk_sasl
+ if self.zk_client_secure:
+ self._security_config.enable_ssl()
+ self._security_config.zk_tls = self.zk_client_secure
return self._security_config
def open_port(self, listener_name):
@@ -246,35 +469,62 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
self.port_mappings[listener_name].open = False
def start_minikdc_if_necessary(self, add_principals=""):
- if self.security_config.has_sasl:
+ has_sasl = self.security_config.has_sasl if self.quorum_info.using_zk
else \
+ self.security_config.has_sasl or
self.controller_quorum.security_config.has_sasl if self.quorum_info.has_brokers
else \
+ self.security_config.has_sasl or
self.remote_kafka.security_config.has_sasl
+ if has_sasl:
if self.minikdc is None:
- self.minikdc = MiniKdc(self.context, self.nodes,
extra_principals = add_principals)
- self.minikdc.start()
+ other_service = self.remote_kafka if self.remote_kafka else
self.controller_quorum if self.quorum_info.using_raft else None
+ if not other_service or not other_service.minikdc:
+ nodes_for_kdc = self.nodes.copy()
+ if other_service and other_service != self:
+ nodes_for_kdc += other_service.nodes
+ self.minikdc = MiniKdc(self.context, nodes_for_kdc,
extra_principals = add_principals)
+ self.minikdc.start()
else:
self.minikdc = None
+ if self.quorum_info.using_raft:
+ self.controller_quorum.minikdc = None
+ if self.remote_kafka:
+ self.remote_kafka.minikdc = None
def alive(self, node):
return len(self.pids(node)) > 0
def start(self, add_principals=""):
- if self.zk_client_secure and not self.zk.zk_client_secure_port:
+ if self.quorum_info.using_zk and self.zk_client_secure and not
self.zk.zk_client_secure_port:
raise Exception("Unable to start Kafka: TLS to Zookeeper requested
but Zookeeper secure port not enabled")
- self.open_port(self.security_protocol)
- self.interbroker_listener.open = True
+ if self.quorum_info.has_brokers_and_controllers and (
+ self.controller_security_protocol !=
self.intercontroller_security_protocol or
+ self.controller_sasl_mechanism !=
self.intercontroller_sasl_mechanism):
+ raise Exception("Co-located Raft-based Brokers (%s/%s) and
Controllers (%s/%s) cannot talk to Controllers via different security
protocols" %
+ (self.controller_security_protocol,
self.controller_sasl_mechanism,
+ self.intercontroller_security_protocol,
self.intercontroller_sasl_mechanism))
+ if self.quorum_info.using_zk or self.quorum_info.has_brokers:
+ self.open_port(self.security_protocol)
+ self.interbroker_listener.open = True
+ # we have to wait to decide whether to open the controller port(s)
+ # because it could be dependent on the particular node in the
+ # co-located case where the number of controllers could be less
+ # than the number of nodes in the service
self.start_minikdc_if_necessary(add_principals)
- self._ensure_zk_chroot()
+ if self.quorum_info.using_zk:
+ self._ensure_zk_chroot()
+ if self.remote_controller_quorum:
+ self.remote_controller_quorum.start()
Service.start(self)
- self.logger.info("Waiting for brokers to register at ZK")
+ if self.quorum_info.using_zk:
+ self.logger.info("Waiting for brokers to register at ZK")
- retries = 30
- expected_broker_ids = set(self.nodes)
- wait_until(lambda: {node for node in self.nodes if
self.is_registered(node)} == expected_broker_ids, 30, 1)
+ retries = 30
+ expected_broker_ids = set(self.nodes)
+ wait_until(lambda: {node for node in self.nodes if
self.is_registered(node)} == expected_broker_ids, 30, 1)
- if retries == 0:
- raise RuntimeError("Kafka servers didn't register at ZK within 30
seconds")
+ if retries == 0:
+ raise RuntimeError("Kafka servers didn't register at ZK within
30 seconds")
# Create topics if necessary
if self.topics is not None:
@@ -300,16 +550,25 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
advertised_listeners = []
protocol_map = []
+ controller_listener_names = self.controller_listener_name_list()
+
for port in self.port_mappings.values():
if port.open:
listeners.append(port.listener())
- advertised_listeners.append(port.advertised_listener(node))
+ if not port.name in controller_listener_names:
+ advertised_listeners.append(port.advertised_listener(node))
protocol_map.append(port.listener_security_protocol())
+ controller_sec_protocol =
self.remote_controller_quorum.controller_security_protocol if
self.remote_controller_quorum \
+ else self.controller_security_protocol if
self.quorum_info.has_brokers_and_controllers and not
quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role \
+ else None
+ if controller_sec_protocol:
+ protocol_map.append("%s:%s" %
(self.controller_listener_name(controller_sec_protocol),
controller_sec_protocol))
self.listeners = ','.join(listeners)
self.advertised_listeners = ','.join(advertised_listeners)
self.listener_security_protocol_map = ','.join(protocol_map)
- self.interbroker_bootstrap_servers =
self.__bootstrap_servers(self.interbroker_listener, True)
+ if self.quorum_info.using_zk or self.quorum_info.has_brokers:
+ self.interbroker_bootstrap_servers =
self.__bootstrap_servers(self.interbroker_listener, True)
def prop_file(self, node):
self.set_protocol_and_port(node)
@@ -324,13 +583,15 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
#load specific test override configs
override_configs = KafkaConfig(**node.config)
- override_configs[config_property.ADVERTISED_HOSTNAME] =
node.account.hostname
- override_configs[config_property.ZOOKEEPER_CONNECT] =
self.zk_connect_setting()
- if self.zk_client_secure:
- override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] =
'true'
- override_configs[config_property.ZOOKEEPER_CLIENT_CNXN_SOCKET] =
'org.apache.zookeeper.ClientCnxnSocketNetty'
- else:
- override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] =
'false'
+ if self.quorum_info.using_zk or self.quorum_info.has_brokers:
+ override_configs[config_property.ADVERTISED_HOSTNAME] =
node.account.hostname
+ if self.quorum_info.using_zk:
+ override_configs[config_property.ZOOKEEPER_CONNECT] =
self.zk_connect_setting()
+ if self.zk_client_secure:
+ override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE]
= 'true'
+ override_configs[config_property.ZOOKEEPER_CLIENT_CNXN_SOCKET]
= 'org.apache.zookeeper.ClientCnxnSocketNetty'
+ else:
+ override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE]
= 'false'
for prop in self.server_prop_overides:
override_configs[prop[0]] = prop[1]
@@ -370,11 +631,40 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
KafkaService.STDOUT_STDERR_CAPTURE)
return cmd
+ def controller_listener_name_list(self):
+ if self.quorum_info.using_zk:
+ return []
+ broker_to_controller_listener_name =
self.controller_listener_name(self.controller_quorum.controller_security_protocol)
+ return [broker_to_controller_listener_name] if
(self.controller_quorum.intercontroller_security_protocol ==
self.controller_quorum.controller_security_protocol) \
+ else [broker_to_controller_listener_name,
self.controller_listener_name(self.controller_quorum.intercontroller_security_protocol)]
+
def start_node(self, node, timeout_sec=60):
node.account.mkdirs(KafkaService.PERSISTENT_ROOT)
+ self.node_quorum_info = quorum.NodeQuorumInfo(self.quorum_info, node)
+ if self.quorum_info.has_controllers:
+ for controller_listener in self.controller_listener_name_list():
+ if self.node_quorum_info.has_controller_role:
+ self.open_port(controller_listener)
+ else: # co-located case where node doesn't have a controller
+ self.close_port(controller_listener)
+
self.security_config.setup_node(node)
- self.maybe_setup_broker_scram_credentials(node)
+ if self.quorum_info.using_zk or self.quorum_info.has_brokers: # TODO:
SCRAM currently unsupported for controller quorum
+ self.maybe_setup_broker_scram_credentials(node)
+
+ if self.quorum_info.using_raft:
+ # define controller.quorum.voters text
+ security_protocol_to_use =
self.controller_quorum.controller_security_protocol
+ first_node_id = 1 if self.quorum_info.has_brokers_and_controllers
else config_property.FIRST_CONTROLLER_ID
+ self.controller_quorum_voters = ','.join(["%s@%s:%s" %
+
(self.controller_quorum.idx(node) + first_node_id - 1,
+ node.account.hostname,
+
config_property.FIRST_CONTROLLER_PORT +
+
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
+ for node in
self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
+ # define controller.listener.names
+ self.controller_listener_names =
','.join(self.controller_listener_name_list())
prop_file = self.prop_file(node)
self.logger.info("kafka.properties:")
@@ -382,6 +672,13 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
node.account.create_file(KafkaService.CONFIG_FILE, prop_file)
node.account.create_file(self.LOG4J_CONFIG,
self.render('log4j.properties', log_dir=KafkaService.OPERATIONAL_LOG_DIR))
+ if self.quorum_info.using_raft:
+ # format log directories if necessary
+ kafka_storage_script = self.path.script("kafka-storage.sh", node)
+ cmd = "%s format --ignore-formatted --config %s --cluster-id %s" %
(kafka_storage_script, KafkaService.CONFIG_FILE, config_property.CLUSTER_ID)
+ self.logger.info("Running log directory format command...\n%s" %
cmd)
+ node.account.ssh(cmd)
+
cmd = self.start_cmd(node)
self.logger.debug("Attempting to start KafkaService on %s with
command: %s" % (str(node.account), cmd))
with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as
monitor:
@@ -390,12 +687,13 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
monitor.wait_until("Kafka\s*Server.*started",
timeout_sec=timeout_sec, backoff_sec=.25,
err_msg="Kafka server didn't finish startup in
%d seconds" % timeout_sec)
- # Credentials for inter-broker communication are created before
starting Kafka.
- # Client credentials are created after starting Kafka so that both
loading of
- # existing credentials from ZK and dynamic update of credentials in
Kafka are tested.
- # We use the admin client and connect as the broker user when creating
the client (non-broker) credentials
- # if Kafka supports KIP-554, otherwise we use ZooKeeper.
- self.maybe_setup_client_scram_credentials(node)
+ if self.quorum_info.using_zk or self.quorum_info.has_brokers: # TODO:
SCRAM currently unsupported for controller quorum
+ # Credentials for inter-broker communication are created before
starting Kafka.
+ # Client credentials are created after starting Kafka so that both
loading of
+ # existing credentials from ZK and dynamic update of credentials
in Kafka are tested.
+ # We use the admin client and connect as the broker user when
creating the client (non-broker) credentials
+ # if Kafka supports KIP-554, otherwise we use ZooKeeper.
+ self.maybe_setup_client_scram_credentials(node)
self.start_jmx_tool(self.idx(node), node)
if len(self.pids(node)) == 0:
@@ -448,6 +746,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
node.account.ssh("sudo rm -rf -- %s" % KafkaService.PERSISTENT_ROOT,
allow_fail=False)
def kafka_topics_cmd_with_optional_security_settings(self, node,
force_use_zk_connection, kafka_security_protocol = None):
+ if self.quorum_info.using_raft and not self.quorum_info.has_brokers:
+ raise Exception("Must invoke kafka-topics against a broker, not a
Raft controller")
if force_use_zk_connection:
bootstrap_server_or_zookeeper = "--zookeeper %s" %
(self.zk_connect_setting())
skip_optional_security_settings = True
@@ -485,6 +785,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
bootstrap_server_or_zookeeper, optional_command_config_suffix)
def kafka_configs_cmd_with_optional_security_settings(self, node,
force_use_zk_connection, kafka_security_protocol = None):
+ if self.quorum_info.using_raft and not self.quorum_info.has_brokers:
+ raise Exception("Must invoke kafka-configs against a broker, not a
Raft controller")
if force_use_zk_connection:
# kafka-configs supports a TLS config file, so include it if there
is one
bootstrap_server_or_zookeeper = "--zookeeper %s %s" %
(self.zk_connect_setting(), self.zk.zkTlsConfigFileOption())
@@ -719,6 +1021,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
node.account.ssh(cmd)
def kafka_acls_cmd_with_optional_security_settings(self, node,
force_use_zk_connection, kafka_security_protocol = None,
override_command_config = None):
+ if self.quorum_info.using_raft and not self.quorum_info.has_brokers:
+ raise Exception("Must invoke kafka-acls against a broker, not a
Raft controller")
force_use_zk_connection = force_use_zk_connection or not
self.all_nodes_acl_command_supports_bootstrap_server
if force_use_zk_connection:
bootstrap_server_or_authorizer_zk_props = "--authorizer-properties
zookeeper.connect=%s" % (self.zk_connect_setting())
@@ -913,6 +1217,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
return missing
def restart_cluster(self, clean_shutdown=True, timeout_sec=60,
after_each_broker_restart=None, *args):
+ # We do not restart the remote controller quorum if it exists.
+ # This is not widely used -- it typically appears in rolling upgrade
tests --
+ # so we will let tests explicitly decide if/when to restart any remote
controller quorum.
for node in self.nodes:
self.restart_node(node, clean_shutdown=clean_shutdown,
timeout_sec=timeout_sec)
if after_each_broker_restart is not None:
@@ -1021,6 +1328,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
def cluster_id(self):
""" Get the current cluster id
"""
+ if self.quorum_info.using_raft:
+ return config_property.CLUSTER_ID
+
self.logger.debug("Querying ZooKeeper to retrieve cluster id")
cluster = self.zk.query("/cluster/id", chroot=self.zk_chroot)
@@ -1031,6 +1341,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
raise
def topic_id(self, topic):
+ if self.quorum_info.using_raft:
+ raise Exception("Not yet implemented: Cannot obtain topic ID
information when using Raft instead of ZooKeeper")
self.logger.debug(
"Querying zookeeper to find assigned topic ID for topic %s." %
topic)
zk_path = "/brokers/topics/%s" % topic
@@ -1107,6 +1419,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
return output
def zk_connect_setting(self):
+ if self.quorum_info.using_raft:
+ raise Exception("No zookeeper connect string available when using
Raft instead of ZooKeeper")
return self.zk.connect_setting(self.zk_chroot, self.zk_client_secure)
def __bootstrap_servers(self, port, validate=True, offline_nodes=[]):
@@ -1130,6 +1444,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
def controller(self):
""" Get the controller node
"""
+ if self.quorum_info.using_raft:
+ raise Exception("Cannot obtain Controller node when using Raft
instead of ZooKeeper")
self.logger.debug("Querying zookeeper to find controller broker")
controller_info = self.zk.query("/controller", chroot=self.zk_chroot)
@@ -1147,6 +1463,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
"""
Check whether a broker is registered in Zookeeper
"""
+ if self.quorum_info.using_raft:
+ raise Exception("Cannot obtain broker registration information
when using Raft instead of ZooKeeper")
self.logger.debug("Querying zookeeper to see if broker %s is
registered", str(node))
broker_info = self.zk.query("/brokers/ids/%s" % self.idx(node),
chroot=self.zk_chroot)
self.logger.debug("Broker info: %s", broker_info)
diff --git a/tests/kafkatest/services/kafka/quorum.py
b/tests/kafkatest/services/kafka/quorum.py
new file mode 100644
index 0000000..7153463
--- /dev/null
+++ b/tests/kafkatest/services/kafka/quorum.py
@@ -0,0 +1,144 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# the types of metadata quorums we support
+zk = 'ZK' # ZooKeeper, used before/during the KIP-500 bridge release(s)
+colocated_raft = 'COLOCATED_RAFT' # co-located KIP-500 Controllers, used
during/after the KIP-500 bridge release(s)
+remote_raft = 'REMOTE_RAFT' # separate KIP-500 Controllers, used during/after
the KIP-500 bridge release(s)
+
+# How we will parameterize tests that exercise all quorum styles
+# ["ZK", "REMOTE_RAFT", "COLOCATED_RAFT"] during the KIP-500 bridge
release(s)
+# ["REMOTE_RAFT", "COLOCATED_RAFT"] after the KIP-500 bridge release(s)
+all = [zk, remote_raft, colocated_raft]
+# How we will parameterize tests that exercise all Raft quorum styles
+all_raft = [remote_raft, colocated_raft]
+# How we will parameterize tests that are unrelated to upgrades:
+# ["ZK"] before the KIP-500 bridge release(s)
+# ["ZK", "REMOTE_RAFT"] during the KIP-500 bridge release(s) and in preview
releases
+# ["REMOTE_RAFT"] after the KIP-500 bridge release(s)
+all_non_upgrade = [zk, remote_raft]
+
+def for_test(test_context):
+ # A test uses ZooKeeper if it doesn't specify a metadata quorum or if it
explicitly specifies ZooKeeper
+ default_quorum_type = zk
+ arg_name = 'metadata_quorum'
+ retval = default_quorum_type if not test_context.injected_args else
test_context.injected_args.get(arg_name, default_quorum_type)
+ if retval not in all:
+ raise Exception("Unknown %s value provided for the test: %s" %
(arg_name, retval))
+ return retval
+
+class ServiceQuorumInfo:
+ """
+ Exposes quorum-related information for a KafkaService
+
+ Kafka can use either ZooKeeper or a Raft Controller quorum for its
+ metadata. Raft Controllers can either be co-located with Kafka in
+ the same JVM or remote in separate JVMs. The choice is made via
+ the 'metadata_quorum' parameter defined for the system test: if it
+ is not explicitly defined, or if it is set to 'ZK', then ZooKeeper
+ is used. If it is explicitly set to 'COLOCATED_RAFT' then Raft
+ controllers will be co-located with the brokers; the value
+ `REMOTE_RAFT` indicates remote controllers.
+
+ Attributes
+ ----------
+
+ kafka : KafkaService
+ The service for which this instance exposes quorum-related
+ information
+ quorum_type : str
+ COLOCATED_RAFT, REMOTE_RAFT, or ZK
+ using_zk : bool
+ True iff quorum_type==ZK
+ using_raft : bool
+ False iff quorum_type==ZK
+ has_brokers : bool
+ Whether there is at least one node with process.roles
+ containing 'broker'. True iff using_raft and the Kafka
+ service doesn't itself have a remote Kafka service (meaning
+ it is not a remote controller quorum).
+ has_controllers : bool
+ Whether there is at least one node with process.roles
+ containing 'controller'. True iff quorum_type ==
+ COLOCATED_RAFT or the Kafka service itself has a remote Kafka
+ service (meaning it is a remote controller quorum).
+ has_brokers_and_controllers :
+ True iff quorum_type==COLOCATED_RAFT
+ """
+
+ def __init__(self, kafka, context):
+ """
+
+ :param kafka : KafkaService
+ The service for which this instance exposes quorum-related
+ information
+ :param context : TestContext
+ The test context within which this instance and the
+ given Kafka service are being instantiated
+ """
+
+ quorum_type = for_test(context)
+ if quorum_type != zk and kafka.zk:
+ raise Exception("Cannot use ZooKeeper while specifying a Raft
metadata quorum (should not happen)")
+ if kafka.remote_kafka and quorum_type != remote_raft:
+ raise Exception("Cannot specify a remote Kafka service unless
using a remote Raft metadata quorum (should not happen)")
+ self.kafka = kafka
+ self.quorum_type = quorum_type
+ self.using_zk = quorum_type == zk
+ self.using_raft = not self.using_zk
+ self.has_brokers = self.using_raft and not kafka.remote_kafka
+ self.has_controllers = quorum_type == colocated_raft or
kafka.remote_kafka
+ self.has_brokers_and_controllers = quorum_type == colocated_raft
+
+class NodeQuorumInfo:
+ """
+ Exposes quorum-related information for a node in a KafkaService
+
+ Attributes
+ ----------
+ service_quorum_info : ServiceQuorumInfo
+ The quorum information about the service to which the node
+ belongs
+ has_broker_role : bool
+ True iff using_raft and the Kafka service doesn't itself have
+ a remote Kafka service (meaning it is not a remote controller)
+ has_controller_role : bool
+ True iff quorum_type==COLOCATED_RAFT and the node is one of
+ the first N in the cluster where N is the number of nodes
+ that have a controller role; or the Kafka service itself has a
+ remote Kafka service (meaning it is a remote controller
+ quorum).
+ has_combined_broker_and_controller_roles :
+ True iff has_broker_role==True and has_controller_role==True
+ """
+
+ def __init__(self, service_quorum_info, node):
+ """
+ :param service_quorum_info : ServiceQuorumInfo
+ The quorum information about the service to which the node
+ belongs
+ :param node : Node
+ The particular node for which this information applies.
+ In the co-located case, whether or not a node's broker's
+ process.roles contains 'controller' may vary based on the
+ particular node if the number of controller nodes is less
+ than the number of nodes in the service.
+ """
+
+ self.service_quorum_info = service_quorum_info
+ self.has_broker_role = self.service_quorum_info.has_brokers
+ idx = self.service_quorum_info.kafka.nodes.index(node)
+ self.has_controller_role =
self.service_quorum_info.kafka.num_nodes_controller_role > idx
+ self.has_combined_broker_and_controller_roles = self.has_broker_role
and self.has_controller_role
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties
b/tests/kafkatest/services/kafka/templates/kafka.properties
index 96acc2c..d7fa2d2 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -14,18 +14,36 @@
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
-advertised.host.name={{ node.account.hostname }}
+{% if quorum_info.using_raft %}
+# The role(s) of this server. Setting this puts us in Raft metadata quorum mode
+{% if node_quorum_info.has_combined_broker_and_controller_roles %}
+process.roles=broker,controller
+{% elif node_quorum_info.has_controller_role %}
+process.roles=controller
+{% else %}
+process.roles=broker
+{% endif %}
+# The connect string for the controller quorum
+controller.quorum.voters={{ controller_quorum_voters }}
+controller.listener.names={{ controller_listener_names }}
+
+{% endif %}
listeners={{ listeners }}
-advertised.listeners={{ advertised_listeners }}
+
listener.security.protocol.map={{ listener_security_protocol_map }}
+{% if quorum_info.using_zk or quorum_info.has_brokers %}
+advertised.host.name={{ node.account.hostname }}
+advertised.listeners={{ advertised_listeners }}
+
{% if node.version.supports_named_listeners() %}
inter.broker.listener.name={{ interbroker_listener.name }}
{% else %}
security.inter.broker.protocol={{ interbroker_listener.security_protocol }}
{% endif %}
+{% endif %}
{% for k, v in listener_security_config.client_listener_overrides.items() %}
{% if listener_security_config.requires_sasl_mechanism_prefix(k) %}
@@ -35,6 +53,7 @@ listener.name.{{ security_protocol.lower() }}.{{ k }}={{ v }}
{% endif %}
{% endfor %}
+{% if quorum_info.using_zk or quorum_info.has_brokers %}
{% if interbroker_listener.name != security_protocol %}
{% for k, v in listener_security_config.interbroker_listener_overrides.items()
%}
{% if listener_security_config.requires_sasl_mechanism_prefix(k) %}
@@ -44,6 +63,8 @@ listener.name.{{ interbroker_listener.name.lower() }}.{{ k
}}={{ v }}
{% endif %}
{% endfor %}
{% endif %}
+{% endif %}
+
{% if security_config.tls_version is not none %}
ssl.enabled.protocols={{ security_config.tls_version }}
ssl.protocol={{ security_config.tls_version }}
@@ -56,6 +77,8 @@ ssl.truststore.location=/mnt/security/test.truststore.jks
ssl.truststore.password=test-ts-passwd
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=HTTPS
+
+{% if quorum_info.using_zk %}
# Zookeeper TLS settings
#
# Note that zookeeper.ssl.client.enable will be set to true or false
elsewhere, as appropriate.
@@ -67,8 +90,11 @@ zookeeper.ssl.keystore.password=test-ks-passwd
{% endif %}
zookeeper.ssl.truststore.location=/mnt/security/test.truststore.jks
zookeeper.ssl.truststore.password=test-ts-passwd
+{% endif %}
#
+{% if quorum_info.using_zk or quorum_info.has_brokers %}
sasl.mechanism.inter.broker.protocol={{
security_config.interbroker_sasl_mechanism }}
+{% endif %}
sasl.enabled.mechanisms={{ ",".join(security_config.enabled_sasl_mechanisms) }}
sasl.kerberos.service.name=kafka
{% if authorizer_class_name is not none %}
@@ -76,10 +102,12 @@ ssl.client.auth=required
authorizer.class.name={{ authorizer_class_name }}
{% endif %}
+{% if quorum_info.using_zk %}
zookeeper.set.acl={{"true" if zk_set_acl else "false"}}
zookeeper.connection.timeout.ms={{ zk_connect_timeout }}
zookeeper.session.timeout.ms={{ zk_session_timeout }}
+{% endif %}
{% if replica_lag is defined %}
replica.lag.time.max.ms={{replica_lag}}