This is an automated email from the ASF dual-hosted git repository. feiwang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push: new edbe3f3fe [KYUUBI #6681] Log the delete batch request in batch operation log edbe3f3fe is described below commit edbe3f3fefc75532ed4cc44269adf55eca6a5a9e Author: Wang, Fei <fwan...@ebay.com> AuthorDate: Mon Sep 9 09:28:44 2024 -0700 [KYUUBI #6681] Log the delete batch request in batch operation log # :mag: Description ## Issue References ๐ As title, log the delete batch request in operation log. ## Describe Your Solution ๐ง Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change. ## Types of changes :bookmark: - [ ] Bugfix (non-breaking change which fixes an issue) - [x] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan ๐งช #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests --- # Checklist ๐ - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6681 from turboFei/audit_kill. Closes #6681 8550868a6 [Wang, Fei] withOperationLog Authored-by: Wang, Fei <fwan...@ebay.com> Signed-off-by: Wang, Fei <fwan...@ebay.com> --- .../org/apache/spark/kyuubi/SQLOperationListener.scala | 18 ++++-------------- .../apache/kyuubi/operation/AbstractOperation.scala | 9 +++++++++ .../scala/org/apache/kyuubi/operation/Operation.scala | 1 + .../apache/kyuubi/server/api/v1/BatchesResource.scala | 7 ++++++- 4 files changed, 20 insertions(+), 15 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala index c89c50aa9..7fd22c53f 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala @@ -34,7 +34,6 @@ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SQL_EXECUTION_ID_KEY} import org.apache.kyuubi.engine.spark.operation.ExecuteStatement import org.apache.kyuubi.operation.Operation -import org.apache.kyuubi.operation.log.OperationLog /** * A [[SparkListener]] based on spark's DeveloperApi [[StatsReportListener]], used to appending @@ -78,15 +77,6 @@ class SQLOperationListener( properties != null && properties.getProperty(KYUUBI_STATEMENT_ID_KEY) == operationId } - private def withOperationLog(f: => Unit): Unit = { - try { - operation.getOperationLog.foreach(OperationLog.setCurrentOperationLog) - f - } finally { - OperationLog.removeCurrentOperationLog() - } - } - override def onJobStart(jobStart: SparkListenerJobStart): Unit = { if (sameGroupId(jobStart.properties)) { val jobId = jobStart.jobId @@ -105,7 +95,7 @@ class SQLOperationListener( activeJobs.put( jobId, new SparkJobInfo(stageSize, stageIds)) - withOperationLog { + operation.withOperationLog { info(s"Query [$operationId]: Job $jobId started with $stageSize stages," + s" ${activeJobs.size()} active jobs running") } @@ -119,7 +109,7 @@ class SQLOperationListener( case JobSucceeded => "succeeded" case _ => "failed" // TODO: Handle JobFailed(exception: Exception) } - withOperationLog { + operation.withOperationLog { info(s"Query [$operationId]: Job $jobId $hint, ${activeJobs.size()} active jobs running") } } @@ -135,7 +125,7 @@ class SQLOperationListener( activeStages.put( stageAttempt, new SparkStageInfo(stageId, stageInfo.numTasks)) - withOperationLog { + operation.withOperationLog { info(s"Query [$operationId]: Stage $stageId.$attemptNumber started " + s"with ${stageInfo.numTasks} tasks, ${activeStages.size()} active stages running") } @@ -166,7 +156,7 @@ class SQLOperationListener( operationRunTime.getAndAdd(taskMetrics.executorRunTime) operationCpuTime.getAndAdd(taskMetrics.executorCpuTime) } - withOperationLog(super.onStageCompleted(stageCompleted)) + operation.withOperationLog(super.onStageCompleted(stageCompleted)) } } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala index 3681e98f7..fdcc0c340 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala @@ -74,6 +74,15 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin override def getOperationLog: Option[OperationLog] = None + override def withOperationLog(f: => Unit): Unit = { + try { + getOperationLog.foreach(OperationLog.setCurrentOperationLog) + f + } finally { + OperationLog.removeCurrentOperationLog() + } + } + OperationAuditLogger.audit(this, OperationState.INITIALIZED) @volatile protected var state: OperationState = INITIALIZED @volatile protected var startTime: Long = _ diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala index e21638518..9a6e453d5 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala @@ -37,6 +37,7 @@ trait Operation { def getHandle: OperationHandle def getStatus: OperationStatus def getOperationLog: Option[OperationLog] + def withOperationLog(f: => Unit): Unit def getBackgroundHandle: Future[_] def shouldRunAsync: Boolean diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala index 3fd5ddbea..be0d3bdf6 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala @@ -499,9 +499,14 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { val sessionHandle = formatSessionHandle(batchId) sessionManager.getBatchSession(sessionHandle).map { batchSession => - fe.getSessionUser(batchSession.user) + val userName = fe.getSessionUser(batchSession.user) + val ipAddress = fe.getIpAddress sessionManager.closeSession(batchSession.handle) val (killed, msg) = batchSession.batchJobSubmissionOp.getKillMessage + batchSession.batchJobSubmissionOp.withOperationLog { + warn(s"Received kill batch request from $userName/$ipAddress") + warn(s"Kill batch response: killed: $killed, msg: $msg.") + } new CloseBatchResponse(killed, msg) }.getOrElse { sessionManager.getBatchMetadata(batchId).map { metadata =>