This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4942a5347 [KYUUBI #4035][FOLLOWUP] Add eventEnabled method for 
KyuubiOperation and SparkOperation
4942a5347 is described below

commit 4942a53470a403b814f27deb99c470d8f8644dd0
Author: fwang12 <[email protected]>
AuthorDate: Fri Dec 30 19:53:34 2022 +0800

    [KYUUBI #4035][FOLLOWUP] Add eventEnabled method for KyuubiOperation and 
SparkOperation
    
    ### _Why are the changes needed?_
    
    Followup for #4035
    Add `eventEnabled` method for KyuubiOperation and SparkOperation.
    
    For `KyuubiOperation`, events are disabled by default and stay enabled, as before, for the following operations:
    - ExecuteStatement
    - BatchJobSubmission
    
    For `SparkOperation`, events are enabled by default and disabled only for:
    - GetTypeInfo
       - because it is used by `KyuubiConnection::isValid` and may be called 
periodically to check whether the session is still alive
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #4046 from turboFei/post_event.
    
    Closes #4035
    
    d558d7706 [fwang12] event enabled
    4926c8b56 [fwang12] event enabled
    
    Authored-by: fwang12 <[email protected]>
    Signed-off-by: fwang12 <[email protected]>
---
 .../org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala    | 1 +
 .../org/apache/kyuubi/engine/spark/operation/SparkOperation.scala | 8 ++++++--
 .../scala/org/apache/kyuubi/operation/BatchJobSubmission.scala    | 5 ++---
 .../main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala | 8 +-------
 .../main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala  | 6 ++++++
 5 files changed, 16 insertions(+), 12 deletions(-)

diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala
index f24735cbf..6df89df34 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala
@@ -27,6 +27,7 @@ import 
org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
 import org.apache.kyuubi.session.Session
 
 class GetTypeInfo(session: Session) extends SparkOperation(session) {
+  override protected def eventEnabled: Boolean = false
   override protected def resultSchema: StructType = {
     new StructType()
       .add(TYPE_NAME, "string", nullable = false, "Type name")
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 428d7dcaf..67ab29172 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -120,11 +120,15 @@ abstract class SparkOperation(session: Session)
     s"spark.${SESSION_USER_SIGN_ENABLED.key}",
     SESSION_USER_SIGN_ENABLED.defaultVal.get)
 
-  EventBus.post(SparkOperationEvent(this))
+  protected def eventEnabled: Boolean = true
+
+  if (eventEnabled) EventBus.post(SparkOperationEvent(this))
 
   override protected def setState(newState: OperationState): Unit = {
     super.setState(newState)
-    EventBus.post(SparkOperationEvent(this, 
operationListener.flatMap(_.getExecutionId)))
+    if (eventEnabled) {
+      EventBus.post(SparkOperationEvent(this, 
operationListener.flatMap(_.getExecutionId)))
+    }
   }
 
   protected def setSparkLocalProperty: (String, String) => Unit =
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index e0eb7c05d..9e77fa5b8 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -29,7 +29,6 @@ import org.apache.kyuubi.{KyuubiException, KyuubiSQLException}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationState, 
KillResponse, ProcBuilder}
 import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
-import org.apache.kyuubi.events.{EventBus, KyuubiOperationEvent}
 import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@@ -61,7 +60,6 @@ class BatchJobSubmission(
     recoveryMetadata: Option[Metadata])
   extends KyuubiApplicationOperation(session) {
   import BatchJobSubmission._
-  EventBus.post(KyuubiOperationEvent(this))
 
   override def shouldRunAsync: Boolean = true
 
@@ -151,7 +149,6 @@ class BatchJobSubmission(
       if (newState == RUNNING) {
         session.onEngineOpened()
       }
-      EventBus.post(KyuubiOperationEvent(this))
     }
   }
 
@@ -345,6 +342,8 @@ class BatchJobSubmission(
   }
 
   override def isTimedOut: Boolean = false
+
+  override protected def eventEnabled: Boolean = true
 }
 
 object BatchJobSubmission {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index 3827a2577..4e818355e 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -24,9 +24,7 @@ import org.apache.hive.service.rpc.thrift.TOperationState._
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.events.{EventBus, KyuubiOperationEvent}
 import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
-import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
 
@@ -37,7 +35,6 @@ class ExecuteStatement(
     override val shouldRunAsync: Boolean,
     queryTimeout: Long)
   extends KyuubiOperation(session) {
-  EventBus.post(KyuubiOperationEvent(this))
 
   final private val _operationLog: OperationLog =
     if (shouldRunAsync) {
@@ -162,8 +159,5 @@ class ExecuteStatement(
     if (!shouldRunAsync) getBackgroundHandle.get()
   }
 
-  override def setState(newState: OperationState): Unit = {
-    super.setState(newState)
-    EventBus.post(KyuubiOperationEvent(this))
-  }
+  override protected def eventEnabled: Boolean = true
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index 3a0e738d0..638985ea1 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -26,6 +26,7 @@ import org.apache.thrift.TException
 import org.apache.thrift.transport.TTransportException
 
 import org.apache.kyuubi.{KyuubiSQLException, Utils}
+import org.apache.kyuubi.events.{EventBus, KyuubiOperationEvent}
 import org.apache.kyuubi.metrics.MetricsConstants.{OPERATION_FAIL, 
OPERATION_OPEN, OPERATION_STATE, OPERATION_TOTAL}
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@@ -169,6 +170,10 @@ abstract class KyuubiOperation(session: Session) extends 
AbstractOperation(sessi
 
   override def shouldRunAsync: Boolean = false
 
+  protected def eventEnabled: Boolean = false
+
+  if (eventEnabled) EventBus.post(KyuubiOperationEvent(this))
+
   override def setState(newState: OperationState): Unit = {
     MetricsSystem.tracing { ms =>
       ms.markMeter(MetricRegistry.name(OPERATION_STATE, opType, 
state.toString.toLowerCase), -1)
@@ -176,5 +181,6 @@ abstract class KyuubiOperation(session: Session) extends 
AbstractOperation(sessi
       ms.markMeter(MetricRegistry.name(OPERATION_STATE, 
newState.toString.toLowerCase))
     }
     super.setState(newState)
+    if (eventEnabled) EventBus.post(KyuubiOperationEvent(this))
   }
 }

Reply via email to