This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new dba9e223b [KYUUBI #2301] Limit the maximum number of concurrent
connections per user and ipaddress
dba9e223b is described below
commit dba9e223bd7554f1f1b189edb32c1f9c75367e85
Author: wforget <[email protected]>
AuthorDate: Tue Apr 19 12:08:08 2022 +0800
[KYUUBI #2301] Limit the maximum number of concurrent connections per user
and ipaddress
### _Why are the changes needed?_
close #2301
### _How was this patch tested?_
- [X] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #2364 from wForget/KYUUBI-2301.
Closes #2301
7a176e0e [wforget] merge
0b41fafa [wforget] comment
7614dbda [wforget] Merge remote-tracking branch 'origin/master' into
KYUUBI-2301
9180cf63 [wforget] fix
741139a4 [wforget] fix
1194ccbf [wforget] move limiter from common to server
a6b93b38 [wforget] regenerate settings.md
123f9209 [wforget] [KYUUBI-2301] Limit the maximum number of concurrent
connections per user and ipaddress
Authored-by: wforget <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
docs/deployment/settings.md | 3 +
.../org/apache/kyuubi/config/KyuubiConf.scala | 24 +++++
.../kyuubi/session/KyuubiSessionManager.scala | 19 ++++
.../org/apache/kyuubi/session/SessionLimiter.scala | 110 +++++++++++++++++++++
.../kyuubi/session/SessionLimiterSuite.scala | 99 +++++++++++++++++++
5 files changed, 255 insertions(+)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 10a6cc1c2..edf8b833f 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -331,6 +331,9 @@ Key | Default | Meaning | Type | Since
Key | Default | Meaning | Type | Since
--- | --- | --- | --- | ---
+<code>kyuubi.server.limit.connections.per.ipaddress</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div
style='width: 170pt;word-wrap: break-word;white-space: normal'>Maximum kyuubi
server connections per ipaddress. Any user exceeding this limit will not be
allowed to connect.</div>|<div style='width: 30pt'>int</div>|<div style='width:
20pt'>1.6.0</div>
+<code>kyuubi.server.limit.connections.per.user</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div
style='width: 170pt;word-wrap: break-word;white-space: normal'>Maximum kyuubi
server connections per user. Any user exceeding this limit will not be allowed
to connect.</div>|<div style='width: 30pt'>int</div>|<div style='width:
20pt'>1.6.0</div>
+<code>kyuubi.server.limit.connections.per.user.ipaddress</code>|<div
style='width: 65pt;word-wrap: break-word;white-space:
normal'><undefined></div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>Maximum kyuubi server connections per
user:ipaddress combination. Any user-ipaddress exceeding this limit will not be
allowed to connect.</div>|<div style='width: 30pt'>int</div>|<div style='width:
20pt'>1.6.0</div>
<code>kyuubi.server.name</code>|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'><undefined></div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>The name of Kyuubi
Server.</div>|<div style='width: 30pt'>string</div>|<div style='width:
20pt'>1.5.0</div>
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 4b36d1263..7cb48b0a9 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1411,4 +1411,28 @@ object KyuubiConf {
.version("1.6.0")
.stringConf
.createOptional
+
+ val SERVER_LIMIT_CONNECTIONS_PER_USER: OptionalConfigEntry[Int] =
+ buildConf("kyuubi.server.limit.connections.per.user")
+ .doc("Maximum kyuubi server connections per user." +
+ " Any user exceeding this limit will not be allowed to connect.")
+ .version("1.6.0")
+ .intConf
+ .createOptional
+
+ val SERVER_LIMIT_CONNECTIONS_PER_IPADDRESS: OptionalConfigEntry[Int] =
+ buildConf("kyuubi.server.limit.connections.per.ipaddress")
+ .doc("Maximum kyuubi server connections per ipaddress." +
+ " Any user exceeding this limit will not be allowed to connect.")
+ .version("1.6.0")
+ .intConf
+ .createOptional
+
+ val SERVER_LIMIT_CONNECTIONS_PER_USER_IPADDRESS: OptionalConfigEntry[Int] =
+ buildConf("kyuubi.server.limit.connections.per.user.ipaddress")
+ .doc("Maximum kyuubi server connections per user:ipaddress combination."
+
+ " Any user-ipaddress exceeding this limit will not be allowed to
connect.")
+ .version("1.6.0")
+ .intConf
+ .createOptional
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 982c14ca3..404a2d4e0 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -43,10 +43,13 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
// this lazy is must be specified since the conf is null when the class
initialization
lazy val sessionConfAdvisor: SessionConfAdvisor =
PluginLoader.loadSessionConfAdvisor(conf)
+ private var limiter: Option[SessionLimiter] = None
+
override def initialize(conf: KyuubiConf): Unit = {
addService(credentialsManager)
val absPath =
Utils.getAbsolutePathFromWork(conf.get(SERVER_OPERATION_LOG_DIR_ROOT))
_operationLogRoot = Some(absPath.toAbsolutePath.toString)
+ initSessionLimiter(conf)
super.initialize(conf)
}
@@ -75,6 +78,7 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
ipAddress: String,
conf: Map[String, String]): SessionHandle = {
val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
+ limiter.foreach(_.increment(UserIpAddress(username, ipAddress)))
try {
super.openSession(protocol, username, password, ipAddress, conf)
} catch {
@@ -89,6 +93,12 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
}
}
+ override def closeSession(sessionHandle: SessionHandle): Unit = {
+ val session = getSession(sessionHandle)
+ super.closeSession(sessionHandle)
+ limiter.foreach(_.decrement(UserIpAddress(session.user,
session.ipAddress)))
+ }
+
def openBatchSession(
protocol: TProtocolVersion,
user: String,
@@ -146,6 +156,15 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
}
override protected def isServer: Boolean = true
+
+ private def initSessionLimiter(conf: KyuubiConf): Unit = {
+ val userLimit = conf.get(SERVER_LIMIT_CONNECTIONS_PER_USER).getOrElse(0)
+ val ipAddressLimit =
conf.get(SERVER_LIMIT_CONNECTIONS_PER_IPADDRESS).getOrElse(0)
+ val userIpAddressLimit =
conf.get(SERVER_LIMIT_CONNECTIONS_PER_USER_IPADDRESS).getOrElse(0)
+ if (userLimit > 0 || ipAddressLimit > 0 || userIpAddressLimit > 0) {
+ limiter = Some(SessionLimiter(userLimit, ipAddressLimit,
userIpAddressLimit))
+ }
+ }
}
object KyuubiSessionManager {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala
new file mode 100644
index 000000000..d104b23c8
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.session
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.kyuubi.KyuubiSQLException
+
+trait SessionLimiter {
+ def increment(userIpAddress: UserIpAddress): Unit
+ def decrement(userIpAddress: UserIpAddress): Unit
+}
+
+case class UserIpAddress(user: String, ipAddress: String)
+
+class SessionLimiterImpl(userLimit: Int, ipAddressLimit: Int,
userIpAddressLimit: Int)
+ extends SessionLimiter {
+
+ private val _counters: java.util.Map[String, AtomicInteger] =
+ new ConcurrentHashMap[String, AtomicInteger]()
+
+ private[session] def counters(): java.util.Map[String, AtomicInteger] =
_counters
+
+ override def increment(userIpAddress: UserIpAddress): Unit = {
+ val user = userIpAddress.user
+ val ipAddress = userIpAddress.ipAddress
+ // increment userIpAddress count
+ if (userIpAddressLimit > 0 && StringUtils.isNotBlank(user) &&
+ StringUtils.isNotBlank(ipAddress)) {
+ incrLimitCount(
+ s"$user:$ipAddress",
+ userIpAddressLimit,
+ "Connection limit per user:ipaddress reached" +
+ s" (user:ipaddress: $user:$ipAddress limit: $userIpAddressLimit)")
+ }
+ // increment user count
+ if (userLimit > 0 && StringUtils.isNotBlank(user)) {
+ incrLimitCount(
+ user,
+ userLimit,
+ s"Connection limit per user reached (user: $user limit: $userLimit)")
+ }
+ // increment ipAddress count
+ if (ipAddressLimit > 0 && StringUtils.isNotBlank(ipAddress)) {
+ incrLimitCount(
+ ipAddress,
+ ipAddressLimit,
+ s"Connection limit per ipaddress reached (ipaddress: $ipAddress limit:
$ipAddressLimit)")
+ }
+ }
+
+ override def decrement(userIpAddress: UserIpAddress): Unit = {
+ val user = userIpAddress.user
+ val ipAddress = userIpAddress.ipAddress
+ // decrement user count
+ if (userLimit > 0 && StringUtils.isNotBlank(user)) {
+ decrLimitCount(user)
+ }
+ // decrement ipAddress count
+ if (ipAddressLimit > 0 && StringUtils.isNotBlank(ipAddress)) {
+ decrLimitCount(ipAddress)
+ }
+ // decrement userIpAddress count
+ if (userIpAddressLimit > 0 && StringUtils.isNotBlank(user) &&
+ StringUtils.isNotBlank(ipAddress)) {
+ decrLimitCount(s"$user:$ipAddress")
+ }
+ }
+
+ private def incrLimitCount(key: String, limit: Int, errorMsg: String): Unit
= {
+ val count = _counters.computeIfAbsent(key, _ => new AtomicInteger())
+ if (count.incrementAndGet() > limit) {
+ count.decrementAndGet()
+ throw KyuubiSQLException(errorMsg)
+ }
+ }
+
+ private def decrLimitCount(key: String): Unit = {
+ _counters.get(key) match {
+ case count: AtomicInteger => count.decrementAndGet()
+ case _ =>
+ }
+ }
+}
+
+object SessionLimiter {
+
+ def apply(userLimit: Int, ipAddressLimit: Int, userIpAddressLimit: Int):
SessionLimiter = {
+ new SessionLimiterImpl(userLimit, ipAddressLimit, userIpAddressLimit)
+ }
+
+}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala
new file mode 100644
index 000000000..d2df573e1
--- /dev/null
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.session
+
+import java.util.concurrent.{CountDownLatch, Executors}
+import java.util.concurrent.atomic.LongAdder
+
+import scala.collection.JavaConverters._
+
+import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException}
+
+class SessionLimiterSuite extends KyuubiFunSuite {
+
+ test("test increment session limit") {
+ val user = "user001"
+ val ipAddress = "127.0.0.1"
+ val userLimit = 30
+ val ipAddressLimit = 20
+ val userIpAddressLimit = 10
+ val threadPool = Executors.newFixedThreadPool(10)
+ def checkLimit(
+ userIpAddress: UserIpAddress,
+ expectedIndex: Int,
+ expectedErrorMsg: String): Unit = {
+ val limiter = SessionLimiter(userLimit, ipAddressLimit,
userIpAddressLimit)
+ val successAdder = new LongAdder
+ val expectedErrorAdder = new LongAdder
+ val count = 50
+ val latch = new CountDownLatch(count)
+ for (i <- 0 until count) {
+ threadPool.execute(() => {
+ try {
+ limiter.increment(userIpAddress)
+ successAdder.increment()
+ } catch {
+ case e: KyuubiSQLException if e.getMessage === expectedErrorMsg =>
+ expectedErrorAdder.increment()
+ case _: Throwable =>
+ } finally {
+ latch.countDown()
+ }
+ })
+ }
+ latch.await()
+ assert(successAdder.intValue() == expectedIndex)
+ assert(expectedErrorAdder.intValue() == count - expectedIndex)
+ }
+
+ // user limit
+ checkLimit(
+ UserIpAddress(user, null),
+ userLimit,
+ s"Connection limit per user reached (user: $user limit: $userLimit)")
+
+ // ipAddress limit
+ checkLimit(
+ UserIpAddress(null, ipAddress),
+ ipAddressLimit,
+ s"Connection limit per ipaddress reached (ipaddress: $ipAddress limit:
$ipAddressLimit)")
+
+ // userIpAddress limit
+ checkLimit(
+ UserIpAddress(user, ipAddress),
+ userIpAddressLimit,
+ s"Connection limit per user:ipaddress reached" +
+ s" (user:ipaddress: $user:$ipAddress limit: $userIpAddressLimit)")
+ threadPool.shutdown()
+ }
+
+ test("test increment and decrement session") {
+ val user = "user001"
+ val ipAddress = "127.0.0.1"
+ val userLimit = 30
+ val ipAddressLimit = 20
+ val userIpAddressLimit = 10
+ val limiter = SessionLimiter(userLimit, ipAddressLimit, userIpAddressLimit)
+ for (i <- 0 until 50) {
+ val userIpAddress = UserIpAddress(user, ipAddress)
+ limiter.increment(userIpAddress)
+ limiter.decrement(userIpAddress)
+ }
+ limiter.asInstanceOf[SessionLimiterImpl].counters().asScala.values
+ .foreach(c => assert(c.get() == 0))
+ }
+}