This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new edbe3f3fe [KYUUBI #6681] Log the delete batch request in batch 
operation log
edbe3f3fe is described below

commit edbe3f3fefc75532ed4cc44269adf55eca6a5a9e
Author: Wang, Fei <fwan...@ebay.com>
AuthorDate: Mon Sep 9 09:28:44 2024 -0700

    [KYUUBI #6681] Log the delete batch request in batch operation log
    
    # :mag: Description
    ## Issue References ๐Ÿ”—
    
    As title, log the delete batch request in operation log.
    
    ## Describe Your Solution ๐Ÿ”ง
    
    Please include a summary of the change and which issue is fixed. Please 
also include relevant motivation and context. List any dependencies that are 
required for this change.
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [x] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan ๐Ÿงช
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklist ๐Ÿ“
    
    - [x] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6681 from turboFei/audit_kill.
    
    Closes #6681
    
    8550868a6 [Wang, Fei] withOperationLog
    
    Authored-by: Wang, Fei <fwan...@ebay.com>
    Signed-off-by: Wang, Fei <fwan...@ebay.com>
---
 .../org/apache/spark/kyuubi/SQLOperationListener.scala | 18 ++++--------------
 .../apache/kyuubi/operation/AbstractOperation.scala    |  9 +++++++++
 .../scala/org/apache/kyuubi/operation/Operation.scala  |  1 +
 .../apache/kyuubi/server/api/v1/BatchesResource.scala  |  7 ++++++-
 4 files changed, 20 insertions(+), 15 deletions(-)

diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
index c89c50aa9..7fd22c53f 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
@@ -34,7 +34,6 @@ import 
org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, 
SPARK_SQL_EXECUTION_ID_KEY}
 import org.apache.kyuubi.engine.spark.operation.ExecuteStatement
 import org.apache.kyuubi.operation.Operation
-import org.apache.kyuubi.operation.log.OperationLog
 
 /**
  * A [[SparkListener]] based on spark's DeveloperApi [[StatsReportListener]], 
used to appending
@@ -78,15 +77,6 @@ class SQLOperationListener(
     properties != null && properties.getProperty(KYUUBI_STATEMENT_ID_KEY) == 
operationId
   }
 
-  private def withOperationLog(f: => Unit): Unit = {
-    try {
-      operation.getOperationLog.foreach(OperationLog.setCurrentOperationLog)
-      f
-    } finally {
-      OperationLog.removeCurrentOperationLog()
-    }
-  }
-
   override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
     if (sameGroupId(jobStart.properties)) {
       val jobId = jobStart.jobId
@@ -105,7 +95,7 @@ class SQLOperationListener(
       activeJobs.put(
         jobId,
         new SparkJobInfo(stageSize, stageIds))
-      withOperationLog {
+      operation.withOperationLog {
         info(s"Query [$operationId]: Job $jobId started with $stageSize 
stages," +
           s" ${activeJobs.size()} active jobs running")
       }
@@ -119,7 +109,7 @@ class SQLOperationListener(
         case JobSucceeded => "succeeded"
         case _ => "failed" // TODO: Handle JobFailed(exception: Exception)
       }
-      withOperationLog {
+      operation.withOperationLog {
         info(s"Query [$operationId]: Job $jobId $hint, ${activeJobs.size()} 
active jobs running")
       }
     }
@@ -135,7 +125,7 @@ class SQLOperationListener(
         activeStages.put(
           stageAttempt,
           new SparkStageInfo(stageId, stageInfo.numTasks))
-        withOperationLog {
+        operation.withOperationLog {
           info(s"Query [$operationId]: Stage $stageId.$attemptNumber started " 
+
             s"with ${stageInfo.numTasks} tasks, ${activeStages.size()} active 
stages running")
         }
@@ -166,7 +156,7 @@ class SQLOperationListener(
           operationRunTime.getAndAdd(taskMetrics.executorRunTime)
           operationCpuTime.getAndAdd(taskMetrics.executorCpuTime)
         }
-        withOperationLog(super.onStageCompleted(stageCompleted))
+        operation.withOperationLog(super.onStageCompleted(stageCompleted))
       }
     }
   }
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
index 3681e98f7..fdcc0c340 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
@@ -74,6 +74,15 @@ abstract class AbstractOperation(session: Session) extends 
Operation with Loggin
 
   override def getOperationLog: Option[OperationLog] = None
 
+  override def withOperationLog(f: => Unit): Unit = {
+    try {
+      getOperationLog.foreach(OperationLog.setCurrentOperationLog)
+      f
+    } finally {
+      OperationLog.removeCurrentOperationLog()
+    }
+  }
+
   OperationAuditLogger.audit(this, OperationState.INITIALIZED)
   @volatile protected var state: OperationState = INITIALIZED
   @volatile protected var startTime: Long = _
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala
index e21638518..9a6e453d5 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala
@@ -37,6 +37,7 @@ trait Operation {
   def getHandle: OperationHandle
   def getStatus: OperationStatus
   def getOperationLog: Option[OperationLog]
+  def withOperationLog(f: => Unit): Unit
 
   def getBackgroundHandle: Future[_]
   def shouldRunAsync: Boolean
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 3fd5ddbea..be0d3bdf6 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -499,9 +499,14 @@ private[v1] class BatchesResource extends 
ApiRequestContext with Logging {
 
     val sessionHandle = formatSessionHandle(batchId)
     sessionManager.getBatchSession(sessionHandle).map { batchSession =>
-      fe.getSessionUser(batchSession.user)
+      val userName = fe.getSessionUser(batchSession.user)
+      val ipAddress = fe.getIpAddress
       sessionManager.closeSession(batchSession.handle)
       val (killed, msg) = batchSession.batchJobSubmissionOp.getKillMessage
+      batchSession.batchJobSubmissionOp.withOperationLog {
+        warn(s"Received kill batch request from $userName/$ipAddress")
+        warn(s"Kill batch response: killed: $killed, msg: $msg.")
+      }
       new CloseBatchResponse(killed, msg)
     }.getOrElse {
       sessionManager.getBatchMetadata(batchId).map { metadata =>

Reply via email to