This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
     new 7eea895ed refactor: refact entrance (#4008)
7eea895ed is described below

commit 7eea895ed76f4704e091cd5e69724aa684b4ad4a
Author: Jack Xu <[email protected]>
AuthorDate: Thu Dec 15 20:33:52 2022 +0800

    refactor: refact entrance (#4008)
---
 .../apache/linkis/scheduler/SchedulerContext.scala |  6 ----
 .../queue/fifoqueue/FIFOUserConsumer.scala         |  2 +-
 .../linkis/scheduler/SchedulerContextTest.scala    |  4 ++-
 .../entrance/restful/EntranceRestfulApi.java       | 14 ++++----
 .../entrance/server/DefaultEntranceServer.java     |  6 ++--
 .../apache/linkis/entrance/EntranceServer.scala    | 20 +++++------
 .../linkis/entrance/EntranceWebSocketService.scala | 31 ++++++++---------
 .../linkis/entrance/cs/CSEntranceHelper.scala      | 39 +++++++++++-----------
 .../entrance/execute/EntranceExecutorManager.scala | 19 +++--------
 .../entrance/parser/CommonEntranceParser.scala     |  6 ++--
 .../impl/value/object/CSFlowInfosSerializer.java   |  5 +--
 11 files changed, 68 insertions(+), 84 deletions(-)

diff --git 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/SchedulerContext.scala
 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/SchedulerContext.scala
index 833750f57..35c6ff259 100644
--- 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/SchedulerContext.scala
+++ 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/SchedulerContext.scala
@@ -21,7 +21,6 @@ import org.apache.linkis.common.listener.ListenerEventBus
 import org.apache.linkis.scheduler.event.{ScheduleEvent, 
SchedulerEventListener}
 import org.apache.linkis.scheduler.executer.ExecutorManager
 import org.apache.linkis.scheduler.queue.{ConsumerManager, GroupFactory}
-import org.apache.linkis.scheduler.queue.fifoqueue.FIFOSchedulerContextImpl
 
 trait SchedulerContext {
 
@@ -35,8 +34,3 @@ trait SchedulerContext {
       : ListenerEventBus[_ <: SchedulerEventListener, _ <: ScheduleEvent]
 
 }
-
-object SchedulerContext {
-  val schedulerContext: SchedulerContext = new FIFOSchedulerContextImpl(100)
-  def getSchedulerContext: SchedulerContext = schedulerContext
-}
diff --git 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
index 926d7ba17..dcfbbfee6 100644
--- 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
+++ 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
@@ -108,7 +108,7 @@ class FIFOUserConsumer(
     }
     var event: Option[SchedulerEvent] = getWaitForRetryEvent
     if (event.isEmpty) {
-      val completedNums = runningJobs.filter(e => e == null || e.isCompleted)
+      val completedNums = runningJobs.filter(job => job == null || 
job.isCompleted)
       if (completedNums.length < 1) {
         Utils.tryQuietly(Thread.sleep(1000)) // TODO 还可以优化,通过实现JobListener进行优化
         return
diff --git 
a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/SchedulerContextTest.scala
 
b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/SchedulerContextTest.scala
index 1a6462cd4..42d62b241 100644
--- 
a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/SchedulerContextTest.scala
+++ 
b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/SchedulerContextTest.scala
@@ -17,6 +17,8 @@
 
 package org.apache.linkis.scheduler
 
+import org.apache.linkis.scheduler.queue.fifoqueue.FIFOSchedulerContextImpl
+
 import org.junit.jupiter.api.Assertions.assertNotNull
 import org.junit.jupiter.api.Test
 
@@ -24,7 +26,7 @@ class SchedulerContextTest {
 
   @Test
   def testSchedulerContext: Unit = {
-    val schedulerContext = SchedulerContext
+    val schedulerContext = new FIFOSchedulerContextImpl(100);
     assertNotNull(schedulerContext)
   }
 
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
 
b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
index 33ab1e97a..324187fc2 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
@@ -98,15 +98,14 @@ public class EntranceRestfulApi implements 
EntranceRestfulRemote {
     logger.info("Begin to get an execID");
     json.put(TaskConstant.EXECUTE_USER, ModuleUserUtils.getOperationUser(req));
     json.put(TaskConstant.SUBMIT_USER, SecurityFilter.getLoginUsername(req));
-    HashMap<String, String> map = (HashMap) json.get(TaskConstant.SOURCE);
+    HashMap<String, String> map = (HashMap<String, String>) 
json.get(TaskConstant.SOURCE);
     if (map == null) {
       map = new HashMap<>();
       json.put(TaskConstant.SOURCE, map);
     }
     String ip = JobHistoryHelper.getRequestIpAddr(req);
     map.put(TaskConstant.REQUEST_IP, ip);
-    String jobId = entranceServer.execute(json);
-    Job job = entranceServer.getJob(jobId).get();
+    Job job = entranceServer.execute(json);
     JobRequest jobReq = ((EntranceJob) job).getJobRequest();
     Long jobReqId = jobReq.getId();
     ModuleUserUtils.getOperationUser(req, "execute task,id: " + jobReqId);
@@ -121,7 +120,7 @@ public class EntranceRestfulApi implements 
EntranceRestfulRemote {
         "************************************SCRIPT 
CODE************************************", job);
     String execID =
         ZuulEntranceUtils.generateExecID(
-            jobId,
+            job.getId(),
             Sender.getThisServiceInstance().getApplicationName(),
             new String[] {Sender.getThisInstance()});
     pushLog(
@@ -158,8 +157,7 @@ public class EntranceRestfulApi implements 
EntranceRestfulRemote {
     }
     String ip = JobHistoryHelper.getRequestIpAddr(req);
     map.put(TaskConstant.REQUEST_IP, ip);
-    String jobId = entranceServer.execute(json);
-    Job job = entranceServer.getJob(jobId).get();
+    Job job = entranceServer.execute(json);
     JobRequest jobRequest = ((EntranceJob) job).getJobRequest();
     Long jobReqId = jobRequest.getId();
     ModuleUserUtils.getOperationUser(req, "submit jobReqId: " + jobReqId);
@@ -175,7 +173,7 @@ public class EntranceRestfulApi implements 
EntranceRestfulRemote {
     pushLog(
         LogUtils.generateInfo(
             "Your job is accepted,  jobID is "
-                + jobId
+                + job.getId()
                 + " and jobReqId is "
                 + jobReqId
                 + " in "
@@ -184,7 +182,7 @@ public class EntranceRestfulApi implements 
EntranceRestfulRemote {
         job);
     String execID =
         ZuulEntranceUtils.generateExecID(
-            jobId,
+            job.getId(),
             Sender.getThisServiceInstance().getApplicationName(),
             new String[] {Sender.getThisInstance()});
     message = Message.ok();
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
 
b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
index 97ee71c0e..a050056fe 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
@@ -80,9 +80,9 @@ public class DefaultEntranceServer extends EntranceServer {
       logger.warn("event has been handled");
     } else {
       logger.warn("Entrance exit to stop all job");
-      EntranceJob[] allUndoneTask = getAllUndoneTask(null);
-      if (null != allUndoneTask) {
-        for (EntranceJob job : allUndoneTask) {
+      EntranceJob[] allUndoneJobs = getAllUndoneTask(null);
+      if (null != allUndoneJobs) {
+        for (EntranceJob job : allUndoneJobs) {
           job.onFailure(
               "Entrance exits the automatic cleanup task and can be 
rerun(服务退出自动清理任务,可以重跑)", null);
         }
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
index cc470d542..2b3a75377 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
@@ -52,14 +52,14 @@ abstract class EntranceServer extends Logging {
   def getEntranceContext: EntranceContext
 
   /**
-   * Execute a task and return an execId(执行一个task,返回一个execId)
+   * Execute a task and return an job(执行一个task,返回一个job)
    * @param params
    * @return
    */
-  def execute(params: java.util.Map[String, Any]): String = {
-    if (!params.containsKey(EntranceServer.DO_NOT_PRINT_PARAMS_LOG))
+  def execute(params: java.util.Map[String, Any]): Job = {
+    if (!params.containsKey(EntranceServer.DO_NOT_PRINT_PARAMS_LOG)) {
       logger.debug("received a request: " + params)
-    else params.remove(EntranceServer.DO_NOT_PRINT_PARAMS_LOG)
+    } else params.remove(EntranceServer.DO_NOT_PRINT_PARAMS_LOG)
     var jobRequest = 
getEntranceContext.getOrCreateEntranceParser().parseToTask(params)
     // todo: multi entrance instances
     jobRequest.setInstances(Sender.getThisInstance)
@@ -131,14 +131,14 @@ abstract class EntranceServer extends Logging {
       
job.setProgressListener(getEntranceContext.getOrCreatePersistenceManager())
       job.setJobListener(getEntranceContext.getOrCreatePersistenceManager())
       job match {
-        case entranceJob: EntranceJob => {
+        case entranceJob: EntranceJob =>
           
entranceJob.setEntranceListenerBus(getEntranceContext.getOrCreateEventListenerBus)
-        }
         case _ =>
       }
       Utils.tryCatch {
-        if (logAppender.length() > 0)
+        if (logAppender.length() > 0) {
           job.getLogListener.foreach(_.onLogUpdate(job, 
logAppender.toString.trim))
+        }
       } { t =>
         logger.error("Failed to write init log, reason: ", t)
       }
@@ -158,13 +158,13 @@ abstract class EntranceServer extends Logging {
       job match {
         case entranceJob: EntranceJob =>
           entranceJob.getJobRequest.setReqId(job.getId())
-          if (jobTimeoutManager.timeoutCheck && 
JobTimeoutManager.hasTimeoutLabel(entranceJob))
+          if (jobTimeoutManager.timeoutCheck && 
JobTimeoutManager.hasTimeoutLabel(entranceJob)) {
             jobTimeoutManager.add(job.getId(), entranceJob)
+          }
           entranceJob.getLogListener.foreach(_.onLogUpdate(entranceJob, msg))
         case _ =>
       }
-
-      job.getId()
+      job
     } { t =>
       job.onFailure("Submitting the query failed!(提交查询失败!)", t)
       val _jobRequest: JobRequest =
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
index 06b6520d1..59dc42475 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
@@ -49,6 +49,8 @@ import java.util
 import java.util.Date
 import java.util.concurrent.TimeUnit
 
+import scala.collection.JavaConverters._
+
 class EntranceWebSocketService
     extends ServerEventService
     with EntranceEventListener
@@ -115,10 +117,9 @@ class EntranceWebSocketService
               case job: Job =>
                 if (jobIdToEventId.containsKey(job.getId)) {
                   val entranceJob = job.asInstanceOf[EntranceJob]
-                  val engineTypeLabel = entranceJob.getJobRequest.getLabels
-                    .filter(l => 
l.getLabelKey.equalsIgnoreCase(LabelKeyConstant.ENGINE_TYPE_KEY))
-                    .headOption
-                    .getOrElse(null)
+                  val engineTypeLabel = 
entranceJob.getJobRequest.getLabels.asScala
+                    .find(l => 
l.getLabelKey.equalsIgnoreCase(LabelKeyConstant.ENGINE_TYPE_KEY))
+                    .orNull
                   if (null == engineTypeLabel) {
                     logger.error("Invalid engineTpyeLabel")
                     return
@@ -132,7 +133,8 @@ class EntranceWebSocketService
                   Utils.tryQuietly(
                     sendMsg(
                       job,
-                      "Get waiting size succeed."
+                      Message
+                        .ok("Get waiting size succeed.")
                         .data("execID", realID)
                         .data("taskID", taskID)
                         .data("waitingSize", index)
@@ -178,17 +180,16 @@ class EntranceWebSocketService
     } // TODO Convert to a suitable Map(转换成合适的Map)
     val websocketTag = event.getWebsocketTag
     params.put(TaskConstant.EXECUTE_USER, event.getUser)
-    val jobId = entranceServer.execute(params)
-    jobIdToEventId synchronized jobIdToEventId.put(jobId, event.getId)
-    websocketTagJobID synchronized websocketTagJobID.put(jobId, websocketTag)
-    val jobRequest = 
entranceServer.getJob(jobId).get.asInstanceOf[EntranceJob].getJobRequest
+    val job = entranceServer.execute(params)
+    jobIdToEventId synchronized jobIdToEventId.put(job.getId(), event.getId)
+    websocketTagJobID synchronized websocketTagJobID.put(job.getId(), 
websocketTag)
+    val jobRequest = job.asInstanceOf[EntranceJob].getJobRequest
     val taskID = jobRequest.getId
-    val job = entranceServer.getJob(jobId).get
     val engineTypeLabel = LabelUtil.getEngineTypeLabel(jobRequest.getLabels)
     val executeApplicationName: String = engineTypeLabel.getEngineType
     val creator: String = 
LabelUtil.getUserCreatorLabel(jobRequest.getLabels).getCreator
     val execID = ZuulEntranceUtils.generateExecID(
-      jobId,
+      job.getId(),
       executeApplicationName,
       Sender.getThisInstance,
       creator
@@ -197,7 +198,7 @@ class EntranceWebSocketService
     executeResponseMsg
       .data("execID", execID)
       .data("taskID", taskID)
-      .data("websocketTag", websocketTagJobID.get(jobId))
+      .data("websocketTag", websocketTagJobID.get(job.getId()))
     executeResponseMsg.setMethod(restfulURI + "entrance/execute")
     executeResponseMsg.setStatus(0)
     sendMsg(job, executeResponseMsg)
@@ -238,7 +239,7 @@ class EntranceWebSocketService
     "The request was executed successfully!"
       .data("execID", execID)
       .data("taskID", taskID)
-      .data("websocketTag", websocketTagJobID.get(jobId))
+      .data("websocketTag", websocketTagJobID.get(job.getId))
     // executeResponseMsg
   }
 
@@ -301,8 +302,8 @@ class EntranceWebSocketService
           .data("websocketTag", websocketTagJobID.get(realID))
           .data("taskID", taskID)
         logger.info(
-          s" retMessage: execID is $longExecID, status is ${status.toString}, 
websocketTag is ${websocketTagJobID
-            .get(realID)}"
+          "retMessage: execID is {}, status is {}, websocketTag is {}",
+          Array(longExecID, status.toString, websocketTagJobID.get(realID)): _*
         )
         retMessage.setStatus(0)
         retMessage.setMethod(restfulURI + "entrance/" + longExecID + "/status")
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cs/CSEntranceHelper.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cs/CSEntranceHelper.scala
index 813315e7d..d12a9b9d3 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cs/CSEntranceHelper.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cs/CSEntranceHelper.scala
@@ -43,7 +43,7 @@ import org.apache.commons.lang3.StringUtils
 
 import java.util
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 object CSEntranceHelper extends Logging {
 
@@ -80,12 +80,12 @@ object CSEntranceHelper extends Logging {
    */
   def registerCSRSData(job: Job): Unit = {
     job match {
-      case entranceJob: EntranceJob => {
+      case entranceJob: EntranceJob =>
         val (contextIDValueStr, nodeNameStr) = 
getContextInfo(entranceJob.getParams)
-        logger.info(s"registerCSRSData: nodeName:$nodeNameStr")
-        if (StringUtils.isBlank(contextIDValueStr) || 
StringUtils.isBlank(nodeNameStr))
+        logger.info("registerCSRSData: nodeName: {}", nodeNameStr)
+        if (StringUtils.isBlank(contextIDValueStr) || 
StringUtils.isBlank(nodeNameStr)) {
           return null
-
+        }
         val contextKey = new CommonContextKey
         contextKey.setContextScope(ContextScope.PUBLIC)
         contextKey.setContextType(ContextType.DATA)
@@ -101,11 +101,10 @@ object CSEntranceHelper extends Logging {
                 SerializeHelper.serializeContextKey(contextKey),
                 data
               )
-            logger.info(s"(${contextKey.getKey} put ${jobRequest.getId} of 
jobId to cs)")
+            logger.info("({} put {} of jobId to cs)", contextKey.getKey: Any, 
jobRequest.getId: Any)
           case _ =>
         }
-        logger.info(s"registerCSRSData end: nodeName:$nodeNameStr")
-      }
+        logger.info("registerCSRSData end: nodeName: {}", nodeNameStr)
       case _ =>
     }
   }
@@ -123,7 +122,7 @@ object CSEntranceHelper extends Logging {
     )
 
     if (StringUtils.isNotBlank(contextIDValueStr) && 
StringUtils.isNotBlank(nodeNameStr)) {
-      logger.info(s"init node($nodeNameStr) cs info")
+      logger.info("init node({}) cs info", nodeNameStr)
       CSNodeServiceImpl.getInstance().initNodeCSInfo(contextIDValueStr, 
nodeNameStr)
     }
   }
@@ -144,21 +143,23 @@ object CSEntranceHelper extends Logging {
     if (StringUtils.isNotBlank(contextIDValueStr) && 
StringUtils.isNotBlank(nodeNameStr)) {
       val userCreatorLabel = 
LabelUtil.getUserCreatorLabel(requestPersistTask.getLabels)
       val newLabels = new util.ArrayList[Label[_]]
-      requestPersistTask.getLabels
+      requestPersistTask.getLabels.asScala
         .filterNot(_.isInstanceOf[UserCreatorLabel])
         .foreach(newLabels.add)
       SerializeHelper.deserializeContextID(contextIDValueStr) match {
         case contextID: LinkisWorkflowContextID =>
           if 
(CSCommonUtils.CONTEXT_ENV_PROD.equalsIgnoreCase(contextID.getEnv)) {
             logger.info(
-              s"reset creator from ${userCreatorLabel.getCreator} to " + 
EntranceConfiguration.SCHEDULER_CREATOR
-                .getHotValue()
+              "reset creator from {} to {}",
+              userCreatorLabel.getCreator: Any,
+              EntranceConfiguration.SCHEDULER_CREATOR.getHotValue(): Any
             )
             
userCreatorLabel.setCreator(EntranceConfiguration.SCHEDULER_CREATOR.getHotValue())
           } else {
             logger.info(
-              s"reset creator from ${userCreatorLabel.getCreator} to " + 
EntranceConfiguration.FLOW_EXECUTION_CREATOR
-                .getHotValue()
+              "reset creator from {} to {}",
+              userCreatorLabel.getCreator: Any,
+              EntranceConfiguration.FLOW_EXECUTION_CREATOR.getHotValue: Any
             )
             
userCreatorLabel.setCreator(EntranceConfiguration.FLOW_EXECUTION_CREATOR.getHotValue())
           }
@@ -182,19 +183,19 @@ object CSEntranceHelper extends Logging {
     )
 
     if (StringUtils.isNotBlank(contextIDValueStr)) {
-      logger.info(s"parse variable nodeName:$nodeNameStr")
+      logger.info("parse variable nodeName: {}", nodeNameStr)
       val linkisVariableList: util.List[LinkisVariable] =
         
CSVariableService.getInstance().getUpstreamVariables(contextIDValueStr, 
nodeNameStr);
       if (null != linkisVariableList) {
-        linkisVariableList.foreach { linkisVariable =>
+        linkisVariableList.asScala.foreach { linkisVariable =>
           variableMap.put(linkisVariable.getKey, linkisVariable.getValue)
         }
       }
-      if (variableMap.nonEmpty) {
+      if (!variableMap.isEmpty) {
         // 1.cs priority is low, the same ones are not added
         val varMap =
           
TaskUtils.getVariableMap(requestPersistTask.getParams.asInstanceOf[util.Map[String,
 Any]])
-        variableMap.foreach { keyAndValue =>
+        variableMap.asScala.foreach { keyAndValue =>
           if (!varMap.containsKey(keyAndValue._1)) {
             varMap.put(keyAndValue._1, keyAndValue._2)
           }
@@ -205,7 +206,7 @@ object CSEntranceHelper extends Logging {
         )
       }
 
-      logger.info(s"parse variable end nodeName:$nodeNameStr")
+      logger.info("parse variable end nodeName: {}", nodeNameStr)
     }
   }
 
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
index 516b900b6..0d5d60598 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
@@ -22,7 +22,6 @@ import org.apache.linkis.common.log.LogUtils
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.entrance.errorcode.EntranceErrorCodeSummary._
 import org.apache.linkis.entrance.exception.EntranceErrorException
-import org.apache.linkis.entrance.job.EntranceExecutionJob
 import org.apache.linkis.governance.common.entity.job.JobRequest
 import org.apache.linkis.scheduler.executer.{Executor, ExecutorManager}
 import org.apache.linkis.scheduler.queue.{GroupFactory, Job, SchedulerEvent}
@@ -49,17 +48,7 @@ abstract class EntranceExecutorManager(groupFactory: 
GroupFactory)
   override def askExecutor(schedulerEvent: SchedulerEvent): Option[Executor] =
     schedulerEvent match {
       case job: Job =>
-        val executor = createExecutor(job)
-        if (executor != null) {
-          job match {
-            case entranceExecutionJob: EntranceExecutionJob =>
-              val jobReq = entranceExecutionJob.getJobRequest
-              jobReq.setUpdatedTime(new Date(System.currentTimeMillis()))
-            case _ =>
-          }
-          Some(executor)
-        } else None
-
+        Option(createExecutor(job))
     }
 
   // Update the overall job running status after no subtask runs
@@ -70,7 +59,7 @@ abstract class EntranceExecutorManager(groupFactory: 
GroupFactory)
         val startTime = System.currentTimeMillis()
         var warnException: WarnException = null
         var executor: Option[Executor] = None
-        while (System.currentTimeMillis - startTime < wait.toMillis && 
executor.isEmpty)
+        while (System.currentTimeMillis - startTime < wait.toMillis && 
executor.isEmpty) {
           Utils.tryCatch(executor = askExecutor(job)) {
             case warn: WarnException =>
               logger.warn("request engine failed!", warn)
@@ -78,6 +67,7 @@ abstract class EntranceExecutorManager(groupFactory: 
GroupFactory)
               None
             case t: Throwable => throw t
           }
+        }
         // todo check
         if (warnException != null && executor.isEmpty) throw warnException
         executor
@@ -98,7 +88,7 @@ abstract class EntranceExecutorManager(groupFactory: 
GroupFactory)
     schedulerEvent match {
       case job: EntranceJob =>
         job.getJobRequest match {
-          case jobRequest: JobRequest =>
+          case jobReq: JobRequest =>
             val entranceEntranceExecutor =
               new DefaultEntranceExecutor(idGenerator.incrementAndGet())
             // getEngineConn Executor
@@ -108,6 +98,7 @@ abstract class EntranceExecutorManager(groupFactory: 
GroupFactory)
                 LogUtils.generateInfo("Your job is being scheduled by 
orchestrator.")
               )
             )
+            jobReq.setUpdatedTime(new Date(System.currentTimeMillis()))
 
             /**
              * // val engineConnExecutor = 
engineConnManager.getAvailableEngineConnExecutor(mark)
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
index 146eb2c2c..e230bbc78 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
@@ -86,7 +86,7 @@ class CommonEntranceParser(val persistenceManager: 
PersistenceManager)
     if (labelMap.isEmpty) {
       throw new EntranceIllegalParamException(
         EntranceErrorCode.PARAM_CANNOT_EMPTY.getErrCode,
-        EntranceErrorCode.PARAM_CANNOT_EMPTY.getDesc + s",  labels is null"
+        s"${EntranceErrorCode.PARAM_CANNOT_EMPTY.getDesc},  labels is null"
       )
     }
     // 3. set Code
@@ -154,7 +154,7 @@ class CommonEntranceParser(val persistenceManager: 
PersistenceManager)
   ): Unit = {
     val engineRunTypeLabel = 
labels.getOrDefault(LabelKeyConstant.CODE_TYPE_KEY, null)
     if (StringUtils.isBlank(runType) && null == engineRunTypeLabel) {
-      val msg = s"You need to specify runType in execution content, such as 
sql"
+      val msg = "You need to specify runType in execution content, such as sql"
       logger.warn(msg)
       throw new EntranceIllegalParamException(
         EntranceErrorCode.LABEL_PARAMS_INVALID.getErrCode,
@@ -260,7 +260,7 @@ class CommonEntranceParser(val persistenceManager: 
PersistenceManager)
         .asInstanceOf[util.Map[String, Object]]
       if (null != labelMap && !labelMap.isEmpty) {
         val list: util.List[Label[_]] =
-          labelBuilderFactory.getLabels(labelMap.asInstanceOf[util.Map[String, 
AnyRef]])
+          labelBuilderFactory.getLabels(labelMap)
         labelList.addAll(list)
       }
     }
diff --git 
a/linkis-public-enhancements/linkis-context-service/linkis-cs-common/src/main/java/org/apache/linkis/cs/common/serialize/impl/value/object/CSFlowInfosSerializer.java
 
b/linkis-public-enhancements/linkis-context-service/linkis-cs-common/src/main/java/org/apache/linkis/cs/common/serialize/impl/value/object/CSFlowInfosSerializer.java
index 3f196cac9..6e4247a5c 100644
--- 
a/linkis-public-enhancements/linkis-context-service/linkis-cs-common/src/main/java/org/apache/linkis/cs/common/serialize/impl/value/object/CSFlowInfosSerializer.java
+++ 
b/linkis-public-enhancements/linkis-context-service/linkis-cs-common/src/main/java/org/apache/linkis/cs/common/serialize/impl/value/object/CSFlowInfosSerializer.java
@@ -35,9 +35,6 @@ public class CSFlowInfosSerializer extends 
AbstractSerializer<CSFlowInfos> {
 
   @Override
   public boolean accepts(Object obj) {
-    if (null != obj && 
obj.getClass().getName().equals(CSFlowInfos.class.getName())) {
-      return true;
-    }
-    return false;
+    return null != obj && CSFlowInfos.class.isAssignableFrom(obj.getClass());
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to