This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new de7ad07dd [Bugfix] Task state changes should happen before engine state 
changes (#4775)
de7ad07dd is described below

commit de7ad07ddfe15bca10712abbd81e37d29f5f30dd
Author: 人生有如两个橘子 <[email protected]>
AuthorDate: Sat Jul 15 23:52:03 2023 +0800

    [Bugfix] Task state changes should happen before engine state changes (#4775)
    
    This closes #4774
---
 .../executor/execute/ComputationExecutor.scala     | 19 ++++++--
 .../service/TaskExecutionServiceImpl.scala         | 10 +---
 .../executor/lock/EngineConnTimedLock.scala        | 55 ++++++++++------------
 .../service/EngineConnTimedLockService.scala       |  4 +-
 4 files changed, 41 insertions(+), 47 deletions(-)

diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala
index 98a6c2b21..940973be6 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala
@@ -21,7 +21,10 @@ import org.apache.linkis.DataWorkCloudApplication
 import org.apache.linkis.common.log.LogUtils
 import org.apache.linkis.common.utils.{Logging, Utils}
 import 
org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor
-import 
org.apache.linkis.engineconn.acessible.executor.listener.event.TaskStatusChangedEvent
+import org.apache.linkis.engineconn.acessible.executor.listener.event.{
+  TaskResponseErrorEvent,
+  TaskStatusChangedEvent
+}
 import org.apache.linkis.engineconn.common.conf.{EngineConnConf, 
EngineConnConstant}
 import 
org.apache.linkis.engineconn.computation.executor.conf.ComputationExecutorConf
 import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask
@@ -237,11 +240,9 @@ abstract class ComputationExecutor(val outputPrintLimit: 
Int = 1000)
       response = response match {
         case _: OutputExecuteResponse =>
           succeedTasks.increase()
-          transformTaskStatus(engineConnTask, ExecutionNodeStatus.Succeed)
           SuccessExecuteResponse()
         case s: SuccessExecuteResponse =>
           succeedTasks.increase()
-          transformTaskStatus(engineConnTask, ExecutionNodeStatus.Succeed)
           s
         case _ => response
       }
@@ -261,7 +262,17 @@ abstract class ComputationExecutor(val outputPrintLimit: 
Int = 1000)
     taskCache.put(engineConnTask.getTaskId, engineConnTask)
     lastTask = engineConnTask
     val response = ensureOp {
-      toExecuteTask(engineConnTask)
+      val executeResponse = toExecuteTask(engineConnTask)
+      executeResponse match {
+        case successExecuteResponse: SuccessExecuteResponse =>
+          transformTaskStatus(engineConnTask, ExecutionNodeStatus.Succeed)
+        case errorExecuteResponse: ErrorExecuteResponse =>
+          listenerBusContext.getEngineConnSyncListenerBus.postToAll(
+            TaskResponseErrorEvent(engineConnTask.getTaskId, 
errorExecuteResponse.message)
+          )
+          transformTaskStatus(engineConnTask, ExecutionNodeStatus.Failed)
+      }
+      executeResponse
     }
 
     Utils.tryAndWarn(afterExecute(engineConnTask, response))
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
index 93d607c25..bc738d549 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
@@ -400,15 +400,7 @@ class TaskExecutionServiceImpl
     Utils.tryFinally {
       val jobId = JobUtils.getJobIdFromMap(task.getProperties)
       LoggerUtils.setJobIdMDC(jobId)
-      val response = executor.execute(task)
-      response match {
-        case ErrorExecuteResponse(message, throwable) =>
-          sendToEntrance(task, ResponseTaskError(task.getTaskId, message))
-          logger.error(message, throwable)
-          LogHelper.pushAllRemainLogs()
-          executor.transformTaskStatus(task, ExecutionNodeStatus.Failed)
-        case _ => logger.warn(s"task get response is $response")
-      }
+      executor.execute(task)
       clearCache(task.getTaskId)
     } {
       LoggerUtils.removeJobIdMDC()
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/lock/EngineConnTimedLock.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/lock/EngineConnTimedLock.scala
index 84ab6fb7c..af4d1eb01 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/lock/EngineConnTimedLock.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/lock/EngineConnTimedLock.scala
@@ -43,12 +43,10 @@ class EngineConnTimedLock(private var timeout: Long)
   val releaseScheduler = new ScheduledThreadPoolExecutor(1)
   var releaseTask: ScheduledFuture[_] = null
   var lastLockTime: Long = 0
-  var lockedBy: AccessibleExecutor = null
 
   override def acquire(executor: AccessibleExecutor): Unit = {
     lock.acquire()
     lastLockTime = System.currentTimeMillis()
-    lockedBy = executor
     scheduleTimeout
   }
 
@@ -58,8 +56,6 @@ class EngineConnTimedLock(private var timeout: Long)
     logger.debug("try to lock for succeed is  " + succeed.toString)
     if (succeed) {
       lastLockTime = System.currentTimeMillis()
-      lockedBy = executor
-      logger.debug("try to lock for add time out task ! Locked by thread : " + 
lockedBy.getId)
       scheduleTimeout
     }
     succeed
@@ -68,18 +64,13 @@ class EngineConnTimedLock(private var timeout: Long)
   // Unlock callback is not called in release method, because release method 
is called actively
   override def release(): Unit = {
     logger.debug(
-      "try to release for lock," + lockedBy + ",current thread " + 
Thread.currentThread().getName
+      s"try to release for lock: ${lock.toString}, current thread " + 
Thread.currentThread().getName
     )
-    if (lockedBy != null) {
-      // && lockedBy == Thread.currentThread()   Inconsistent thread(线程不一致)
-      logger.debug("try to release for lockedBy and thread ")
-      if (releaseTask != null) {
-        releaseTask.cancel(true)
-        releaseTask = null
-      }
-      logger.debug("try to release for lock release success")
-      lockedBy = null
+    if (releaseTask != null) {
+      releaseTask.cancel(true)
+      releaseTask = null
     }
+    logger.debug("try to release for lock release success")
     unlockCallback(lock.toString)
     resetLock()
   }
@@ -97,7 +88,6 @@ class EngineConnTimedLock(private var timeout: Long)
         releaseScheduler.purge()
       }
       lock.release()
-      lockedBy = null
     }
     resetLock()
   }
@@ -109,13 +99,18 @@ class EngineConnTimedLock(private var timeout: Long)
           new Runnable {
             override def run(): Unit = {
               synchronized {
-                if (isAcquired() && NodeStatus.Idle == lockedBy.getStatus && 
isExpired()) {
-                  // unlockCallback depends on lockedBy, so lockedBy cannot be 
set null before unlockCallback
-                  logger.info(s"Lock : [${lock.toString} was released due to 
timeout.")
-                  release()
-                } else if (isAcquired() && NodeStatus.Busy == 
lockedBy.getStatus) {
-                  lastLockTime = System.currentTimeMillis()
-                  logger.info("Update lastLockTime because executor is busy.")
+                ExecutorManager.getInstance.getReportExecutor match {
+                  case reportExecutor: AccessibleExecutor =>
+                    if (
+                        isAcquired() && NodeStatus.Idle == 
reportExecutor.getStatus && isExpired()
+                    ) {
+                      // unlockCallback depends on lockedBy, so lockedBy 
cannot be set null before unlockCallback
+                      logger.info(s"Lock : [${lock.toString} was released due 
to timeout.")
+                      release()
+                    } else if (isAcquired() && NodeStatus.Busy == 
reportExecutor.getStatus) {
+                      lastLockTime = System.currentTimeMillis()
+                      logger.info("Update lastLockTime because executor is 
busy.")
+                    }
                 }
               }
             }
@@ -144,14 +139,12 @@ class EngineConnTimedLock(private var timeout: Long)
   }
 
   override def renew(): Boolean = {
-    if (lockedBy != null) {
-      if (isAcquired && releaseTask != null) {
-        if (releaseTask.cancel(false)) {
-          releaseScheduler.purge()
-          scheduleTimeout
-          lastLockTime = System.currentTimeMillis()
-          return true
-        }
+    if (isAcquired && releaseTask != null) {
+      if (releaseTask.cancel(false)) {
+        releaseScheduler.purge()
+        scheduleTimeout
+        lastLockTime = System.currentTimeMillis()
+        return true
       }
     }
     false
@@ -195,7 +188,7 @@ class EngineConnTimedLock(private var timeout: Long)
     ExecutorListenerBusContext
       .getExecutorListenerBusContext()
       .getEngineConnAsyncListenerBus
-      .post(ExecutorUnLockEvent(null, lockStr.toString))
+      .post(ExecutorUnLockEvent(null, lockStr))
   }
 
   override def onExecutorCreated(executorCreateEvent: ExecutorCreateEvent): 
Unit = {}
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
index 957a03b6f..21325f42b 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
@@ -161,9 +161,7 @@ class EngineConnTimedLockService extends LockService with 
Logging {
         .toString
     )
     if (isLockExist(lock)) {
-      logger.info(
-        s"try to unlock lockEntity : 
lockString=$lockString,lockedBy=${engineConnLock.lockedBy.getId}"
-      )
+      logger.info(s"try to unlock lockEntity : lockString=$lockString")
       engineConnLock.release()
       this.lockString = null
       true


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to