This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
     new 21a649040 [linkis-entrance] Modification of scala file floating red 
(#3243)
21a649040 is described below

commit 21a649040e0a23b8984c3b32d6509e723d9f1baf
Author: 成彬彬 <[email protected]>
AuthorDate: Wed Sep 7 23:46:28 2022 +0800

    [linkis-entrance] Modification of scala file floating red (#3243)
    
    * [linkis-entrance] Modification of scala file floating red
---
 .../linkis/entrance/EntranceWebSocketService.scala | 38 +++++-----------------
 .../cli/heartbeat/CliHeartbeatMonitor.scala        |  4 +--
 .../linkis/entrance/execute/EntranceExecutor.scala |  1 -
 .../entrance/execute/EntranceExecutorManager.scala | 15 ++-------
 .../linkis/entrance/execute/EntranceJob.scala      |  7 ++--
 .../execute/ExecuteRequestInterceptor.scala        |  4 +--
 .../apache/linkis/entrance/execute/MarkReq.scala   | 14 ++------
 .../interceptor/impl/CommentInterceptor.scala      |  2 ++
 .../impl/ShellDangerousGrammerInterceptor.scala    |  4 +--
 .../linkis/entrance/log/CacheLogManager.scala      |  8 +----
 .../linkis/entrance/log/ErrorCodeListener.scala    |  4 +--
 .../apache/linkis/entrance/log/LogManager.scala    |  4 +--
 .../org/apache/linkis/entrance/log/LogWriter.scala |  3 +-
 .../org/apache/linkis/entrance/log/LoopArray.scala |  5 +--
 .../linkis/entrance/log/WebSocketLogWriter.scala   |  3 +-
 .../entrance/parser/CommonEntranceParser.scala     | 29 +++++++++--------
 .../entrance/scheduler/EntranceGroupFactory.scala  | 18 +++++-----
 .../scheduler/cache/ReadCacheConsumer.scala        | 15 +++++----
 .../entrance/timeout/JobTimeoutManager.scala       | 22 +++++++------
 .../linkis/entrance/utils/JobHistoryHelper.scala   |  4 +--
 20 files changed, 85 insertions(+), 119 deletions(-)

diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
index fe44be993..b3abc3d1d 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
@@ -60,9 +60,11 @@ class EntranceWebSocketService
   private val websocketTagJobID = new util.HashMap[String, String]()
 
   private val restfulURI =
-    if (ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue.endsWith("/"))
+    if (ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue.endsWith("/")) {
       ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue
-    else ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue + "/"
+    } else {
+      ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue + "/"
+    }
 
   private val executePattern = restfulURI + "entrance/execute"
   private val logUrlPattern = (restfulURI + """entrance/(.+)/log""").r
@@ -233,8 +235,6 @@ class EntranceWebSocketService
           s"Your job is accepted,  jobID is ${job.getId} and taskID is 
$taskID. Please wait it to be scheduled"
         )
       )
-    // val executeApplicationName:String = jobRequest.getExecuteApplicationName
-    // val execID = ZuulEntranceUtils.generateExecID(jobId, 
executeApplicationName, Sender.getThisInstance)
     "The request was executed successfully!"
       .data("execID", execID)
       .data("taskID", taskID)
@@ -246,7 +246,7 @@ class EntranceWebSocketService
     var retMessage: Message = null
     val realID = ZuulEntranceUtils.parseExecID(id)(3)
     entranceServer.getJob(realID) foreach {
-      case entranceExecutionJob: EntranceExecutionJob => {
+      case entranceExecutionJob: EntranceExecutionJob =>
         logger.info(s"begin to get job $realID log via websocket")
         val logsArr: Array[String] = new Array[String](4)
         entranceExecutionJob.getWebSocketLogReader.foreach(logReader =>
@@ -263,7 +263,6 @@ class EntranceWebSocketService
         retMessage.setMethod(restfulURI + "entrance/" + id + "/log")
         logger.info(s"end to get job $realID log via websocket")
         return retMessage
-      }
       case _ =>
     }
     retMessage = Message.error(
@@ -275,11 +274,7 @@ class EntranceWebSocketService
   }
 
   def dealStatus(event: ServerEvent, id: String): Message = {
-    //    val response:Response = entranceRestfulApi.status(id)
-    //    Message.responseToMessage(response)
     var retMessage: Message = null
-
-    // val realID:String = if (entranceServer.getJob(id).isDefined) id else 
ZuulEntranceUtils.parseExecID(id)(2)
     val realID: String = if (!id.contains(":")) id else 
ZuulEntranceUtils.parseExecID(id)(3)
     entranceServer.getJob(realID) foreach {
       case entranceExecutionJob: EntranceExecutionJob =>
@@ -293,8 +288,9 @@ class EntranceWebSocketService
           Sender.getThisInstance,
           userCreatorLabel.getCreator
         )
-        if (!jobIdToEventId.containsKey(realID) && event != null)
+        if (!jobIdToEventId.containsKey(realID) && event != null) {
           jobIdToEventId synchronized jobIdToEventId.put(realID, event.getId)
+        }
         val status = entranceExecutionJob.getState
 
         retMessage = Message.ok("Get the status of the task 
successfully(获取任务状态成功)")
@@ -323,12 +319,10 @@ class EntranceWebSocketService
   }
 
   def dealProgress(event: ServerEvent, id: String): Message = {
-    //    val response:Response = entranceRestfulApi.progress(id)
-    //    Message.responseToMessage(response)
     var retMessage: Message = null
     val realID = ZuulEntranceUtils.parseExecID(id)(3)
     entranceServer.getJob(realID) foreach {
-      case entranceExecutionJob: EntranceExecutionJob => {
+      case entranceExecutionJob: EntranceExecutionJob =>
         val progress = entranceExecutionJob.getProgress
         retMessage = Message.ok("Get the task progress successfully(获取任务进度成功)")
         val taskID = entranceExecutionJob.getJobRequest.getId
@@ -340,7 +334,6 @@ class EntranceWebSocketService
         retMessage.setStatus(0)
         retMessage.setMethod(restfulURI + "entrance/" + id + "/progress")
         return retMessage
-      }
       case _ =>
     }
     retMessage = Message.error("Get task progress failed(获取任务进度失败)")
@@ -358,8 +351,6 @@ class EntranceWebSocketService
   }
 
   def dealKill(event: ServerEvent, id: String): Message = {
-    //    val response:Response = entranceRestfulApi.kill(id)
-    //    Message.responseToMessage(response)
     var retMessage: Message = null
     val realID = ZuulEntranceUtils.parseExecID(id)(3)
     entranceServer.getJob(realID) foreach {
@@ -384,19 +375,6 @@ class EntranceWebSocketService
     retMessage
   }
 
-  /* def dealBackGroundService(event: ServerEvent): Message = {
-     val params = event.getData.map{case (k, v) => k -> v.asInstanceOf[Any]}
-     val backgroundType = params.get("background").get
-     //val backgroundType = "export"
-     val backgroundService =  
entranceServer.getEntranceContext.getOrCreateBackGroundService.find(f 
=>backgroundType.equals(f.serviceType))
-     if (backgroundService.isEmpty) {
-       info("The corresponding background service was not 
found...(未找到相应的后台服务...)")
-       dealExecute(event)
-     }else{
-       dealExecute(backgroundService.get.operation(event))
-     }
-   }*/
-
   private def concatLog(length: Int, log: String, flag: StringBuilder, all: 
StringBuilder): Unit = {
     if (length == 1) {
       flag ++= log ++= "\n"
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/CliHeartbeatMonitor.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/CliHeartbeatMonitor.scala
index 851e9fa32..7142a4f6f 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/CliHeartbeatMonitor.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/CliHeartbeatMonitor.scala
@@ -90,9 +90,9 @@ class CliHeartbeatMonitor(handler: HeartbeatLossHandler) 
extends Logging {
       case entranceJob: EntranceJob =>
         if (isCliJob(entranceJob)) {
           val id = entranceJob.getJobRequest.getId.toString
-          if (!infoMap.containsKey(id))
+          if (!infoMap.containsKey(id)) {
             logger.error(s"heartbeat on non-existing job!! job id: $id")
-          else infoMap.get(id).updateNewestAccessByClientTimestamp()
+          } else infoMap.get(id).updateNewestAccessByClientTimestamp()
         }
       case _ =>
     }
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
index 429c80f9e..2b80d731a 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
@@ -78,7 +78,6 @@ abstract class EntranceExecutor(val id: Long, val mark: 
MarkReq) extends Executo
 
   protected def callExecute(request: ExecuteRequest): ExecuteResponse
 
-  //  override def toString: String = 
s"${getInstance.getApplicationName}Engine($getId, $getUser, $getCreator, 
${getInstance.getInstance})"
   override def toString: String = "${getId}"
 
   override def getId: Long = this.id
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
index 0f8a4303b..43dd27572 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
@@ -72,10 +72,8 @@ abstract class EntranceExecutorManager(groupFactory: 
GroupFactory)
 
     }
 
-  // todo 提交任务逻辑调整:将job切分成多条语句,塞到jobGroup队列中。任务提交后,按照队列先后顺序,依次执行任务;
-  // 没个子任务运行后,更新整体的Job运行状态
-  // 直到所有任务都完毕,或者存在任务异常退出,则结束整体的Job
-
+  // Update the overall job running status after each subtask runs
+  // Until all the tasks are completed, or the task exits abnormally, the 
overall job ends
   override def askExecutor(schedulerEvent: SchedulerEvent, wait: Duration): 
Option[Executor] =
     schedulerEvent match {
       case job: Job =>
@@ -89,14 +87,7 @@ abstract class EntranceExecutorManager(groupFactory: 
GroupFactory)
               warnException = warn
               None
             case t: Throwable => throw t
-          } /*match {
-          case Some(e) => executor = Option(e)
-          case _ =>
-            if (System.currentTimeMillis - startTime < wait.toMillis) {
-              val interval = math.min(3000, wait.toMillis - 
System.currentTimeMillis + startTime)
-              //getOrCreateEngineManager().waitForIdle(interval)
-            }
-        }*/
+          }
         // todo check
         if (warnException != null && executor.isEmpty) throw warnException
         executor
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
index c0a07380d..9239670ae 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
@@ -109,7 +109,7 @@ abstract class EntranceJob extends Job {
     }
   }
 
-  @Deprecated
+  @deprecated
   def incrementResultSetPersisted(): Unit = {
     //    persistedResultSets.incrementAndGet()
   }
@@ -178,8 +178,9 @@ abstract class EntranceJob extends Job {
           TaskConstant.ENTRANCEJOB_COMPLETE_TIME,
           new Date(System.currentTimeMillis())
         )
-        if (getJobInfo != null)
+        if (getJobInfo != null) {
           getLogListener.foreach(_.onLogUpdate(this, 
LogUtils.generateInfo(getJobInfo.getMetric)))
+        }
         if (isSucceed) {
           getLogListener.foreach(
             _.onLogUpdate(
@@ -327,6 +328,6 @@ abstract class EntranceJob extends Job {
 
 object EntranceJob {
 
-  def JOB_COMPLETED_PROGRESS = 1.0f
+  def JOB_COMPLETED_PROGRESS: Float = 1.0f
 
 }
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/ExecuteRequestInterceptor.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/ExecuteRequestInterceptor.scala
index 09b6156ba..151d319cc 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/ExecuteRequestInterceptor.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/ExecuteRequestInterceptor.scala
@@ -20,7 +20,7 @@ package org.apache.linkis.entrance.execute
 import org.apache.linkis.governance.common.protocol.task.{RequestTask, 
RequestTaskExecute}
 import org.apache.linkis.scheduler.executer.{ExecuteRequest, JobExecuteRequest}
 
-import scala.collection.JavaConversions
+import scala.collection.JavaConverters._
 
 trait ExecuteRequestInterceptor {
 
@@ -96,7 +96,7 @@ object RuntimePropertiesExecuteRequestInterceptor extends 
ExecuteRequestIntercep
   override def apply(requestTask: RequestTask, executeRequest: 
ExecuteRequest): RequestTask =
     executeRequest match {
       case runtime: RuntimePropertiesExecuteRequest =>
-        JavaConversions.mapAsScalaMap(runtime.properties).foreach { case (k, 
v) =>
+        mapAsScalaMapConverter(runtime.properties).asScala.foreach { case (k, 
v) =>
           requestTask.data(k, v)
         }
         requestTask
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/MarkReq.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/MarkReq.scala
index 4e6efcb6f..d04511f29 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/MarkReq.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/MarkReq.scala
@@ -20,7 +20,7 @@ package org.apache.linkis.entrance.execute
 import java.util
 
 import scala.beans.BeanProperty
-import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.JavaConverters._
 
 class MarkReq {
 
@@ -60,20 +60,11 @@ class MarkReq {
         return flag
       }
 
-      /* if (other.getProperties != null && getProperties != null) {
-         val iterator = other.getProperties.iterator
-         while (iterator.hasNext) {
-           val next = iterator.next()
-           if (!next._2.equalsIgnoreCase(getProperties.get(next._1))) {
-             return flag
-           }
-         }
-       }*/
       if (other.getLabels != null && getLabels != null) {
         if (getLabels.size() != other.getLabels.size()) {
           return false
         }
-        val iterator = other.getLabels.iterator
+        val iterator = other.getLabels.asScala.iterator
         while (iterator.hasNext) {
           val next = iterator.next()
           if (null == next._2 || !next._2.equals(getLabels.get(next._1))) {
@@ -86,4 +77,5 @@ class MarkReq {
     flag
   }
 
+  override def hashCode(): Int = super.hashCode()
 }
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CommentInterceptor.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CommentInterceptor.scala
index e99c3bfa4..da751a4bb 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CommentInterceptor.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CommentInterceptor.scala
@@ -110,7 +110,9 @@ object CommentMain {
   def main(args: Array[String]): Unit = {
     val sqlCode = "select * from default.user;--你好;show tables"
     val sqlCode1 = "select * from default.user--你好;show tables"
+    // scalastyle:off println
     println(SQLCommentHelper.dealComment(sqlCode))
+    // scalastyle:on println
   }
 
 }
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/ShellDangerousGrammerInterceptor.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/ShellDangerousGrammerInterceptor.scala
index 567cddfe4..cabbe84b6 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/ShellDangerousGrammerInterceptor.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/ShellDangerousGrammerInterceptor.scala
@@ -107,9 +107,7 @@ class ShellDangerousGrammerInterceptor extends 
EntranceInterceptor with Logging
       logger.info(s"GET REQUEST CODE_TYPE ${codeType} and ENGINE_TYPE 
${EngineType}")
       if (shellContainDangerUsage(jobRequest.getExecutionCode)) {
         throw DangerousGramsCheckException("Shell code contains blacklisted 
code(shell中包含黑名单代码)")
-      } /*else if (!shellWhiteUsage(jobRequest.getExecutionCode)) {
-        throw  DangerousGramsCheckException("The shell code is not in the 
whitelist code(shell代码不在白名单中)")
-      }*/
+      }
       jobRequest
     } else {
       jobRequest
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogManager.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogManager.scala
index bc068b89e..8b851e402 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogManager.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogManager.scala
@@ -73,7 +73,7 @@ class CacheLogManager extends LogManager with Logging {
       return null
     }
     job match {
-      case entranceExecutionJob: EntranceExecutionJob => {
+      case entranceExecutionJob: EntranceExecutionJob =>
         val cache: Cache = 
Cache(EntranceConfiguration.DEFAULT_CACHE_MAX.getValue)
         val logPath: String = entranceExecutionJob.getJobRequest.getLogPath
         val fsLogPath = new FsPath(logPath)
@@ -97,13 +97,7 @@ class CacheLogManager extends LogManager with Logging {
           }
         entranceExecutionJob.setLogWriter(cacheLogWriter)
         logger.info(s"job ${entranceExecutionJob.getJobRequest.getId} create 
cacheLogWriter")
-        /*val webSocketCacheLogReader: WebSocketCacheLogReader =
-          new WebSocketCacheLogReader(logPath, 
EntranceConfiguration.DEFAULT_LOG_CHARSET.getValue, cache, 
entranceExecutionJob.getUser)
-        entranceExecutionJob.setWebSocketLogReader(webSocketCacheLogReader)
-        val webSocketLogWriter: WebSocketLogWriter = new 
WebSocketLogWriter(entranceExecutionJob, 
entranceContext.getOrCreateLogListenerBus)
-        entranceExecutionJob.setWebSocketLogWriter(webSocketLogWriter)*/
         cacheLogWriter
-      }
       case _ => null
     }
   }
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/ErrorCodeListener.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/ErrorCodeListener.scala
index 3bd93465c..2be792e05 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/ErrorCodeListener.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/ErrorCodeListener.scala
@@ -39,11 +39,11 @@ class PersistenceErrorCodeListener extends 
ErrorCodeListener {
   def setPersistenceManager(persistenceManager: PersistenceManager): Unit =
     this.persistenceManager = persistenceManager
 
-  def getPersistenceManager = persistenceManager
+  def getPersistenceManager: PersistenceManager = persistenceManager
 
   def setEntranceParser(entranceParser: EntranceParser): Unit = 
this.entranceParser = entranceParser
 
-  def getEntranceParser = entranceParser
+  def getEntranceParser: EntranceParser = entranceParser
 
   /**
    * onErrorCodeCreated: When a job is running, it terminates unexpectedly or 
generates an error,
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogManager.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogManager.scala
index e1c5c1988..626a643a0 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogManager.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogManager.scala
@@ -47,10 +47,9 @@ abstract class LogManager extends LogListener with Logging {
 
   def dealLogEvent(job: Job, log: String): Unit = {
     Utils.tryCatch {
-      //     warn(s"jobid :${job.getId()}\nlog : ${log}")
       job match {
         case entranceExecutionJob: EntranceExecutionJob =>
-          if (entranceExecutionJob.getLogWriter.isEmpty)
+          if (entranceExecutionJob.getLogWriter.isEmpty) {
             entranceExecutionJob.getLogWriterLocker synchronized {
               if (entranceExecutionJob.getLogWriter.isEmpty) {
                 val logWriter = createLogWriter(entranceExecutionJob)
@@ -59,6 +58,7 @@ abstract class LogManager extends LogListener with Logging {
                 }
               }
             }
+          }
           entranceExecutionJob.getLogWriter.foreach(logWriter => 
logWriter.write(log))
           errorCodeManager.foreach(_.errorMatch(log).foreach { case (code, 
errorMsg) =>
             errorCodeListener.foreach(_.onErrorCodeCreated(job, code, 
errorMsg))
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogWriter.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogWriter.scala
index 0b783c47c..f730abe50 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogWriter.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogWriter.scala
@@ -74,8 +74,9 @@ abstract class LogWriter(charset: String) extends Closeable 
with Flushable with
 abstract class AbstractLogWriter(logPath: String, user: String, charset: 
String)
     extends LogWriter(charset) {
 
-  if (StringUtils.isBlank(logPath))
+  if (StringUtils.isBlank(logPath)) {
     throw new EntranceErrorException(20301, "logPath cannot be empty.")
+  }
 
   protected var fileSystem =
     FSFactory.getFsByProxyUser(new FsPath(logPath), 
user).asInstanceOf[FileSystem]
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LoopArray.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LoopArray.scala
index 59edd0731..155d8c7bd 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LoopArray.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LoopArray.scala
@@ -44,12 +44,13 @@ class LoopArray[T](maxCapacity: Int) {
 
   def get(index: Int): T = eventQueue synchronized {
     val _max = max
-    if (index < realSize)
+    if (index < realSize) {
       throw new IllegalArgumentException(
         "The index " + index + " has already been deleted, now index must be 
bigger than " + realSize
       )
-    else if (index > _max)
+    } else if (index > _max) {
       throw new IllegalArgumentException("The index " + index + " must be less 
than " + _max)
+    }
     val _index = (flag + (index - realSize)) % maxCapacity
     eventQueue(_index).asInstanceOf[T]
   }
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/WebSocketLogWriter.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/WebSocketLogWriter.scala
index 113a2b7e9..a2aeea691 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/WebSocketLogWriter.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/WebSocketLogWriter.scala
@@ -33,7 +33,8 @@ class WebSocketLogWriter(
     entranceLogListenerBus: EntranceLogListenerBus[EntranceLogListener, 
EntranceLogEvent]
 ) {
 
-  def write(msg: String): Unit = if (entranceLogListenerBus != null)
+  def write(msg: String): Unit = if (entranceLogListenerBus != null) {
     entranceLogListenerBus.post(EntrancePushLogEvent(job, msg))
+  }
 
 }
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
index 583f110d5..c145f2319 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
@@ -39,7 +39,6 @@ import org.apache.commons.lang3.StringUtils
 import java.util
 import java.util.Date
 
-import scala.collection.JavaConversions.mapAsScalaMap
 import scala.collection.JavaConverters._
 
 class CommonEntranceParser(val persistenceManager: PersistenceManager)
@@ -95,8 +94,9 @@ class CommonEntranceParser(val persistenceManager: 
PersistenceManager)
     if (executionContent.containsKey(TaskConstant.CODE)) {
       code = executionContent.get(TaskConstant.CODE).asInstanceOf[String]
       runType = executionContent.get(TaskConstant.RUNTYPE).asInstanceOf[String]
-      if (StringUtils.isEmpty(code))
+      if (StringUtils.isEmpty(code)) {
         throw new EntranceIllegalParamException(20007, "param executionCode 
can not be empty ")
+      }
     } else {
       // todo check
       throw new EntranceIllegalParamException(20010, "Only code with runtype 
supported !")
@@ -114,7 +114,7 @@ class CommonEntranceParser(val persistenceManager: 
PersistenceManager)
     jobRequest.setLabels(new util.ArrayList[Label[_]](labels.values()))
     jobRequest.setSource(source)
     jobRequest.setStatus(SchedulerEventState.Inited.toString)
-    // Entrance指标:任务提交时间
+    // Entrance metric: task submission time
     jobRequest.setMetrics(new util.HashMap[String, Object]())
     jobRequest.getMetrics.put(
       TaskConstant.ENTRANCEJOB_SUBMIT_TIME,
@@ -125,7 +125,7 @@ class CommonEntranceParser(val persistenceManager: 
PersistenceManager)
   }
 
   private def checkEngineTypeLabel(labels: util.Map[String, Label[_]]): Unit = 
{
-    val engineTypeLabel = labels.getOrElse(LabelKeyConstant.ENGINE_TYPE_KEY, 
null)
+    val engineTypeLabel = 
labels.asScala.getOrElse(LabelKeyConstant.ENGINE_TYPE_KEY, null)
     if (null == engineTypeLabel) {
       val msg = s"You need to specify engineTypeLabel in labels, such as 
spark-2.4.3"
       throw new EntranceIllegalParamException(
@@ -145,7 +145,7 @@ class CommonEntranceParser(val persistenceManager: 
PersistenceManager)
       runType: String,
       labels: util.Map[String, Label[_]]
   ): Unit = {
-    val engineRunTypeLabel = labels.getOrElse(LabelKeyConstant.CODE_TYPE_KEY, 
null)
+    val engineRunTypeLabel = 
labels.asScala.getOrElse(LabelKeyConstant.CODE_TYPE_KEY, null)
     if (StringUtils.isBlank(runType) && null == engineRunTypeLabel) {
       val msg = s"You need to specify runType in execution content, such as 
sql"
       logger.warn(msg)
@@ -171,7 +171,7 @@ class CommonEntranceParser(val persistenceManager: 
PersistenceManager)
       executeUser: String,
       labels: util.Map[String, Label[_]]
   ): Unit = {
-    var userCreatorLabel = labels
+    var userCreatorLabel = labels.asScala
       .getOrElse(LabelKeyConstant.USER_CREATOR_TYPE_KEY, null)
       .asInstanceOf[UserCreatorLabel]
     if (null == userCreatorLabel) {
@@ -193,8 +193,9 @@ class CommonEntranceParser(val persistenceManager: 
PersistenceManager)
     if (StringUtils.isBlank(submitUser)) {
       jobReq.setSubmitUser(umUser)
     }
-    if (umUser == null)
+    if (umUser == null) {
       throw new EntranceIllegalParamException(20005, "execute user can not be 
null")
+    }
     jobReq.setExecuteUser(umUser)
     var executionCode = 
params.get(TaskConstant.EXECUTIONCODE).asInstanceOf[String]
     val _params = params.get(TaskConstant.PARAMS)
@@ -209,21 +210,21 @@ class CommonEntranceParser(val persistenceManager: 
PersistenceManager)
       .asInstanceOf[util.Map[String, String]]
     val executeApplicationName =
       params.get(TaskConstant.EXECUTEAPPLICATIONNAME).asInstanceOf[String]
-    if (StringUtils.isEmpty(creator))
+    if (StringUtils.isEmpty(creator)) {
       creator = EntranceConfiguration.DEFAULT_REQUEST_APPLICATION_NAME.getValue
-    // if (StringUtils.isEmpty(executeApplicationName)) throw new 
EntranceIllegalParamException(20006, "param executeApplicationName can not be 
empty or null")
-    /* When the execution type is IDE, executionCode and scriptPath cannot be 
empty at the same time*/
-    /*当执行类型为IDE的时候,executionCode和scriptPath不能同时为空*/
+    }
+    // When the execution type is IDE, executionCode and scriptPath cannot be 
empty at the same time
     if (
         EntranceConfiguration.DEFAULT_REQUEST_APPLICATION_NAME.getValue.equals(
           creator
         ) && StringUtils.isEmpty(source.get(TaskConstant.SCRIPTPATH)) &&
         StringUtils.isEmpty(executionCode)
-    )
+    ) {
       throw new EntranceIllegalParamException(
         20007,
         "param executionCode and scriptPath can not be empty at the same time"
       )
+    }
     var runType: String = null
     if (StringUtils.isNotEmpty(executionCode)) {
       runType = params.get(TaskConstant.RUNTYPE).asInstanceOf[String]
@@ -254,9 +255,9 @@ class CommonEntranceParser(val persistenceManager: 
PersistenceManager)
     }
     jobReq.setProgress("0.0")
     jobReq.setSource(source)
-    // 为了兼容代码,让engineType和runType都有同一个属性
+    // In order to be compatible with the code, let engineType and runType 
have the same attribute
     jobReq.setStatus(SchedulerEventState.Inited.toString)
-    // 封装Labels
+    // Package labels
     jobReq.setLabels(labelList)
     jobReq.setMetrics(new util.HashMap[String, Object]())
     jobReq.getMetrics.put(TaskConstant.ENTRANCEJOB_SUBMIT_TIME, new 
Date(System.currentTimeMillis))
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala
index b680f7537..1ffb3ffa2 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala
@@ -45,7 +45,7 @@ import java.util
 import java.util.concurrent.TimeUnit
 import java.util.regex.Pattern
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import com.google.common.cache.{Cache, CacheBuilder}
 
@@ -188,8 +188,9 @@ object EntranceGroupFactory {
           runtime.get(TaskConstant.READ_FROM_CACHE) != null && runtime
             .get(TaskConstant.READ_FROM_CACHE)
             .asInstanceOf[Boolean]
-      ) CACHE
-      else ""
+      ) {
+        CACHE
+      } else ""
     if (StringUtils.isNotEmpty(creator)) creator + "_" + user + cache
     else EntranceConfiguration.DEFAULT_REQUEST_APPLICATION_NAME.getValue + "_" 
+ user + cache
   }
@@ -199,9 +200,9 @@ object EntranceGroupFactory {
       params: util.Map[String, Any] = new util.HashMap[String, Any]
   ): String = {
 
-    val userCreator = labels.find(_.isInstanceOf[UserCreatorLabel])
-    val engineType = labels.find(_.isInstanceOf[EngineTypeLabel])
-    val concurrent = labels.find(_.isInstanceOf[ConcurrentEngineConnLabel])
+    val userCreator = labels.asScala.find(_.isInstanceOf[UserCreatorLabel])
+    val engineType = labels.asScala.find(_.isInstanceOf[EngineTypeLabel])
+    val concurrent = 
labels.asScala.find(_.isInstanceOf[ConcurrentEngineConnLabel])
     if (userCreator.isEmpty || engineType.isEmpty) {
       throw new EntranceErrorException(20001, "userCreator label or engineType 
label cannot null")
     }
@@ -223,8 +224,9 @@ object EntranceGroupFactory {
             runtime.get(TaskConstant.READ_FROM_CACHE) != null && runtime
               .get(TaskConstant.READ_FROM_CACHE)
               .asInstanceOf[Boolean]
-        ) CACHE
-        else ""
+        ) {
+          CACHE
+        } else ""
       val groupName =
         userCreatorLabel.getCreator + "_" + userCreatorLabel.getUser + "_" + 
engineTypeLabel.getEngineType + cache
       groupName
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/cache/ReadCacheConsumer.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/cache/ReadCacheConsumer.scala
index 881b5a669..6c1ec4fc2 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/cache/ReadCacheConsumer.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/cache/ReadCacheConsumer.scala
@@ -41,7 +41,7 @@ import org.apache.commons.lang3.StringUtils
 
 import java.util.concurrent.ExecutorService
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import com.google.common.collect.Lists
 
@@ -59,11 +59,13 @@ class ReadCacheConsumer(
         job.getJobRequest match {
           case jobRequest: JobRequest =>
             Utils.tryCatch {
-              val engineTpyeLabel = jobRequest.getLabels
+              val engineTpyeLabel = jobRequest.getLabels.asScala
                 .filter(l => 
l.getLabelKey.equalsIgnoreCase(LabelKeyConstant.ENGINE_TYPE_KEY))
                 .headOption
                 .getOrElse(null)
-              val labelStrList = jobRequest.getLabels.map { case l => 
l.getStringValue }.toList
+              val labelStrList = jobRequest.getLabels.asScala.map { case l =>
+                l.getStringValue
+              }.toList
               if (null == engineTpyeLabel) {
                 logger.error(
                   "Invalid engineType null, cannot process. jobReq : " + 
BDPJettyServerHelper.gson
@@ -78,13 +80,13 @@ class ReadCacheConsumer(
               val cacheResult = JobHistoryHelper.getCache(
                 jobRequest.getExecutionCode,
                 jobRequest.getExecuteUser,
-                labelStrList,
+                labelStrList.asJava,
                 readCacheBefore
               )
               if (cacheResult != null && 
StringUtils.isNotBlank(cacheResult.getResultLocation)) {
                 val resultSets = listResults(cacheResult.getResultLocation, 
job.getUser)
                 if (resultSets.size() > 0) {
-                  for (resultSet: FsPath <- resultSets) {
+                  for (resultSet: FsPath <- resultSets.asScala) {
                     val alias = FilenameUtils.getBaseName(resultSet.getPath)
                     val output = FsPath
                       .getFsPath(
@@ -137,11 +139,12 @@ class ReadCacheConsumer(
     val consumer = 
schedulerContext.getOrCreateConsumerManager.getOrCreateConsumer(groupName)
     val index = consumer.getConsumeQueue.offer(job)
     // index.map(getEventId(_, groupName)).foreach(job.setId)
-    if (index.isEmpty)
+    if (index.isEmpty) {
       throw new SchedulerErrorException(
         12001,
         "The submission job failed and the queue is full!(提交作业失败,队列已满!)"
       )
+    }
   }
 
 }
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/timeout/JobTimeoutManager.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/timeout/JobTimeoutManager.scala
index b50a372dc..af8288d42 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/timeout/JobTimeoutManager.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/timeout/JobTimeoutManager.scala
@@ -31,7 +31,7 @@ import org.apache.linkis.manager.label.entity.entrance.{
 import java.util
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, TimeUnit}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class JobTimeoutManager extends Logging {
 
@@ -43,9 +43,9 @@ class JobTimeoutManager extends Logging {
 
   def add(jobKey: String, job: EntranceJob): Unit = {
     logger.info(s"Adding timeout job: ${job.getId()}")
-    if (!timeoutJobByName.contains(jobKey)) {
+    if (!timeoutJobByName.asScala.contains(jobKey)) {
       synchronized {
-        if (!timeoutJobByName.contains(jobKey)) {
+        if (!timeoutJobByName.asScala.contains(jobKey)) {
           timeoutJobByName.put(jobKey, job)
         }
       }
@@ -65,7 +65,7 @@ class JobTimeoutManager extends Logging {
   }
 
   def jobExist(jobKey: String): Boolean = {
-    timeoutJobByName.contains(jobKey)
+    timeoutJobByName.asScala.contains(jobKey)
   }
 
   def jobCompleteDelete(jobkey: String): Unit = {
@@ -89,7 +89,7 @@ class JobTimeoutManager extends Logging {
           if (job.getStartTime > 0) job.getStartTime / 1000 else 
currentTimeSeconds
         val runningTimeSeconds = currentTimeSeconds - 
jobRunningStartTimeSeconds
         if (!job.isCompleted) {
-          job.jobRequest.getLabels foreach {
+          job.jobRequest.getLabels.asScala foreach {
             case queueTimeOutLabel: JobQueuingTimeoutLabel =>
               if (
                   job.isWaiting && queueTimeOutLabel.getQueuingTimeout > 0 && 
queuingTimeSeconds >= queueTimeOutLabel.getQueuingTimeout
@@ -119,7 +119,7 @@ class JobTimeoutManager extends Logging {
         }
       }
 
-      timeoutJobByName.foreach(item => {
+      timeoutJobByName.asScala.foreach(item => {
         logger.info(s"Running timeout detection!")
         synchronized {
           jobCompleteDelete(item._1)
@@ -129,7 +129,7 @@ class JobTimeoutManager extends Logging {
     }
   }
 
-  // 线程周期性扫描超时任务
+  // Periodically scan for timed-out jobs on a scheduler thread
   val woker = Utils.defaultScheduler.scheduleAtFixedRate(
     new Runnable() {
 
@@ -153,8 +153,10 @@ object JobTimeoutManager {
 
   // If the timeout label set by the user is invalid, execution is not allowed
   def checkTimeoutLabel(labels: util.Map[String, Label[_]]): Unit = {
-    val jobQueuingTimeoutLabel = 
labels.getOrElse(LabelKeyConstant.JOB_QUEUING_TIMEOUT_KEY, null)
-    val jobRunningTimeoutLabel = 
labels.getOrElse(LabelKeyConstant.JOB_RUNNING_TIMEOUT_KEY, null)
+    val jobQueuingTimeoutLabel =
+      labels.asScala.getOrElse(LabelKeyConstant.JOB_QUEUING_TIMEOUT_KEY, null)
+    val jobRunningTimeoutLabel =
+      labels.asScala.getOrElse(LabelKeyConstant.JOB_RUNNING_TIMEOUT_KEY, null)
     val posNumPattern = "^[0-9]+$"
     if (
         (null != jobQueuingTimeoutLabel && 
!jobQueuingTimeoutLabel.getStringValue.matches(
@@ -174,7 +176,7 @@ object JobTimeoutManager {
 
   def hasTimeoutLabel(entranceJob: EntranceJob): Boolean = {
     val labels = entranceJob.jobRequest.getLabels
-    labels.exists(label =>
+    labels.asScala.exists(label =>
       label.getLabelKey == LabelKeyConstant.JOB_QUEUING_TIMEOUT_KEY ||
         label.getLabelKey == LabelKeyConstant.JOB_RUNNING_TIMEOUT_KEY
     )
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
index 0345120e2..d912e7110 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
@@ -91,7 +91,7 @@ object JobHistoryHelper extends Logging {
   }
 
   /**
-   * 对于一个在内存中找不到这个任务的话,可以直接干掉
+   * If the task cannot be found in memory, you can directly kill it
    *
    * @param taskID
    */
@@ -106,7 +106,7 @@ object JobHistoryHelper extends Logging {
   }
 
   /**
-   * 批量强制kill
+   * Force-kill a batch of tasks
    *
    * @param taskIdList
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to