This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 4942a5347 [KYUUBI #4035][FOLLOWUP] Add eventEnabled method for
KyuubiOperation and SparkOperation
4942a5347 is described below
commit 4942a53470a403b814f27deb99c470d8f8644dd0
Author: fwang12 <[email protected]>
AuthorDate: Fri Dec 30 19:53:34 2022 +0800
[KYUUBI #4035][FOLLOWUP] Add eventEnabled method for KyuubiOperation and
SparkOperation
### _Why are the changes needed?_
Followup for #4035
Add `eventEnabled` method for KyuubiOperation and SparkOperation.
For `KyuubiOperation`, enable below operations as before
- ExecuteStatement
- BatchJobSubmission
For `SparkOperation`, disable
- GetTypeInfo
- because it is used by `KyuubiConnection::isValid` and might be called
with interval to check session alive
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #4046 from turboFei/post_event.
Closes #4035
d558d7706 [fwang12] event enabled
4926c8b56 [fwang12] event enabled
Authored-by: fwang12 <[email protected]>
Signed-off-by: fwang12 <[email protected]>
---
.../org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala | 1 +
.../org/apache/kyuubi/engine/spark/operation/SparkOperation.scala | 8 ++++++--
.../scala/org/apache/kyuubi/operation/BatchJobSubmission.scala | 5 ++---
.../main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala | 8 +-------
.../main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala | 6 ++++++
5 files changed, 16 insertions(+), 12 deletions(-)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala
index f24735cbf..6df89df34 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala
@@ -27,6 +27,7 @@ import
org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
class GetTypeInfo(session: Session) extends SparkOperation(session) {
+ override protected def eventEnabled: Boolean = false
override protected def resultSchema: StructType = {
new StructType()
.add(TYPE_NAME, "string", nullable = false, "Type name")
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 428d7dcaf..67ab29172 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -120,11 +120,15 @@ abstract class SparkOperation(session: Session)
s"spark.${SESSION_USER_SIGN_ENABLED.key}",
SESSION_USER_SIGN_ENABLED.defaultVal.get)
- EventBus.post(SparkOperationEvent(this))
+ protected def eventEnabled: Boolean = true
+
+ if (eventEnabled) EventBus.post(SparkOperationEvent(this))
override protected def setState(newState: OperationState): Unit = {
super.setState(newState)
- EventBus.post(SparkOperationEvent(this,
operationListener.flatMap(_.getExecutionId)))
+ if (eventEnabled) {
+ EventBus.post(SparkOperationEvent(this,
operationListener.flatMap(_.getExecutionId)))
+ }
}
protected def setSparkLocalProperty: (String, String) => Unit =
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index e0eb7c05d..9e77fa5b8 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -29,7 +29,6 @@ import org.apache.kyuubi.{KyuubiException, KyuubiSQLException}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationState,
KillResponse, ProcBuilder}
import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
-import org.apache.kyuubi.events.{EventBus, KyuubiOperationEvent}
import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@@ -61,7 +60,6 @@ class BatchJobSubmission(
recoveryMetadata: Option[Metadata])
extends KyuubiApplicationOperation(session) {
import BatchJobSubmission._
- EventBus.post(KyuubiOperationEvent(this))
override def shouldRunAsync: Boolean = true
@@ -151,7 +149,6 @@ class BatchJobSubmission(
if (newState == RUNNING) {
session.onEngineOpened()
}
- EventBus.post(KyuubiOperationEvent(this))
}
}
@@ -345,6 +342,8 @@ class BatchJobSubmission(
}
override def isTimedOut: Boolean = false
+
+ override protected def eventEnabled: Boolean = true
}
object BatchJobSubmission {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index 3827a2577..4e818355e 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -24,9 +24,7 @@ import org.apache.hive.service.rpc.thrift.TOperationState._
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.events.{EventBus, KyuubiOperationEvent}
import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
-import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@@ -37,7 +35,6 @@ class ExecuteStatement(
override val shouldRunAsync: Boolean,
queryTimeout: Long)
extends KyuubiOperation(session) {
- EventBus.post(KyuubiOperationEvent(this))
final private val _operationLog: OperationLog =
if (shouldRunAsync) {
@@ -162,8 +159,5 @@ class ExecuteStatement(
if (!shouldRunAsync) getBackgroundHandle.get()
}
- override def setState(newState: OperationState): Unit = {
- super.setState(newState)
- EventBus.post(KyuubiOperationEvent(this))
- }
+ override protected def eventEnabled: Boolean = true
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index 3a0e738d0..638985ea1 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -26,6 +26,7 @@ import org.apache.thrift.TException
import org.apache.thrift.transport.TTransportException
import org.apache.kyuubi.{KyuubiSQLException, Utils}
+import org.apache.kyuubi.events.{EventBus, KyuubiOperationEvent}
import org.apache.kyuubi.metrics.MetricsConstants.{OPERATION_FAIL,
OPERATION_OPEN, OPERATION_STATE, OPERATION_TOTAL}
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@@ -169,6 +170,10 @@ abstract class KyuubiOperation(session: Session) extends
AbstractOperation(sessi
override def shouldRunAsync: Boolean = false
+ protected def eventEnabled: Boolean = false
+
+ if (eventEnabled) EventBus.post(KyuubiOperationEvent(this))
+
override def setState(newState: OperationState): Unit = {
MetricsSystem.tracing { ms =>
ms.markMeter(MetricRegistry.name(OPERATION_STATE, opType,
state.toString.toLowerCase), -1)
@@ -176,5 +181,6 @@ abstract class KyuubiOperation(session: Session) extends
AbstractOperation(sessi
ms.markMeter(MetricRegistry.name(OPERATION_STATE,
newState.toString.toLowerCase))
}
super.setState(newState)
+ if (eventEnabled) EventBus.post(KyuubiOperationEvent(this))
}
}