dajac commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r510873494



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1246,7 +1337,57 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): 
Unit = {
     // if there is a connection waiting on the rate throttle delay, we will 
let it wait the original delay even if
     // the rate limit increases, because it is just one connection per 
listener and the code is simpler that way
-    updateConnectionRateQuota(maxConnectionRate)
+    updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs 
for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, 
otherwise
+   * all metric configs will be checked and updated if required
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default 
if None
+   */
+  def updateIpConnectionRate(ip: Option[String], maxConnectionRate: 
Option[Int]): Unit = {
+    def isIpConnectionRateMetric(metricName: MetricName) = {
+      metricName.name == ConnectionRateMetricName &&
+      metricName.group == MetricsGroup &&
+      metricName.tags.containsKey(IpMetricTag)
+    }
+
+    def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+      quotaLimit != metric.config.quota.bound
+    }
+
+    ip match {
+      case Some(addr) =>
+        val address = InetAddress.getByName(addr)
+        maxConnectionRate match {
+          case Some(rate) =>
+            info(s"Updating max connection rate override for $address to 
$rate")
+            connectionRatePerIp.put(address, rate)
+          case None =>
+            info(s"Removing max connection rate override for $address")
+            connectionRatePerIp.remove(address)
+        }
+        updateConnectionRateQuota(connectionRateForIp(address), 
IpQuotaEntity(address))
+      case None =>
+        val newQuota = 
maxConnectionRate.getOrElse(DynamicConfig.Ip.DefaultConnectionCreationRate)
+        info(s"Updating default max IP connection rate to $newQuota")
+        defaultConnectionRatePerIp = newQuota
+        val allMetrics = metrics.metrics
+        allMetrics.forEach { (metricName, metric) =>
+          if (isIpConnectionRateMetric(metricName) && 
shouldUpdateQuota(metric, newQuota)) {
+            info(s"Updating existing connection rate sensor for 
${metricName.tags} to $newQuota")
+            metric.config(rateQuotaMetricConfig(newQuota))
+          }

Review comment:
       I wonder if using `newQuota` is correct here. My understanding is that 
`ip == None` means that we update the default quota, which is used when no 
per-IP quota is defined. So we should also check whether a per-IP quota is 
defined before overriding it with the new default, shouldn't we?
   
   It would be great if we could add more unit tests to cover this logic with 
multiple IPs and/or the default.
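   
A minimal sketch of the precedence in question, assuming a per-IP override should always win over the default. `IpQuotaSketch`, `IpQuotas`, `track`, and `enforcedRate` are hypothetical names used only for illustration; the `enforced` map stands in for the per-IP metric configs and is not the PR's code.

```scala
import java.net.InetAddress
import scala.collection.mutable

// Hypothetical stand-in for the quota bookkeeping; names are illustrative only.
object IpQuotaSketch {
  final class IpQuotas(initialDefault: Int) {
    private var defaultRate: Int = initialDefault
    private val overrides = mutable.Map.empty[InetAddress, Int]
    // Stand-in for the metric configs: the limit currently enforced per tracked IP.
    private val enforced = mutable.Map.empty[InetAddress, Int]

    // A per-IP override takes precedence over the default.
    def rateFor(ip: InetAddress): Int = overrides.getOrElse(ip, defaultRate)

    // Simulates the creation of a sensor for an IP that has connected.
    def track(ip: InetAddress): Unit = enforced.update(ip, rateFor(ip))

    def updateOverride(ip: InetAddress, rate: Option[Int]): Unit = {
      rate match {
        case Some(r) => overrides.update(ip, r)
        case None => overrides.remove(ip)
      }
      if (enforced.contains(ip)) enforced.update(ip, rateFor(ip))
    }

    def updateDefault(rate: Int): Unit = {
      defaultRate = rate
      // Re-resolve each tracked IP instead of writing the new default blindly,
      // so that IPs with an override keep their own limit.
      for (ip <- enforced.keys.toList) enforced.update(ip, rateFor(ip))
    }

    def enforcedRate(ip: InetAddress): Option[Int] = enforced.get(ip)
  }
}
```

With this shape, a unit test can set an override for one IP, change the default, and check that only the IPs without an override pick up the new value.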

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -526,10 +527,14 @@ private[kafka] abstract class 
AbstractServerThread(connectionQuotas: ConnectionQ
     if (channel != null) {
       debug(s"Closing connection from 
${channel.socket.getRemoteSocketAddress()}")
       connectionQuotas.dec(listenerName, channel.socket.getInetAddress)
-      CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
-      CoreUtils.swallow(channel.close(), this, Level.ERROR)
+      closeSocket(channel)
     }
   }
+
+  protected def closeSocket(channel: SocketChannel): Unit = {

Review comment:
       nit: Could we make this one private?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1375,6 +1516,45 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
     }
   }
 
+  /**
+   * To avoid over-recording listener/broker connection rate, we unrecord a 
listener or broker connection
+   * if the IP gets throttled later.
+   *
+   * @param listenerName listener to unrecord connection
+   * @param timeMs current time in milliseconds
+   */
+  private def unrecordListenerConnection(listenerName: ListenerName, timeMs: 
Long): Unit = {
+    if (!protectedListener(listenerName)) {
+      brokerConnectionRateSensor.record(-1.0, timeMs, false)
+    }
+    maxConnectionsPerListener
+      .get(listenerName)
+      .foreach(_.connectionRateSensor.record(-1.0, timeMs, false))
+  }
+
+  /**
+   * Calculates the delay needed to bring the observed connection creation 
rate to the IP limit.
+   * If the connection would cause an IP quota violation, un-record the 
connection
+   *
+   * @param address
+   * @param timeMs
+   * @return

Review comment:
       nit: To be completed.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1375,6 +1516,45 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
     }
   }
 
+  /**
+   * To avoid over-recording listener/broker connection rate, we unrecord a 
listener or broker connection
+   * if the IP gets throttled later.
+   *
+   * @param listenerName listener to unrecord connection
+   * @param timeMs current time in milliseconds
+   */
+  private def unrecordListenerConnection(listenerName: ListenerName, timeMs: 
Long): Unit = {
+    if (!protectedListener(listenerName)) {
+      brokerConnectionRateSensor.record(-1.0, timeMs, false)
+    }
+    maxConnectionsPerListener
+      .get(listenerName)
+      .foreach(_.connectionRateSensor.record(-1.0, timeMs, false))
+  }
+
+  /**
+   * Calculates the delay needed to bring the observed connection creation 
rate to the IP limit.
+   * If the connection would cause an IP quota violation, un-record the 
connection

Review comment:
       nit: Add `.` at the end of the sentence.

##########
File path: core/src/main/scala/kafka/zk/AdminZkClient.scala
##########
@@ -384,6 +386,28 @@ class AdminZkClient(zkClient: KafkaZkClient) extends 
Logging {
     changeEntityConfig(ConfigType.User, sanitizedEntityName, configs)
   }
 
+  /**
+   * validates the IP configs
+   * @param ip
+   * @param configs

Review comment:
       nit: To be completed.

##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -409,6 +410,96 @@ class ConnectionQuotasTest {
     verifyConnectionCountOnEveryListener(connectionQuotas, 
connectionsPerListener)
   }
 
+  @Test
+  def testIpConnectionRateWhenActualRateBelowLimit(): Unit = {
+    val ipConnectionRateLimit = 30
+    val connCreateIntervalMs = 40 // connection creation rate = 25/sec
+    val props = brokerPropsWithDefaultConnectionLimits
+    val config = KafkaConfig.fromProps(props)
+    connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+    addListenersAndVerify(config, connectionQuotas)
+    val externalListener = listeners("EXTERNAL")
+    
connectionQuotas.updateIpConnectionRate(Some(externalListener.defaultIp.getHostAddress),
 Some(ipConnectionRateLimit))
+    val numConnections = 200 // should take 8 seconds to create 200 
connections with rate = 25/s
+    // create connections with the rate < ip quota and verify there is no 
throttling
+    var future = executor.submit((() => acceptConnections(connectionQuotas, 
externalListener, numConnections,
+      connCreateIntervalMs)): Runnable)
+    future.get(15, TimeUnit.SECONDS)
+    assertEquals(s"Number of connections on $externalListener:",
+      numConnections, connectionQuotas.get(externalListener.defaultIp))
+
+    val adminListener = listeners("ADMIN")
+    val unthrottledConnectionCreateInterval = 20 // connection creation rate = 
50/s, should take 4s for 200 connections
+    // create connections with an IP with no quota and verify there is no 
throttling
+    future = executor.submit((() => acceptConnections(connectionQuotas, 
adminListener, numConnections,
+      unthrottledConnectionCreateInterval)): Runnable)
+    future.get(10, TimeUnit.SECONDS)
+
+    assertEquals(s"Number of connections on $adminListener:",
+      numConnections, connectionQuotas.get(adminListener.defaultIp))
+
+    // acceptor shouldn't block for IP rate throttling
+    verifyNoBlockedPercentRecordedOnAllListeners()
+  }
+
+  @Test
+  def testIpConnectionRateWhenActualRateAboveLimit(): Unit = {
+    val ipConnectionRateLimit = 20
+    val connCreateIntervalMs = 25 // connection creation rate = 40/sec
+    val props = brokerPropsWithDefaultConnectionLimits
+    val config = KafkaConfig.fromProps(props)
+    connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+    addListenersAndVerify(config, connectionQuotas)
+    val externalListener = listeners("EXTERNAL").listenerName
+    connectionQuotas.updateIpConnectionRate(Some(knownHost.getHostAddress), 
Some(ipConnectionRateLimit))
+    // create connections with the rate > ip quota
+    val numConnections = 200
+    assertTrue("Expected IP to be throttled by overriden IP rate limit", 
acceptConnections(connectionQuotas,
+      externalListener, knownHost, numConnections, connCreateIntervalMs, 
expectIpThrottle = true))
+    assertTrue(s"Number of connections on $externalListener: should be at 
least the IP rate limit",
+      connectionQuotas.get(knownHost) >= ipConnectionRateLimit)
+
+    // verify that default quota applies to IPs without a quota override
+    connectionQuotas.updateIpConnectionRate(None, Some(ipConnectionRateLimit))
+    val adminListener = listeners("ADMIN").listenerName
+    assertTrue("Expected IP to be throttled by default rate limit",
+      acceptConnections(connectionQuotas, adminListener, unknownHost, 
numConnections, connCreateIntervalMs, expectIpThrottle = true))
+    assertTrue(s"Number of connections on $adminListener: should be at least 
the IP rate limit",
+      connectionQuotas.get(unknownHost) >= ipConnectionRateLimit)
+
+    // acceptor shouldn't block for IP rate throttling
+    verifyNoBlockedPercentRecordedOnAllListeners()
+  }
+
+  @Test
+  def testIpConnectionRateWithListenerConnectionRate(): Unit = {
+    val ipConnectionRateLimit = 25
+    val listenerRateLimit = 35
+    val props = brokerPropsWithDefaultConnectionLimits
+    val config = KafkaConfig.fromProps(props)
+    connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+    // with a default per-IP limit of 25 and a listener rate of 30, only one 
IP should be able to saturate their IP rate
+    // limit, the other IP will hit listener rate limits and block
+    connectionQuotas.updateIpConnectionRate(None, Some(ipConnectionRateLimit))
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> 
listenerRateLimit.toString).asJava
+    addListenersAndVerify(config, listenerConfig, connectionQuotas)
+    val listener = listeners("EXTERNAL").listenerName
+    // use a small number of connections because a longer-running test will 
have both IPs throttle at different times
+    val numConnections = 35
+    val futures = List(
+      executor.submit((() => acceptConnections(connectionQuotas, listener, 
knownHost, numConnections,
+        0, true)): Callable[Boolean]),
+      executor.submit((() => acceptConnections(connectionQuotas, listener, 
unknownHost, numConnections,
+        0, true)): Callable[Boolean])
+    )
+
+    val ipsThrottledResults = futures.map(_.get(3, TimeUnit.SECONDS))
+    val throttledIps = ipsThrottledResults.filter(identity)
+    // at most one IP should get IP throttled before the acceptor blocks on 
listener quota
+    assertTrue("Expected BlockedPercentMeter metric for EXTERNAL listener to 
be recorded", blockedPercentMeters("EXTERNAL").count() > 0)
+    assertTrue("Expect at most one IP to get throttled", throttledIps.size < 2)
+  }
+

Review comment:
       It would be good to add a few basic unit tests to verify the basics, 
such as verifying that the quota is updated in memory and that the metrics 
are updated correctly (I got caught by this recently).

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1246,7 +1337,57 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): 
Unit = {
     // if there is a connection waiting on the rate throttle delay, we will 
let it wait the original delay even if
     // the rate limit increases, because it is just one connection per 
listener and the code is simpler that way
-    updateConnectionRateQuota(maxConnectionRate)
+    updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs 
for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, 
otherwise
+   * all metric configs will be checked and updated if required

Review comment:
       nit: "." at the end.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -699,6 +718,31 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         info(s"Rejected connection from ${e.ip}, address already has the 
configured maximum of ${e.count} connections.")
         close(endPoint.listenerName, socketChannel)
         None
+      case e: ConnectionThrottledException =>
+        val ip = socketChannel.socket.getInetAddress
+        debug(s"Delaying closing of connection from $ip for 
${e.throttleTimeMs} ms")
+        val delayQueue = throttledSockets.computeIfAbsent(ip, _ => new 
mutable.Queue[DelayedCloseSocket])
+        val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
+        delayQueue += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
+        None
+    }
+  }
+
+  /**
+   * Close sockets for any connections that have been throttled

Review comment:
       nit: `.` at the end.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1196,6 +1240,41 @@ private[kafka] class Processor(val id: Int,
   }
 }
 
+sealed trait ConnectionQuotaEntity {

Review comment:
       nit: Could we add some doc to explain this trait?

##########
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##########
@@ -880,6 +872,72 @@ class SocketServerTest {
     }
   }
 
+  @Test
+  def testConnectionRatePerIp(): Unit = {
+    val overrideProps = TestUtils.createBrokerConfig(0, 
TestUtils.MockZkConnect, port = 0)
+    overrideProps.remove("max.connections.per.ip")

Review comment:
       nit: Could we use `MaxConnectionsPerIpProp` instead of hardcoding the 
string?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -680,6 +659,46 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
     serverChannel
   }
 
+  /**
+   * Listen for new connections and assign accepted connections to processors 
using round-robin

Review comment:
       nit: `.` at the end of the sentence.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1398,33 +1578,33 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
    * Creates sensor for tracking the connection creation rate and 
corresponding connection rate quota for a given
    * listener or broker-wide, if listener is not provided.
    * @param quotaLimit connection creation rate quota
-   * @param listenerOpt listener name if sensor is for a listener
+   * @param connectionQuotaEntity entity to create the sensor for
    */
-  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: 
Option[String] = None): Sensor = {
-    val sensorName = listenerOpt.map(listener => 
s"ConnectionAcceptRate-$listener").getOrElse("ConnectionAcceptRate")
-    val sensor = metrics.sensor(sensorName, rateQuotaMetricConfig(quotaLimit))
-    sensor.add(connectionRateMetricName(listenerOpt), new Rate, null)
-    info(s"Created $sensorName sensor, quotaLimit=$quotaLimit")
-    sensor
+  private def getOrCreateConnectionRateQuotaSensor(quotaLimit: Int, 
connectionQuotaEntity: ConnectionQuotaEntity): Sensor = {
+    sensorAccessor.getOrCreate(
+      connectionQuotaEntity.sensorName,
+      connectionQuotaEntity.sensorExpiration,
+      sensor => sensor.add(connectionRateMetricName(connectionQuotaEntity), 
new Rate, rateQuotaMetricConfig(quotaLimit))
+    )
   }
 
   /**
-   * Updates quota configuration for a given listener or broker-wide (if 
'listenerOpt' is None)
+   * Updates quota configuration for a given connection quota entity
    */
-  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: 
Option[String] = None): Unit = {
-    val metric = metrics.metric(connectionRateMetricName(listenerOpt))
-    metric.config(rateQuotaMetricConfig(quotaLimit))
-    info(s"Updated ${listenerOpt.getOrElse("broker-wide")} max connection 
creation rate to $quotaLimit")
+  private def updateConnectionRateQuota(quotaLimit: Int, 
connectionQuotaEntity: ConnectionQuotaEntity): Unit = {
+    val metricOpt = 
Option(metrics.metric(connectionRateMetricName(connectionQuotaEntity)))
+    metricOpt.foreach { metric =>

Review comment:
       nit: `metricOpt` is not really necessary. We could directly put 
`foreach` on the line above.
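   
A small sketch of the pattern, assuming the lookup returns `null` for an unknown name (which the `Option(...)` wrapper in the diff suggests). `OptionForeachSketch`, `registry`, and `updateIfPresent` are hypothetical stand-ins, not Kafka APIs.

```scala
// Hypothetical registry standing in for a lookup that returns null when the key is absent.
object OptionForeachSketch {
  private val registry = new java.util.HashMap[String, java.lang.Integer]()
  registry.put("connection-accept-rate", 10)

  // Wraps the possibly-null lookup in Option and chains foreach directly,
  // without binding an intermediate value such as metricOpt.
  def updateIfPresent(name: String, newLimit: Int): Boolean = {
    var updated = false
    Option(registry.get(name)).foreach { _ =>
      registry.put(name, newLimit)
      updated = true
    }
    updated
  }

  def limit(name: String): Option[Int] = Option(registry.get(name)).map(_.intValue)
}
```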

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -600,43 +607,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
     serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
     startupComplete()
     try {
-      var currentProcessorIndex = 0
       while (isRunning) {
         try {
-          val ready = nioSelector.select(500)
-          if (ready > 0) {
-            val keys = nioSelector.selectedKeys()
-            val iter = keys.iterator()
-            while (iter.hasNext && isRunning) {
-              try {
-                val key = iter.next
-                iter.remove()
-
-                if (key.isAcceptable) {
-                  accept(key).foreach { socketChannel =>
-                    // Assign the channel to the next processor (using 
round-robin) to which the
-                    // channel can be added without blocking. If 
newConnections queue is full on
-                    // all processors, block until the last one is able to 
accept a connection.
-                    var retriesLeft = synchronized(processors.length)
-                    var processor: Processor = null
-                    do {
-                      retriesLeft -= 1
-                      processor = synchronized {
-                        // adjust the index (if necessary) and retrieve the 
processor atomically for
-                        // correct behaviour in case the number of processors 
is reduced dynamically
-                        currentProcessorIndex = currentProcessorIndex % 
processors.length
-                        processors(currentProcessorIndex)
-                      }
-                      currentProcessorIndex += 1
-                    } while (!assignNewConnection(socketChannel, processor, 
retriesLeft == 0))
-                  }
-                } else
-                  throw new IllegalStateException("Unrecognized key state for 
acceptor thread.")
-              } catch {
-                case e: Throwable => error("Error while accepting connection", 
e)
-              }
-            }
-          }
+          acceptNewConnections()
+          closeThrottledConnections()

Review comment:
       That makes sense.

##########
File path: 
core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
##########
@@ -240,6 +256,16 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
       s"Admin client connection not closed (initial = $initialConnectionCount, 
current = $connectionCount)")
   }
 
+  private def updateIpConnectionRate(ip: Option[String], updatedRate: Int): 
Unit = {
+    adminZkClient.changeIpConfig(ip.getOrElse(ConfigEntityName.Default),

Review comment:
       Ack, thanks.

##########
File path: core/src/main/scala/kafka/zk/AdminZkClient.scala
##########
@@ -384,6 +386,28 @@ class AdminZkClient(zkClient: KafkaZkClient) extends 
Logging {
     changeEntityConfig(ConfigType.User, sanitizedEntityName, configs)
   }
 
+  /**
+   * validates the IP configs
+   * @param ip
+   * @param configs
+   */
+  def validateIpConfig(ip: String, configs: Properties): Unit = {
+    if (ip != ConfigEntityName.Default && !Utils.validHostPattern(ip))
+      throw new AdminOperationException(s"IP $ip is not a valid address.")
+    DynamicConfig.Ip.validate(configs)
+  }
+
+  /**
+   * Update the config for an IP. These overrides will be persisted between 
sessions, and will override any default
+   * IP properties.
+   * @param ip
+   * @param configs

Review comment:
       nit: To be completed.

##########
File path: 
core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
##########
@@ -222,14 +222,30 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
     reconfigureServers(props, perBrokerConfig = true, (plaintextListenerProp, 
newPlaintextRateLimit.toString))
 
     val plaintextFuture = executor.submit((() =>
-      verifyConnectionRate(10, newPlaintextRateLimit, "PLAINTEXT")): Runnable)
+      verifyConnectionRate(10, newPlaintextRateLimit, "PLAINTEXT", 
ignoreIOExceptions = false)): Runnable)
     val externalFuture = executor.submit((() =>
-      verifyConnectionRate(3, listenerConnRateLimit, "EXTERNAL")): Runnable)
+      verifyConnectionRate(3, listenerConnRateLimit, "EXTERNAL", 
ignoreIOExceptions = false)): Runnable)
+
     plaintextFuture.get(40, TimeUnit.SECONDS)
     externalFuture.get(40, TimeUnit.SECONDS)
     waitForConnectionCount(initialConnectionCount)
   }
 
+  @Test

Review comment:
       It would be great if we could add a few more tests to check the dynamic 
quotas. For instance, we could check the in-memory quota as well as the quota 
in the metrics for a given IP when nothing is set, when a default is set, and 
when there is a quota for the IP.
   
   We could also add a test to verify that updating the default quota does not 
change the per-IP quota if it is already set.

##########
File path: 
core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
##########
@@ -240,6 +256,16 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
       s"Admin client connection not closed (initial = $initialConnectionCount, 
current = $connectionCount)")
   }
 
+  private def updateIpConnectionRate(ip: Option[String], updatedRate: Int): 
Unit = {
+    adminZkClient.changeIpConfig(ip.getOrElse(ConfigEntityName.Default),
+      CoreUtils.propsWith(DynamicConfig.Ip.IpConnectionRateOverrideProp, 
updatedRate.toString))
+    // use a random throwaway address if ip isn't specified to get the default 
value
+    TestUtils.waitUntilTrue(() => servers.head.socketServer.connectionQuotas.
+      connectionRateForIp(InetAddress.getByName(ip.getOrElse("255.255.3.4"))) 
== updatedRate,

Review comment:
       What is `255.255.3.4`?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -697,6 +714,31 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         info(s"Rejected connection from ${e.ip}, address already has the 
configured maximum of ${e.count} connections.")
         close(endPoint.listenerName, socketChannel)
         None
+      case e: ConnectionThrottledException =>
+        val ip = socketChannel.socket.getInetAddress
+        debug(s"Delaying closing of connection from $ip for 
${e.throttleTimeMs} ms")
+        val delayQueue = throttledSockets.computeIfAbsent(ip, _ => new 
mutable.Queue[DelayedCloseSocket])

Review comment:
       Thanks for the explanation.
   
   I agree that in the worst case, where all the connections are ready to be 
unthrottled, the heap-based solution would perform worse. On the other hand, 
if no connections are ready to be unthrottled, checking the heap is `O(1)`, 
whereas the current approach is `O(N)`, where `N` is the number of throttled 
connections.
   
   On average, I believe that the number of connections ready to be 
unthrottled at each run of the acceptor loop will be smaller than the total 
number of throttled connections (or even zero). Cleaning up `M` throttled 
connections with the heap would be `O(M log(N))`, which is better than `O(N)` 
if `M < N / log(N)`.
   
   I lean towards using a delay queue at the moment.
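   
A minimal sketch of the heap-based variant discussed above, using `scala.collection.mutable.PriorityQueue` ordered by deadline. `ThrottleQueueSketch`, `Delayed`, and `ThrottleQueue` are hypothetical names, and an integer id stands in for the `SocketChannel`. This is one way to get the stated bounds, not necessarily the structure the PR ends up with. For a sense of scale, with `N = 1024` and a base-2 logarithm, `N / log(N)` is about 102, so the heap does less work whenever fewer than roughly 100 connections expire in a single pass.

```scala
import scala.collection.mutable

object ThrottleQueueSketch {
  // Hypothetical stand-in for DelayedCloseSocket: an id replaces the SocketChannel.
  final case class Delayed(id: Int, endThrottleTimeMs: Long)

  final class ThrottleQueue {
    // PriorityQueue is a max-heap, so the ordering is reversed to keep the
    // earliest deadline at the head.
    private val heap = mutable.PriorityQueue.empty[Delayed](
      Ordering.by[Delayed, Long](_.endThrottleTimeMs).reverse)

    def add(d: Delayed): Unit = heap.enqueue(d)

    def size: Int = heap.size

    // Removes and returns every entry whose deadline has passed.
    // Peeking at the head is O(1) when nothing is ready; removing M of N
    // entries costs O(M log N).
    def pollExpired(nowMs: Long): List[Delayed] = {
      val expired = List.newBuilder[Delayed]
      while (heap.nonEmpty && heap.head.endThrottleTimeMs <= nowMs)
        expired += heap.dequeue()
      expired.result()
    }
  }
}
```

The acceptor loop would call `pollExpired` once per iteration and close whatever it returns; entries that are not yet due are never visited.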





