hachikuji commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r758818012



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2018,12 +2014,72 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
       " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
-
     val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
+    if (usesSelfManagedQuorum) {
+      // validations for all 3 KRaft setups (co-located, controller-only, 
broker-only)
+      val addressSpecsByNodeId = RaftConfig.parseVoterConnections(quorumVoters)
+      if (addressSpecsByNodeId.isEmpty) {
+        throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, 
${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
+      }
+      require(controlPlaneListenerName.isEmpty,

Review comment:
       Is it worth adding a comment to this message about the controller 
listener effectively superseding this config?

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2018,12 +2014,72 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
       " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
-
     val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
+    if (usesSelfManagedQuorum) {
+      // validations for all 3 KRaft setups (co-located, controller-only, 
broker-only)
+      val addressSpecsByNodeId = RaftConfig.parseVoterConnections(quorumVoters)
+      if (addressSpecsByNodeId.isEmpty) {
+        throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, 
${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
+      }
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in 
KRaft mode.")
+      val sourceOfAdvertisedListeners = if 
(getString(KafkaConfig.AdvertisedListenersProp) != null)
+        s"${KafkaConfig.AdvertisedListenersProp}"
+      else
+        s"${KafkaConfig.ListenersProp}"
+      if (!processRoles.contains(BrokerRole)) {
+        // validations for KRaft controller-only setup
+        // advertised listeners must be empty when not also running the broker 
role
+        require(advertisedListeners.isEmpty,
+          sourceOfAdvertisedListeners +
+            s" must only contain KRaft controller listeners from 
${KafkaConfig.ControllerListenerNamesProp} when 
${KafkaConfig.ProcessRolesProp}=controller")
+      } else {
+        // validations for both KRaft broker setup (i.e. broker-only and 
co-located)
+        // when running broker role advertised listeners cannot contain 
controller listeners
+        require(!advertisedListenerNames.exists(aln => 
controllerListenerNames.contains(aln.value())),
+          sourceOfAdvertisedListeners +
+            s" must not contain KRaft controller listeners from 
${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp} 
contains the broker role")

Review comment:
       Is it worth clarifying in this comment that clients only send requests 
to brokers?

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2018,12 +2014,72 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
       " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
-
     val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
+    if (usesSelfManagedQuorum) {
+      // validations for all 3 KRaft setups (co-located, controller-only, 
broker-only)
+      val addressSpecsByNodeId = RaftConfig.parseVoterConnections(quorumVoters)
+      if (addressSpecsByNodeId.isEmpty) {
+        throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, 
${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
+      }
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in 
KRaft mode.")
+      val sourceOfAdvertisedListeners = if 
(getString(KafkaConfig.AdvertisedListenersProp) != null)
+        s"${KafkaConfig.AdvertisedListenersProp}"
+      else
+        s"${KafkaConfig.ListenersProp}"
+      if (!processRoles.contains(BrokerRole)) {
+        // validations for KRaft controller-only setup
+        // advertised listeners must be empty when not also running the broker 
role
+        require(advertisedListeners.isEmpty,
+          sourceOfAdvertisedListeners +
+            s" must only contain KRaft controller listeners from 
${KafkaConfig.ControllerListenerNamesProp} when 
${KafkaConfig.ProcessRolesProp}=controller")
+      } else {
+        // validations for both KRaft broker setup (i.e. broker-only and 
co-located)
+        // when running broker role advertised listeners cannot contain 
controller listeners
+        require(!advertisedListenerNames.exists(aln => 
controllerListenerNames.contains(aln.value())),
+          sourceOfAdvertisedListeners +
+            s" must not contain KRaft controller listeners from 
${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp} 
contains the broker role")
+      }
+      if (processRoles.contains(ControllerRole)) {
+        // validations for both KRaft controller setups (i.e. controller-only 
and co-located)
+        // nodeId must appear in controller.quorum.voters
+        // controller.listener.names must be non-empty
+        // every one must appear in listeners
+        require(addressSpecsByNodeId.get(nodeId) != null,
+          s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, 
the node id $nodeId must be included in the set of voters 
${KafkaConfig.QuorumVotersProp}=${addressSpecsByNodeId.asScala.keySet.toSet}")
+        require(controllerListeners.nonEmpty,
+          s"${KafkaConfig.ControllerListenerNamesProp} must contain at least 
one value appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running the KRaft controller role")
+        val listenerNameValues = listeners.map(_.listenerName.value).toSet
+        require(controllerListenerNames.forall(cln => 
listenerNameValues.contains(cln)),
+          s"${KafkaConfig.ControllerListenerNamesProp} must only contain 
values appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running the KRaft controller role")
+      } else {

Review comment:
       Maybe it's just me, but I find it a bit difficult to follow these 
validations because of the way they are organized. I think it would be easier 
to follow if we organized like this:
   ```scala
   if (processRoles == Set(BrokerRole)) {
     // broker only
   } else if (processRoles == Set(ControllerRole)) {
     // controller only
   } else {
     // mixed 
   }
   ```
   It might result in some duplication, but perhaps we could have some helpers.

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2011,12 +2012,68 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
       " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
-
     val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
+    if (usesSelfManagedQuorum) {
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in 
KRaft mode.")
+      val sourceOfAdvertisedListeners = if 
(getString(KafkaConfig.AdvertisedListenersProp) != null)
+        s"${KafkaConfig.AdvertisedListenersProp}"
+      else
+        s"${KafkaConfig.ListenersProp}"
+      if (!processRoles.contains(BrokerRole)) {
+        // advertised listeners must be empty when not also running the broker 
role
+        require(advertisedListeners.isEmpty,
+          sourceOfAdvertisedListeners +
+            s" must only contain KRaft controller listeners from 
${KafkaConfig.ControllerListenerNamesProp} when 
${KafkaConfig.ProcessRolesProp}=controller")
+      } else {
+        // when running broker role advertised listeners cannot contain 
controller listeners
+        require(!advertisedListenerNames.exists(aln => 
controllerListenerNames.contains(aln.value())),
+          sourceOfAdvertisedListeners +
+            s" must not contain KRaft controller listeners from 
${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp} 
contains the broker role")
+      }
+      if (processRoles.contains(ControllerRole)) {
+        // has controller role (and optionally broker role as well)
+        // controller.listener.names must be non-empty
+        // every one must appear in listeners
+        // the port appearing in controller.quorum.voters for this node must 
match the port of the first controller listener
+        // (we allow other nodes' voter ports to differ to support running 
multiple controllers on the same host)
+        require(controllerListeners.nonEmpty,
+          s"${KafkaConfig.ControllerListenerNamesProp} must contain at least 
one value appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running the KRaft controller role")
+        val listenerNameValues = listeners.map(_.listenerName.value).toSet
+        require(controllerListenerNames.forall(cln => 
listenerNameValues.contains(cln)),
+          s"${KafkaConfig.ControllerListenerNamesProp} must only contain 
values appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running the KRaft controller role")
+        val addressSpecForThisNode = 
RaftConfig.parseVoterConnections(quorumVoters).get(nodeId)
+        addressSpecForThisNode match {
+          case inetAddressSpec: RaftConfig.InetAddressSpec => {
+            val quorumVotersPort = inetAddressSpec.address.getPort
+            require(controllerListeners.head.port == quorumVotersPort,
+              s"Port in ${KafkaConfig.QuorumVotersProp} for this controller 
node (${KafkaConfig.NodeIdProp}=$nodeId, port=$quorumVotersPort) does not match 
the port for the first controller listener in 
${KafkaConfig.ControllerListenerNamesProp} 
(${controllerListeners.head.listenerName.value()}, 
port=${controllerListeners.head.port})")
+          }
+          case _ =>
+        }
+      } else {
+        // only broker role
+        // controller.listener.names must be non-empty
+        // none of them can appear in listeners
+        // warn that only the first one is used if there is more than one
+        require(controllerListenerNames.exists(_.nonEmpty),
+          s"${KafkaConfig.ControllerListenerNamesProp} must contain at least 
one value when running KRaft with just the broker role")
+        if (controllerListenerNames.size > 1) {
+          warn(s"${KafkaConfig.ControllerListenerNamesProp} has multiple 
entries; only the first will be used since 
${KafkaConfig.ProcessRolesProp}=broker: $controllerListenerNames")
+        }
+        require(controllerListeners.isEmpty,
+          s"${KafkaConfig.ControllerListenerNamesProp} must not contain a 
value appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running KRaft with just the broker role")
+      }
+    } else {
+      // controller listener names must be empty when not in KRaft mode
+      require(!controllerListenerNames.exists(_.nonEmpty), 
s"${KafkaConfig.ControllerListenerNamesProp} must be empty when not running in 
KRaft mode: ${controllerListenerNames.asJava.toString}")

Review comment:
       nit: couldn't we use `$controllerListenerNames` as in the message above?

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2018,12 +2014,72 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
       " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
-
     val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
+    if (usesSelfManagedQuorum) {
+      // validations for all 3 KRaft setups (co-located, controller-only, 
broker-only)
+      val addressSpecsByNodeId = RaftConfig.parseVoterConnections(quorumVoters)
+      if (addressSpecsByNodeId.isEmpty) {
+        throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, 
${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
+      }
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in 
KRaft mode.")
+      val sourceOfAdvertisedListeners = if 
(getString(KafkaConfig.AdvertisedListenersProp) != null)
+        s"${KafkaConfig.AdvertisedListenersProp}"
+      else
+        s"${KafkaConfig.ListenersProp}"
+      if (!processRoles.contains(BrokerRole)) {
+        // validations for KRaft controller-only setup
+        // advertised listeners must be empty when not also running the broker 
role
+        require(advertisedListeners.isEmpty,
+          sourceOfAdvertisedListeners +
+            s" must only contain KRaft controller listeners from 
${KafkaConfig.ControllerListenerNamesProp} when 
${KafkaConfig.ProcessRolesProp}=controller")

Review comment:
       Since it's a subtle point, maybe we can add some extra detail in this 
message. Perhaps we can say that controllers have no need to advertise their 
listeners, since the endpoint is explicitly specified through 
controller.quorum.voters?

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2018,12 +2014,72 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
       " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
-
     val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
+    if (usesSelfManagedQuorum) {
+      // validations for all 3 KRaft setups (co-located, controller-only, 
broker-only)
+      val addressSpecsByNodeId = RaftConfig.parseVoterConnections(quorumVoters)
+      if (addressSpecsByNodeId.isEmpty) {
+        throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, 
${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
+      }
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in 
KRaft mode.")
+      val sourceOfAdvertisedListeners = if 
(getString(KafkaConfig.AdvertisedListenersProp) != null)
+        s"${KafkaConfig.AdvertisedListenersProp}"
+      else
+        s"${KafkaConfig.ListenersProp}"
+      if (!processRoles.contains(BrokerRole)) {
+        // validations for KRaft controller-only setup
+        // advertised listeners must be empty when not also running the broker 
role
+        require(advertisedListeners.isEmpty,
+          sourceOfAdvertisedListeners +
+            s" must only contain KRaft controller listeners from 
${KafkaConfig.ControllerListenerNamesProp} when 
${KafkaConfig.ProcessRolesProp}=controller")
+      } else {
+        // validations for both KRaft broker setup (i.e. broker-only and 
co-located)
+        // when running broker role advertised listeners cannot contain 
controller listeners
+        require(!advertisedListenerNames.exists(aln => 
controllerListenerNames.contains(aln.value())),
+          sourceOfAdvertisedListeners +
+            s" must not contain KRaft controller listeners from 
${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp} 
contains the broker role")
+      }
+      if (processRoles.contains(ControllerRole)) {
+        // validations for both KRaft controller setups (i.e. controller-only 
and co-located)
+        // nodeId must appear in controller.quorum.voters
+        // controller.listener.names must be non-empty
+        // every one must appear in listeners
+        require(addressSpecsByNodeId.get(nodeId) != null,

Review comment:
       nit: `addressSpecsByNodeId.containsKey(nodeId)`? This would mirror the 
check we use below.

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -748,7 +749,8 @@ object KafkaConfig {
     "Different security (SSL and SASL) settings can be configured for each 
listener by adding a normalised " +
     "prefix (the listener name is lowercased) to the config name. For example, 
to set a different keystore for the " +
     "INTERNAL listener, a config with name 
<code>listener.name.internal.ssl.keystore.location</code> would be set. " +
-    "If the config for the listener name is not set, the config will fallback 
to the generic config (i.e. <code>ssl.keystore.location</code>). "
+    "If the config for the listener name is not set, the config will fallback 
to the generic config (i.e. <code>ssl.keystore.location</code>). " +
+    "Note that in KRaft an additional default mapping CONTROLLER to PLAINTEXT 
is added."

Review comment:
       Hmm, why do we need this? It seems strange to assume that PLAINTEXT is 
intended if there are multiple listeners defined. Why don't we force the user 
to explicitly define the controller listener?

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1960,9 +1962,15 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   }
 
   def listenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
-    getMap(KafkaConfig.ListenerSecurityProtocolMapProp, 
getString(KafkaConfig.ListenerSecurityProtocolMapProp))
+    val explicitMap = getMap(KafkaConfig.ListenerSecurityProtocolMapProp, 
getString(KafkaConfig.ListenerSecurityProtocolMapProp))
       .map { case (listenerName, protocolName) =>
-      ListenerName.normalised(listenerName) -> 
getSecurityProtocol(protocolName, KafkaConfig.ListenerSecurityProtocolMapProp)
+        ListenerName.normalised(listenerName) -> 
getSecurityProtocol(protocolName, KafkaConfig.ListenerSecurityProtocolMapProp)
+      }
+    val controllerListenerName = new ListenerName("CONTROLLER")

Review comment:
       This is confusing. We add the "CONTROLLER" mapping even if the user has 
a controller listener name defined?

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2007,12 +2008,47 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
       " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
+    if (usesSelfManagedQuorum) {
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in 
KRaft mode.")
+      if (processRoles.contains(ControllerRole)) {
+        // has controller role (and optionally broker role as well)
+        // controller.listener.names must be non-empty
+        // every one must appear in listeners
+        // each port appearing in controller.quorum.voters must match the port 
in exactly one controller listener
+        require(controllerListeners.nonEmpty,
+          s"${KafkaConfig.ControllerListenerNamesProp} must contain at least 
one value appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running the KRaft controller role")
+        val listenerNameValues = listeners.map(_.listenerName.value).toSet
+        require(controllerListenerNames.forall(cln => 
listenerNameValues.contains(cln)),
+          s"${KafkaConfig.ControllerListenerNamesProp} must only contain 
values appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running the KRaft controller role")
+        RaftConfig.parseVoterConnections(quorumVoters).asScala.foreach { case 
(nodeId, addressSpec) =>
+          addressSpec match {
+            case inetAddressSpec: RaftConfig.InetAddressSpec => {
+              val quorumVotersPort = inetAddressSpec.address.getPort
+              val controllerListenersWithSamePort = 
controllerListeners.filter(_.port == quorumVotersPort)
+              require(controllerListenersWithSamePort.size == 1,
+                s"Port in ${KafkaConfig.QuorumVotersProp} for controller node 
with ${KafkaConfig.NodeIdProp}=$nodeId ($quorumVotersPort) does not match the 
port for any controller listener in ${KafkaConfig.ControllerListenerNamesProp}")
+            }
+            case _ =>
+          }
+        }
+      } else {
+        // only broker role
+        // controller.listener.names must be non-empty
+        // none of them can appear in listeners
+        require(controllerListenerNames.exists(_.nonEmpty),
+          s"${KafkaConfig.ControllerListenerNamesProp} must contain at least 
one value when running KRaft with just the broker role")
+        require(controllerListeners.isEmpty,
+          s"${KafkaConfig.ControllerListenerNamesProp} must not contain a 
value appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running KRaft with just the broker role")
+      }
+    }
 
     val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
     val listenerNames = listeners.map(_.listenerName).toSet
     if (processRoles.isEmpty || processRoles.contains(BrokerRole)) {
+      require(advertisedListenerNames.nonEmpty,
+        "There must be at least one advertised listener." + (
+          if (processRoles.contains(BrokerRole)) s" Perhaps all listeners 
appear in ${ControllerListenerNamesProp}?" else ""))

Review comment:
       Hmm, I guess this check was already implied by 
`advertisedListenerNames.contains(interBrokerListenerName)` below for the zk 
broker. Could we move it to the other `BrokerRole` handling above so that all 
the logic is consolidated?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to