This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new ff7f42386 [KYUUBI #4204] Trace the connection metrics with session type
ff7f42386 is described below

commit ff7f42386a58042748a7c42e3017d66e82af0b81
Author: fwang12 <[email protected]>
AuthorDate: Sun Jan 29 11:03:29 2023 +0800

    [KYUUBI #4204] Trace the connection metrics with session type
    
    ### _Why are the changes needed?_
    
    Trace connection metrics with session type.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #4204 from turboFei/conn_type_open.
    
    Closes #4204
    
    67f45fd5d [fwang12] add ut
    ef0bed665 [fwang12] save
    ed4be0233 [fwang12] align
    b29396306 [fwang12] order
    94cdc5bfc [fwang12] order
    91bee9082 [fwang12] docs
    37128668a [fwang12] save
    6cef34f10 [fwang12] session type
    6a73760a8 [fwang12] reuse
    
    Authored-by: fwang12 <[email protected]>
    Signed-off-by: fwang12 <[email protected]>
---
 docs/monitor/metrics.md                            |  3 ++
 .../kyuubi/session/KyuubiBatchSessionImpl.scala    | 10 ++-----
 .../org/apache/kyuubi/session/KyuubiSession.scala  | 15 ++++++++++
 .../apache/kyuubi/session/KyuubiSessionImpl.scala  | 10 ++-----
 .../kyuubi/session/KyuubiSessionManager.scala      |  2 ++
 .../KyuubiOperationPerConnectionSuite.scala        | 35 +++++++++++++++++++++-
 6 files changed, 58 insertions(+), 17 deletions(-)

diff --git a/docs/monitor/metrics.md b/docs/monitor/metrics.md
index 2f2ac1405..1c8013b58 100644
--- a/docs/monitor/metrics.md
+++ b/docs/monitor/metrics.md
@@ -45,10 +45,13 @@ Metrics Prefix | Metrics Suffix | Type | Since | Description
 `kyuubi.exec.pool.threads.alive`  | | gauge | 1.2.0 |<div style='width: 
150pt;word-wrap: break-word;white-space: normal'> threads keepAlive in the 
backend executive thread pool</div>
 `kyuubi.exec.pool.threads.active` | | gauge | 1.2.0 |<div style='width: 
150pt;word-wrap: break-word;white-space: normal'> threads active in the backend 
executive thread pool</div>
 `kyuubi.connection.total`   | | counter | 1.2.0 |<div style='width: 
150pt;word-wrap: break-word;white-space: normal'>  cumulative connection 
count</div>
+`kyuubi.connection.total`   | `${sessionType}` | counter | 1.7.0 |<div 
style='width: 150pt;word-wrap: break-word;white-space: normal'> cumulative 
connection count with session type `${sessionType}`</div>
 `kyuubi.connection.opened`  | | gauge | 1.2.0 |<div style='width: 
150pt;word-wrap: break-word;white-space: normal'> current active connection 
count</div>
 `kyuubi.connection.opened`  | `${user}` | counter | 1.2.0 |<div style='width: 
150pt;word-wrap: break-word;white-space: normal'> current active connections 
count requested by a `${user}`</div>
+`kyuubi.connection.opened`  | `${sessionType}` | counter | 1.7.0 |<div 
style='width: 150pt;word-wrap: break-word;white-space: normal'> current active 
connections count with session type `${sessionType}`</div>
 `kyuubi.connection.failed`  | | counter | 1.2.0 |<div style='width: 
150pt;word-wrap: break-word;white-space: normal'>  cumulative failed connection 
count</div>
 `kyuubi.connection.failed`  | `${user}` | counter | 1.2.0 |<div style='width: 
150pt;word-wrap: break-word;white-space: normal'> cumulative failed connections 
for a `${user}`</div>
+`kyuubi.connection.failed`  | `${sessionType}` | counter | 1.7.0 |<div 
style='width: 150pt;word-wrap: break-word;white-space: normal'> cumulative 
failed connection count with session type `${sessionType}`</div>
 `kyuubi.operation.total`    | | counter | 1.5.0 |<div style='width: 
150pt;word-wrap: break-word;white-space: normal'>  cumulative opened operation 
count</div>
 `kyuubi.operation.total`    | `${operationType}` | counter | 1.5.0 |<div 
style='width: 150pt;word-wrap: break-word;white-space: normal'>  cumulative 
opened count for the operation `${operationType}`</div>
 `kyuubi.operation.opened`   | | gauge | 1.5.0 |<div style='width: 
150pt;word-wrap: break-word;white-space: normal'>  current opened operation 
count</div>
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
index 2e718f6bf..5ed851ce0 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
@@ -21,7 +21,6 @@ import java.util.UUID
 
 import scala.collection.JavaConverters._
 
-import com.codahale.metrics.MetricRegistry
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
 import org.apache.kyuubi.client.api.v1.dto.BatchRequest
@@ -29,8 +28,6 @@ import org.apache.kyuubi.config.{KyuubiConf, 
KyuubiReservedKeys}
 import org.apache.kyuubi.engine.KyuubiApplicationManager
 import org.apache.kyuubi.engine.spark.SparkProcessBuilder
 import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
-import org.apache.kyuubi.metrics.MetricsConstants.{CONN_OPEN, CONN_TOTAL}
-import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.OperationState
 import org.apache.kyuubi.server.metadata.api.Metadata
 import org.apache.kyuubi.session.SessionType.SessionType
@@ -129,10 +126,7 @@ class KyuubiBatchSessionImpl(
   }
 
   override def open(): Unit = handleSessionException {
-    MetricsSystem.tracing { ms =>
-      ms.incCount(CONN_TOTAL)
-      ms.incCount(MetricRegistry.name(CONN_OPEN, user))
-    }
+    traceMetricsOnOpen()
 
     if (recoveryMetadata.isEmpty) {
       val metaData = Metadata(
@@ -177,6 +171,6 @@ class KyuubiBatchSessionImpl(
     waitMetadataRequestsRetryCompletion()
     sessionEvent.endTime = System.currentTimeMillis()
     EventBus.post(sessionEvent)
-    MetricsSystem.tracing(_.decCount(MetricRegistry.name(CONN_OPEN, user)))
+    traceMetricsOnClose()
   }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala
index e2c692820..5b731e273 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala
@@ -16,10 +16,13 @@
  */
 package org.apache.kyuubi.session
 
+import com.codahale.metrics.MetricRegistry
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
 import 
org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_CONNECTION_URL_KEY, 
KYUUBI_SESSION_REAL_USER_KEY}
 import org.apache.kyuubi.events.KyuubiSessionEvent
+import org.apache.kyuubi.metrics.MetricsConstants.{CONN_OPEN, CONN_TOTAL}
+import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.session.SessionType.SessionType
 
 abstract class KyuubiSession(
@@ -50,4 +53,16 @@ abstract class KyuubiSession(
         throw t
     }
   }
+
+  protected def traceMetricsOnOpen(): Unit = MetricsSystem.tracing { ms =>
+    ms.incCount(CONN_TOTAL)
+    ms.incCount(MetricRegistry.name(CONN_TOTAL, sessionType.toString))
+    ms.incCount(MetricRegistry.name(CONN_OPEN, user))
+    ms.incCount(MetricRegistry.name(CONN_OPEN, sessionType.toString))
+  }
+
+  protected def traceMetricsOnClose(): Unit = MetricsSystem.tracing { ms =>
+    ms.decCount(MetricRegistry.name(CONN_OPEN, user))
+    ms.decCount(MetricRegistry.name(CONN_OPEN, sessionType.toString))
+  }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 93e6e9858..536034b9c 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -21,7 +21,6 @@ import java.util.Base64
 
 import scala.collection.JavaConverters._
 
-import com.codahale.metrics.MetricRegistry
 import org.apache.hive.service.rpc.thrift._
 
 import org.apache.kyuubi.KyuubiSQLException
@@ -32,8 +31,6 @@ import 
org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_CREDENTIALS_KE
 import org.apache.kyuubi.engine.{EngineRef, KyuubiApplicationManager}
 import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
 import org.apache.kyuubi.ha.client.DiscoveryClientProvider._
-import org.apache.kyuubi.metrics.MetricsConstants._
-import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.{Operation, OperationHandle}
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.service.authentication.InternalSecurityAccessor
@@ -109,10 +106,7 @@ class KyuubiSessionImpl(
   private var _engineSessionHandle: SessionHandle = _
 
   override def open(): Unit = handleSessionException {
-    MetricsSystem.tracing { ms =>
-      ms.incCount(CONN_TOTAL)
-      ms.incCount(MetricRegistry.name(CONN_OPEN, user))
-    }
+    traceMetricsOnOpen()
 
     checkSessionAccessPathURIs()
 
@@ -256,7 +250,7 @@ class KyuubiSessionImpl(
       if (engine != null) engine.close()
       sessionEvent.endTime = System.currentTimeMillis()
       EventBus.post(sessionEvent)
-      MetricsSystem.tracing(_.decCount(MetricRegistry.name(CONN_OPEN, user)))
+      traceMetricsOnClose()
     }
   }
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index f4b12f386..54d5b8b24 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -107,6 +107,7 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
         MetricsSystem.tracing { ms =>
           ms.incCount(CONN_FAIL)
           ms.incCount(MetricRegistry.name(CONN_FAIL, user))
+          ms.incCount(MetricRegistry.name(CONN_FAIL, 
SessionType.INTERACTIVE.toString))
         }
         throw KyuubiSQLException(
           s"Error opening session for $username client ip $ipAddress, due to 
${e.getMessage}",
@@ -168,6 +169,7 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
         MetricsSystem.tracing { ms =>
           ms.incCount(CONN_FAIL)
           ms.incCount(MetricRegistry.name(CONN_FAIL, user))
+          ms.incCount(MetricRegistry.name(CONN_FAIL, 
SessionType.BATCH.toString))
         }
         throw KyuubiSQLException(
           s"Error opening batch session[$handle] for $user client ip 
$ipAddress," +
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
index 4c4faf63b..341f00639 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
@@ -32,8 +32,9 @@ import 
org.apache.kyuubi.config.KyuubiConf.SESSION_CONF_ADVISOR
 import org.apache.kyuubi.engine.ApplicationState
 import org.apache.kyuubi.jdbc.KyuubiHiveDriver
 import org.apache.kyuubi.jdbc.hive.KyuubiConnection
+import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
 import org.apache.kyuubi.plugin.SessionConfAdvisor
-import org.apache.kyuubi.session.KyuubiSessionManager
+import org.apache.kyuubi.session.{KyuubiSessionManager, SessionType}
 
 /**
  * UT with Connection level engine shared cost much time, only run basic jdbc 
tests.
@@ -239,6 +240,38 @@ class KyuubiOperationPerConnectionSuite extends 
WithKyuubiServer with HiveJDBCTe
       }
     }
   }
+
+  test("trace the connection metrics with session type") {
+    val connOpenMetric = 
s"${MetricsConstants.CONN_OPEN}.${SessionType.INTERACTIVE}"
+    val connTotalMetric = 
s"${MetricsConstants.CONN_TOTAL}.${SessionType.INTERACTIVE}"
+    val connFailedMetric = 
s"${MetricsConstants.CONN_FAIL}.${SessionType.INTERACTIVE}"
+    val connTotalCount = 
MetricsSystem.counterValue(connTotalMetric).getOrElse(0L)
+    val connFailedCount = 
MetricsSystem.counterValue(connFailedMetric).getOrElse(0L)
+
+    withJdbcStatement() { statement =>
+      statement.executeQuery("select engine_name()")
+    }
+    eventually(timeout(5.seconds), interval(100.milliseconds)) {
+      assert(MetricsSystem.counterValue(connTotalMetric).getOrElse(0L) > 
connTotalCount)
+      assert(MetricsSystem.counterValue(connOpenMetric).getOrElse(0L) === 0)
+    }
+
+    withSessionConf(Map.empty)(Map.empty)(Map(
+      KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "false",
+      "spark.master" -> "invalid")) {
+      intercept[Exception] {
+        withJdbcStatement() { statement =>
+          statement.executeQuery("select engine_name()")
+        }
+      }
+    }
+
+    eventually(timeout(5.seconds), interval(100.milliseconds)) {
+      assert(MetricsSystem.counterValue(connTotalMetric).getOrElse(0L) - 
connTotalCount > 1)
+      assert(MetricsSystem.counterValue(connOpenMetric).getOrElse(0L) === 0)
+      assert(MetricsSystem.counterValue(connFailedMetric).getOrElse(0L) > 
connFailedCount)
+    }
+  }
 }
 
 class TestSessionConfAdvisor extends SessionConfAdvisor {

Reply via email to