This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new ff7f42386 [KYUUBI #4204] Trace the connection metrics with session type
ff7f42386 is described below
commit ff7f42386a58042748a7c42e3017d66e82af0b81
Author: fwang12 <[email protected]>
AuthorDate: Sun Jan 29 11:03:29 2023 +0800
[KYUUBI #4204] Trace the connection metrics with session type
### _Why are the changes needed?_
Trace connection metrics with session type.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #4204 from turboFei/conn_type_open.
Closes #4204
67f45fd5d [fwang12] add ut
ef0bed665 [fwang12] save
ed4be0233 [fwang12] align
b29396306 [fwang12] order
94cdc5bfc [fwang12] order
91bee9082 [fwang12] docs
37128668a [fwang12] save
6cef34f10 [fwang12] session type
6a73760a8 [fwang12] reuse
Authored-by: fwang12 <[email protected]>
Signed-off-by: fwang12 <[email protected]>
---
docs/monitor/metrics.md | 3 ++
.../kyuubi/session/KyuubiBatchSessionImpl.scala | 10 ++-----
.../org/apache/kyuubi/session/KyuubiSession.scala | 15 ++++++++++
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 10 ++-----
.../kyuubi/session/KyuubiSessionManager.scala | 2 ++
.../KyuubiOperationPerConnectionSuite.scala | 35 +++++++++++++++++++++-
6 files changed, 58 insertions(+), 17 deletions(-)
diff --git a/docs/monitor/metrics.md b/docs/monitor/metrics.md
index 2f2ac1405..1c8013b58 100644
--- a/docs/monitor/metrics.md
+++ b/docs/monitor/metrics.md
@@ -45,10 +45,13 @@ Metrics Prefix | Metrics Suffix | Type | Since | Description
`kyuubi.exec.pool.threads.alive` | | gauge | 1.2.0 |<div style='width:
150pt;word-wrap: break-word;white-space: normal'> threads keepAlive in the
backend executive thread pool</div>
`kyuubi.exec.pool.threads.active` | | gauge | 1.2.0 |<div style='width:
150pt;word-wrap: break-word;white-space: normal'> threads active in the backend
executive thread pool</div>
`kyuubi.connection.total` | | counter | 1.2.0 |<div style='width:
150pt;word-wrap: break-word;white-space: normal'> cumulative connection
count</div>
+`kyuubi.connection.total` | `${sessionType}` | counter | 1.7.0 |<div
style='width: 150pt;word-wrap: break-word;white-space: normal'> cumulative
connection count with session type `${sessionType}`</div>
`kyuubi.connection.opened` | | gauge | 1.2.0 |<div style='width:
150pt;word-wrap: break-word;white-space: normal'> current active connection
count</div>
`kyuubi.connection.opened` | `${user}` | counter | 1.2.0 |<div style='width:
150pt;word-wrap: break-word;white-space: normal'> current active connections
count requested by a `${user}`</div>
+`kyuubi.connection.opened` | `${sessionType}` | counter | 1.7.0 |<div
style='width: 150pt;word-wrap: break-word;white-space: normal'> current active
connections count with session type `${sessionType}`</div>
`kyuubi.connection.failed` | | counter | 1.2.0 |<div style='width:
150pt;word-wrap: break-word;white-space: normal'> cumulative failed connection
count</div>
`kyuubi.connection.failed` | `${user}` | counter | 1.2.0 |<div style='width:
150pt;word-wrap: break-word;white-space: normal'> cumulative failed connections
for a `${user}`</div>
+`kyuubi.connection.failed` | `${sessionType}` | counter | 1.7.0 |<div
style='width: 150pt;word-wrap: break-word;white-space: normal'> cumulative
failed connection count with session type `${sessionType}`</div>
`kyuubi.operation.total` | | counter | 1.5.0 |<div style='width:
150pt;word-wrap: break-word;white-space: normal'> cumulative opened operation
count</div>
`kyuubi.operation.total` | `${operationType}` | counter | 1.5.0 |<div
style='width: 150pt;word-wrap: break-word;white-space: normal'> cumulative
opened count for the operation `${operationType}`</div>
`kyuubi.operation.opened` | | gauge | 1.5.0 |<div style='width:
150pt;word-wrap: break-word;white-space: normal'> current opened operation
count</div>
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
index 2e718f6bf..5ed851ce0 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
@@ -21,7 +21,6 @@ import java.util.UUID
import scala.collection.JavaConverters._
-import com.codahale.metrics.MetricRegistry
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.client.api.v1.dto.BatchRequest
@@ -29,8 +28,6 @@ import org.apache.kyuubi.config.{KyuubiConf,
KyuubiReservedKeys}
import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
-import org.apache.kyuubi.metrics.MetricsConstants.{CONN_OPEN, CONN_TOTAL}
-import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.metadata.api.Metadata
import org.apache.kyuubi.session.SessionType.SessionType
@@ -129,10 +126,7 @@ class KyuubiBatchSessionImpl(
}
override def open(): Unit = handleSessionException {
- MetricsSystem.tracing { ms =>
- ms.incCount(CONN_TOTAL)
- ms.incCount(MetricRegistry.name(CONN_OPEN, user))
- }
+ traceMetricsOnOpen()
if (recoveryMetadata.isEmpty) {
val metaData = Metadata(
@@ -177,6 +171,6 @@ class KyuubiBatchSessionImpl(
waitMetadataRequestsRetryCompletion()
sessionEvent.endTime = System.currentTimeMillis()
EventBus.post(sessionEvent)
- MetricsSystem.tracing(_.decCount(MetricRegistry.name(CONN_OPEN, user)))
+ traceMetricsOnClose()
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala
index e2c692820..5b731e273 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala
@@ -16,10 +16,13 @@
*/
package org.apache.kyuubi.session
+import com.codahale.metrics.MetricRegistry
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import
org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_CONNECTION_URL_KEY,
KYUUBI_SESSION_REAL_USER_KEY}
import org.apache.kyuubi.events.KyuubiSessionEvent
+import org.apache.kyuubi.metrics.MetricsConstants.{CONN_OPEN, CONN_TOTAL}
+import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.session.SessionType.SessionType
abstract class KyuubiSession(
@@ -50,4 +53,16 @@ abstract class KyuubiSession(
throw t
}
}
+
+ protected def traceMetricsOnOpen(): Unit = MetricsSystem.tracing { ms =>
+ ms.incCount(CONN_TOTAL)
+ ms.incCount(MetricRegistry.name(CONN_TOTAL, sessionType.toString))
+ ms.incCount(MetricRegistry.name(CONN_OPEN, user))
+ ms.incCount(MetricRegistry.name(CONN_OPEN, sessionType.toString))
+ }
+
+ protected def traceMetricsOnClose(): Unit = MetricsSystem.tracing { ms =>
+ ms.decCount(MetricRegistry.name(CONN_OPEN, user))
+ ms.decCount(MetricRegistry.name(CONN_OPEN, sessionType.toString))
+ }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 93e6e9858..536034b9c 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -21,7 +21,6 @@ import java.util.Base64
import scala.collection.JavaConverters._
-import com.codahale.metrics.MetricRegistry
import org.apache.hive.service.rpc.thrift._
import org.apache.kyuubi.KyuubiSQLException
@@ -32,8 +31,6 @@ import
org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_CREDENTIALS_KE
import org.apache.kyuubi.engine.{EngineRef, KyuubiApplicationManager}
import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
import org.apache.kyuubi.ha.client.DiscoveryClientProvider._
-import org.apache.kyuubi.metrics.MetricsConstants._
-import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.{Operation, OperationHandle}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.service.authentication.InternalSecurityAccessor
@@ -109,10 +106,7 @@ class KyuubiSessionImpl(
private var _engineSessionHandle: SessionHandle = _
override def open(): Unit = handleSessionException {
- MetricsSystem.tracing { ms =>
- ms.incCount(CONN_TOTAL)
- ms.incCount(MetricRegistry.name(CONN_OPEN, user))
- }
+ traceMetricsOnOpen()
checkSessionAccessPathURIs()
@@ -256,7 +250,7 @@ class KyuubiSessionImpl(
if (engine != null) engine.close()
sessionEvent.endTime = System.currentTimeMillis()
EventBus.post(sessionEvent)
- MetricsSystem.tracing(_.decCount(MetricRegistry.name(CONN_OPEN, user)))
+ traceMetricsOnClose()
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index f4b12f386..54d5b8b24 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -107,6 +107,7 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
MetricsSystem.tracing { ms =>
ms.incCount(CONN_FAIL)
ms.incCount(MetricRegistry.name(CONN_FAIL, user))
+ ms.incCount(MetricRegistry.name(CONN_FAIL,
SessionType.INTERACTIVE.toString))
}
throw KyuubiSQLException(
s"Error opening session for $username client ip $ipAddress, due to
${e.getMessage}",
@@ -168,6 +169,7 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
MetricsSystem.tracing { ms =>
ms.incCount(CONN_FAIL)
ms.incCount(MetricRegistry.name(CONN_FAIL, user))
+ ms.incCount(MetricRegistry.name(CONN_FAIL,
SessionType.BATCH.toString))
}
throw KyuubiSQLException(
s"Error opening batch session[$handle] for $user client ip
$ipAddress," +
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
index 4c4faf63b..341f00639 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
@@ -32,8 +32,9 @@ import
org.apache.kyuubi.config.KyuubiConf.SESSION_CONF_ADVISOR
import org.apache.kyuubi.engine.ApplicationState
import org.apache.kyuubi.jdbc.KyuubiHiveDriver
import org.apache.kyuubi.jdbc.hive.KyuubiConnection
+import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.plugin.SessionConfAdvisor
-import org.apache.kyuubi.session.KyuubiSessionManager
+import org.apache.kyuubi.session.{KyuubiSessionManager, SessionType}
/**
* UT with Connection level engine shared cost much time, only run basic jdbc
tests.
@@ -239,6 +240,38 @@ class KyuubiOperationPerConnectionSuite extends
WithKyuubiServer with HiveJDBCTe
}
}
}
+
+ test("trace the connection metrics with session type") {
+ val connOpenMetric =
s"${MetricsConstants.CONN_OPEN}.${SessionType.INTERACTIVE}"
+ val connTotalMetric =
s"${MetricsConstants.CONN_TOTAL}.${SessionType.INTERACTIVE}"
+ val connFailedMetric =
s"${MetricsConstants.CONN_FAIL}.${SessionType.INTERACTIVE}"
+ val connTotalCount =
MetricsSystem.counterValue(connTotalMetric).getOrElse(0L)
+ val connFailedCount =
MetricsSystem.counterValue(connFailedMetric).getOrElse(0L)
+
+ withJdbcStatement() { statement =>
+ statement.executeQuery("select engine_name()")
+ }
+ eventually(timeout(5.seconds), interval(100.milliseconds)) {
+ assert(MetricsSystem.counterValue(connTotalMetric).getOrElse(0L) >
connTotalCount)
+ assert(MetricsSystem.counterValue(connOpenMetric).getOrElse(0L) === 0)
+ }
+
+ withSessionConf(Map.empty)(Map.empty)(Map(
+ KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "false",
+ "spark.master" -> "invalid")) {
+ intercept[Exception] {
+ withJdbcStatement() { statement =>
+ statement.executeQuery("select engine_name()")
+ }
+ }
+ }
+
+ eventually(timeout(5.seconds), interval(100.milliseconds)) {
+ assert(MetricsSystem.counterValue(connTotalMetric).getOrElse(0L) -
connTotalCount > 1)
+ assert(MetricsSystem.counterValue(connOpenMetric).getOrElse(0L) === 0)
+ assert(MetricsSystem.counterValue(connFailedMetric).getOrElse(0L) >
connFailedCount)
+ }
+ }
}
class TestSessionConfAdvisor extends SessionConfAdvisor {