This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
new 21a649040 [linkis-entrance] Modification of scala file floating red
(#3243)
21a649040 is described below
commit 21a649040e0a23b8984c3b32d6509e723d9f1baf
Author: 成彬彬 <[email protected]>
AuthorDate: Wed Sep 7 23:46:28 2022 +0800
[linkis-entrance] Modification of scala file floating red (#3243)
* [linkis-entrance] Modification of scala file floating red
---
.../linkis/entrance/EntranceWebSocketService.scala | 38 +++++-----------------
.../cli/heartbeat/CliHeartbeatMonitor.scala | 4 +--
.../linkis/entrance/execute/EntranceExecutor.scala | 1 -
.../entrance/execute/EntranceExecutorManager.scala | 15 ++-------
.../linkis/entrance/execute/EntranceJob.scala | 7 ++--
.../execute/ExecuteRequestInterceptor.scala | 4 +--
.../apache/linkis/entrance/execute/MarkReq.scala | 14 ++------
.../interceptor/impl/CommentInterceptor.scala | 2 ++
.../impl/ShellDangerousGrammerInterceptor.scala | 4 +--
.../linkis/entrance/log/CacheLogManager.scala | 8 +----
.../linkis/entrance/log/ErrorCodeListener.scala | 4 +--
.../apache/linkis/entrance/log/LogManager.scala | 4 +--
.../org/apache/linkis/entrance/log/LogWriter.scala | 3 +-
.../org/apache/linkis/entrance/log/LoopArray.scala | 5 +--
.../linkis/entrance/log/WebSocketLogWriter.scala | 3 +-
.../entrance/parser/CommonEntranceParser.scala | 29 +++++++++--------
.../entrance/scheduler/EntranceGroupFactory.scala | 18 +++++-----
.../scheduler/cache/ReadCacheConsumer.scala | 15 +++++----
.../entrance/timeout/JobTimeoutManager.scala | 22 +++++++------
.../linkis/entrance/utils/JobHistoryHelper.scala | 4 +--
20 files changed, 85 insertions(+), 119 deletions(-)
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
index fe44be993..b3abc3d1d 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
@@ -60,9 +60,11 @@ class EntranceWebSocketService
private val websocketTagJobID = new util.HashMap[String, String]()
private val restfulURI =
- if (ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue.endsWith("/"))
+ if (ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue.endsWith("/")) {
ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue
- else ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue + "/"
+ } else {
+ ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue + "/"
+ }
private val executePattern = restfulURI + "entrance/execute"
private val logUrlPattern = (restfulURI + """entrance/(.+)/log""").r
@@ -233,8 +235,6 @@ class EntranceWebSocketService
s"Your job is accepted, jobID is ${job.getId} and taskID is
$taskID. Please wait it to be scheduled"
)
)
- // val executeApplicationName:String = jobRequest.getExecuteApplicationName
- // val execID = ZuulEntranceUtils.generateExecID(jobId,
executeApplicationName, Sender.getThisInstance)
"The request was executed successfully!"
.data("execID", execID)
.data("taskID", taskID)
@@ -246,7 +246,7 @@ class EntranceWebSocketService
var retMessage: Message = null
val realID = ZuulEntranceUtils.parseExecID(id)(3)
entranceServer.getJob(realID) foreach {
- case entranceExecutionJob: EntranceExecutionJob => {
+ case entranceExecutionJob: EntranceExecutionJob =>
logger.info(s"begin to get job $realID log via websocket")
val logsArr: Array[String] = new Array[String](4)
entranceExecutionJob.getWebSocketLogReader.foreach(logReader =>
@@ -263,7 +263,6 @@ class EntranceWebSocketService
retMessage.setMethod(restfulURI + "entrance/" + id + "/log")
logger.info(s"end to get job $realID log via websocket")
return retMessage
- }
case _ =>
}
retMessage = Message.error(
@@ -275,11 +274,7 @@ class EntranceWebSocketService
}
def dealStatus(event: ServerEvent, id: String): Message = {
- // val response:Response = entranceRestfulApi.status(id)
- // Message.responseToMessage(response)
var retMessage: Message = null
-
- // val realID:String = if (entranceServer.getJob(id).isDefined) id else
ZuulEntranceUtils.parseExecID(id)(2)
val realID: String = if (!id.contains(":")) id else
ZuulEntranceUtils.parseExecID(id)(3)
entranceServer.getJob(realID) foreach {
case entranceExecutionJob: EntranceExecutionJob =>
@@ -293,8 +288,9 @@ class EntranceWebSocketService
Sender.getThisInstance,
userCreatorLabel.getCreator
)
- if (!jobIdToEventId.containsKey(realID) && event != null)
+ if (!jobIdToEventId.containsKey(realID) && event != null) {
jobIdToEventId synchronized jobIdToEventId.put(realID, event.getId)
+ }
val status = entranceExecutionJob.getState
retMessage = Message.ok("Get the status of the task
successfully(获取任务状态成功)")
@@ -323,12 +319,10 @@ class EntranceWebSocketService
}
def dealProgress(event: ServerEvent, id: String): Message = {
- // val response:Response = entranceRestfulApi.progress(id)
- // Message.responseToMessage(response)
var retMessage: Message = null
val realID = ZuulEntranceUtils.parseExecID(id)(3)
entranceServer.getJob(realID) foreach {
- case entranceExecutionJob: EntranceExecutionJob => {
+ case entranceExecutionJob: EntranceExecutionJob =>
val progress = entranceExecutionJob.getProgress
retMessage = Message.ok("Get the task progress successfully(获取任务进度成功)")
val taskID = entranceExecutionJob.getJobRequest.getId
@@ -340,7 +334,6 @@ class EntranceWebSocketService
retMessage.setStatus(0)
retMessage.setMethod(restfulURI + "entrance/" + id + "/progress")
return retMessage
- }
case _ =>
}
retMessage = Message.error("Get task progress failed(获取任务进度失败)")
@@ -358,8 +351,6 @@ class EntranceWebSocketService
}
def dealKill(event: ServerEvent, id: String): Message = {
- // val response:Response = entranceRestfulApi.kill(id)
- // Message.responseToMessage(response)
var retMessage: Message = null
val realID = ZuulEntranceUtils.parseExecID(id)(3)
entranceServer.getJob(realID) foreach {
@@ -384,19 +375,6 @@ class EntranceWebSocketService
retMessage
}
- /* def dealBackGroundService(event: ServerEvent): Message = {
- val params = event.getData.map{case (k, v) => k -> v.asInstanceOf[Any]}
- val backgroundType = params.get("background").get
- //val backgroundType = "export"
- val backgroundService =
entranceServer.getEntranceContext.getOrCreateBackGroundService.find(f
=>backgroundType.equals(f.serviceType))
- if (backgroundService.isEmpty) {
- info("The corresponding background service was not
found...(未找到相应的后台服务...)")
- dealExecute(event)
- }else{
- dealExecute(backgroundService.get.operation(event))
- }
- }*/
-
private def concatLog(length: Int, log: String, flag: StringBuilder, all:
StringBuilder): Unit = {
if (length == 1) {
flag ++= log ++= "\n"
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/CliHeartbeatMonitor.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/CliHeartbeatMonitor.scala
index 851e9fa32..7142a4f6f 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/CliHeartbeatMonitor.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/CliHeartbeatMonitor.scala
@@ -90,9 +90,9 @@ class CliHeartbeatMonitor(handler: HeartbeatLossHandler)
extends Logging {
case entranceJob: EntranceJob =>
if (isCliJob(entranceJob)) {
val id = entranceJob.getJobRequest.getId.toString
- if (!infoMap.containsKey(id))
+ if (!infoMap.containsKey(id)) {
logger.error(s"heartbeat on non-existing job!! job id: $id")
- else infoMap.get(id).updateNewestAccessByClientTimestamp()
+ } else infoMap.get(id).updateNewestAccessByClientTimestamp()
}
case _ =>
}
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
index 429c80f9e..2b80d731a 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
@@ -78,7 +78,6 @@ abstract class EntranceExecutor(val id: Long, val mark:
MarkReq) extends Executo
protected def callExecute(request: ExecuteRequest): ExecuteResponse
- // override def toString: String =
s"${getInstance.getApplicationName}Engine($getId, $getUser, $getCreator,
${getInstance.getInstance})"
override def toString: String = "${getId}"
override def getId: Long = this.id
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
index 0f8a4303b..43dd27572 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
@@ -72,10 +72,8 @@ abstract class EntranceExecutorManager(groupFactory:
GroupFactory)
}
- // todo 提交任务逻辑调整:将job切分成多条语句,塞到jobGroup队列中。任务提交后,按照队列先后顺序,依次执行任务;
- // 没个子任务运行后,更新整体的Job运行状态
- // 直到所有任务都完毕,或者存在任务异常退出,则结束整体的Job
-
+ // Update the overall job running status after no subtask runs
+ // Until all the tasks are completed, or the task exits abnormally, the
overall job ends
override def askExecutor(schedulerEvent: SchedulerEvent, wait: Duration):
Option[Executor] =
schedulerEvent match {
case job: Job =>
@@ -89,14 +87,7 @@ abstract class EntranceExecutorManager(groupFactory:
GroupFactory)
warnException = warn
None
case t: Throwable => throw t
- } /*match {
- case Some(e) => executor = Option(e)
- case _ =>
- if (System.currentTimeMillis - startTime < wait.toMillis) {
- val interval = math.min(3000, wait.toMillis -
System.currentTimeMillis + startTime)
- //getOrCreateEngineManager().waitForIdle(interval)
- }
- }*/
+ }
// todo check
if (warnException != null && executor.isEmpty) throw warnException
executor
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
index c0a07380d..9239670ae 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
@@ -109,7 +109,7 @@ abstract class EntranceJob extends Job {
}
}
- @Deprecated
+ @deprecated
def incrementResultSetPersisted(): Unit = {
// persistedResultSets.incrementAndGet()
}
@@ -178,8 +178,9 @@ abstract class EntranceJob extends Job {
TaskConstant.ENTRANCEJOB_COMPLETE_TIME,
new Date(System.currentTimeMillis())
)
- if (getJobInfo != null)
+ if (getJobInfo != null) {
getLogListener.foreach(_.onLogUpdate(this,
LogUtils.generateInfo(getJobInfo.getMetric)))
+ }
if (isSucceed) {
getLogListener.foreach(
_.onLogUpdate(
@@ -327,6 +328,6 @@ abstract class EntranceJob extends Job {
object EntranceJob {
- def JOB_COMPLETED_PROGRESS = 1.0f
+ def JOB_COMPLETED_PROGRESS: Float = 1.0f
}
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/ExecuteRequestInterceptor.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/ExecuteRequestInterceptor.scala
index 09b6156ba..151d319cc 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/ExecuteRequestInterceptor.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/ExecuteRequestInterceptor.scala
@@ -20,7 +20,7 @@ package org.apache.linkis.entrance.execute
import org.apache.linkis.governance.common.protocol.task.{RequestTask,
RequestTaskExecute}
import org.apache.linkis.scheduler.executer.{ExecuteRequest, JobExecuteRequest}
-import scala.collection.JavaConversions
+import scala.collection.JavaConverters._
trait ExecuteRequestInterceptor {
@@ -96,7 +96,7 @@ object RuntimePropertiesExecuteRequestInterceptor extends
ExecuteRequestIntercep
override def apply(requestTask: RequestTask, executeRequest:
ExecuteRequest): RequestTask =
executeRequest match {
case runtime: RuntimePropertiesExecuteRequest =>
- JavaConversions.mapAsScalaMap(runtime.properties).foreach { case (k,
v) =>
+ mapAsScalaMapConverter(runtime.properties).asScala.foreach { case (k,
v) =>
requestTask.data(k, v)
}
requestTask
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/MarkReq.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/MarkReq.scala
index 4e6efcb6f..d04511f29 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/MarkReq.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/MarkReq.scala
@@ -20,7 +20,7 @@ package org.apache.linkis.entrance.execute
import java.util
import scala.beans.BeanProperty
-import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.JavaConverters._
class MarkReq {
@@ -60,20 +60,11 @@ class MarkReq {
return flag
}
- /* if (other.getProperties != null && getProperties != null) {
- val iterator = other.getProperties.iterator
- while (iterator.hasNext) {
- val next = iterator.next()
- if (!next._2.equalsIgnoreCase(getProperties.get(next._1))) {
- return flag
- }
- }
- }*/
if (other.getLabels != null && getLabels != null) {
if (getLabels.size() != other.getLabels.size()) {
return false
}
- val iterator = other.getLabels.iterator
+ val iterator = other.getLabels.asScala.iterator
while (iterator.hasNext) {
val next = iterator.next()
if (null == next._2 || !next._2.equals(getLabels.get(next._1))) {
@@ -86,4 +77,5 @@ class MarkReq {
flag
}
+ override def hashCode(): Int = super.hashCode()
}
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CommentInterceptor.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CommentInterceptor.scala
index e99c3bfa4..da751a4bb 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CommentInterceptor.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CommentInterceptor.scala
@@ -110,7 +110,9 @@ object CommentMain {
def main(args: Array[String]): Unit = {
val sqlCode = "select * from default.user;--你好;show tables"
val sqlCode1 = "select * from default.user--你好;show tables"
+ // scalastyle:off println
println(SQLCommentHelper.dealComment(sqlCode))
+ // scalastyle:on println
}
}
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/ShellDangerousGrammerInterceptor.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/ShellDangerousGrammerInterceptor.scala
index 567cddfe4..cabbe84b6 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/ShellDangerousGrammerInterceptor.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/ShellDangerousGrammerInterceptor.scala
@@ -107,9 +107,7 @@ class ShellDangerousGrammerInterceptor extends
EntranceInterceptor with Logging
logger.info(s"GET REQUEST CODE_TYPE ${codeType} and ENGINE_TYPE
${EngineType}")
if (shellContainDangerUsage(jobRequest.getExecutionCode)) {
throw DangerousGramsCheckException("Shell code contains blacklisted
code(shell中包含黑名单代码)")
- } /*else if (!shellWhiteUsage(jobRequest.getExecutionCode)) {
- throw DangerousGramsCheckException("The shell code is not in the
whitelist code(shell代码不在白名单中)")
- }*/
+ }
jobRequest
} else {
jobRequest
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogManager.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogManager.scala
index bc068b89e..8b851e402 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogManager.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogManager.scala
@@ -73,7 +73,7 @@ class CacheLogManager extends LogManager with Logging {
return null
}
job match {
- case entranceExecutionJob: EntranceExecutionJob => {
+ case entranceExecutionJob: EntranceExecutionJob =>
val cache: Cache =
Cache(EntranceConfiguration.DEFAULT_CACHE_MAX.getValue)
val logPath: String = entranceExecutionJob.getJobRequest.getLogPath
val fsLogPath = new FsPath(logPath)
@@ -97,13 +97,7 @@ class CacheLogManager extends LogManager with Logging {
}
entranceExecutionJob.setLogWriter(cacheLogWriter)
logger.info(s"job ${entranceExecutionJob.getJobRequest.getId} create
cacheLogWriter")
- /*val webSocketCacheLogReader: WebSocketCacheLogReader =
- new WebSocketCacheLogReader(logPath,
EntranceConfiguration.DEFAULT_LOG_CHARSET.getValue, cache,
entranceExecutionJob.getUser)
- entranceExecutionJob.setWebSocketLogReader(webSocketCacheLogReader)
- val webSocketLogWriter: WebSocketLogWriter = new
WebSocketLogWriter(entranceExecutionJob,
entranceContext.getOrCreateLogListenerBus)
- entranceExecutionJob.setWebSocketLogWriter(webSocketLogWriter)*/
cacheLogWriter
- }
case _ => null
}
}
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/ErrorCodeListener.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/ErrorCodeListener.scala
index 3bd93465c..2be792e05 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/ErrorCodeListener.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/ErrorCodeListener.scala
@@ -39,11 +39,11 @@ class PersistenceErrorCodeListener extends
ErrorCodeListener {
def setPersistenceManager(persistenceManager: PersistenceManager): Unit =
this.persistenceManager = persistenceManager
- def getPersistenceManager = persistenceManager
+ def getPersistenceManager: PersistenceManager = persistenceManager
def setEntranceParser(entranceParser: EntranceParser): Unit =
this.entranceParser = entranceParser
- def getEntranceParser = entranceParser
+ def getEntranceParser: EntranceParser = entranceParser
/**
* onErrorCodeCreated: When a job is running, it terminates unexpectedly or
generates an error,
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogManager.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogManager.scala
index e1c5c1988..626a643a0 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogManager.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogManager.scala
@@ -47,10 +47,9 @@ abstract class LogManager extends LogListener with Logging {
def dealLogEvent(job: Job, log: String): Unit = {
Utils.tryCatch {
- // warn(s"jobid :${job.getId()}\nlog : ${log}")
job match {
case entranceExecutionJob: EntranceExecutionJob =>
- if (entranceExecutionJob.getLogWriter.isEmpty)
+ if (entranceExecutionJob.getLogWriter.isEmpty) {
entranceExecutionJob.getLogWriterLocker synchronized {
if (entranceExecutionJob.getLogWriter.isEmpty) {
val logWriter = createLogWriter(entranceExecutionJob)
@@ -59,6 +58,7 @@ abstract class LogManager extends LogListener with Logging {
}
}
}
+ }
entranceExecutionJob.getLogWriter.foreach(logWriter =>
logWriter.write(log))
errorCodeManager.foreach(_.errorMatch(log).foreach { case (code,
errorMsg) =>
errorCodeListener.foreach(_.onErrorCodeCreated(job, code,
errorMsg))
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogWriter.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogWriter.scala
index 0b783c47c..f730abe50 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogWriter.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogWriter.scala
@@ -74,8 +74,9 @@ abstract class LogWriter(charset: String) extends Closeable
with Flushable with
abstract class AbstractLogWriter(logPath: String, user: String, charset:
String)
extends LogWriter(charset) {
- if (StringUtils.isBlank(logPath))
+ if (StringUtils.isBlank(logPath)) {
throw new EntranceErrorException(20301, "logPath cannot be empty.")
+ }
protected var fileSystem =
FSFactory.getFsByProxyUser(new FsPath(logPath),
user).asInstanceOf[FileSystem]
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LoopArray.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LoopArray.scala
index 59edd0731..155d8c7bd 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LoopArray.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LoopArray.scala
@@ -44,12 +44,13 @@ class LoopArray[T](maxCapacity: Int) {
def get(index: Int): T = eventQueue synchronized {
val _max = max
- if (index < realSize)
+ if (index < realSize) {
throw new IllegalArgumentException(
"The index " + index + " has already been deleted, now index must be
bigger than " + realSize
)
- else if (index > _max)
+ } else if (index > _max) {
throw new IllegalArgumentException("The index " + index + " must be less
than " + _max)
+ }
val _index = (flag + (index - realSize)) % maxCapacity
eventQueue(_index).asInstanceOf[T]
}
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/WebSocketLogWriter.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/WebSocketLogWriter.scala
index 113a2b7e9..a2aeea691 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/WebSocketLogWriter.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/WebSocketLogWriter.scala
@@ -33,7 +33,8 @@ class WebSocketLogWriter(
entranceLogListenerBus: EntranceLogListenerBus[EntranceLogListener,
EntranceLogEvent]
) {
- def write(msg: String): Unit = if (entranceLogListenerBus != null)
+ def write(msg: String): Unit = if (entranceLogListenerBus != null) {
entranceLogListenerBus.post(EntrancePushLogEvent(job, msg))
+ }
}
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
index 583f110d5..c145f2319 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
@@ -39,7 +39,6 @@ import org.apache.commons.lang3.StringUtils
import java.util
import java.util.Date
-import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.JavaConverters._
class CommonEntranceParser(val persistenceManager: PersistenceManager)
@@ -95,8 +94,9 @@ class CommonEntranceParser(val persistenceManager:
PersistenceManager)
if (executionContent.containsKey(TaskConstant.CODE)) {
code = executionContent.get(TaskConstant.CODE).asInstanceOf[String]
runType = executionContent.get(TaskConstant.RUNTYPE).asInstanceOf[String]
- if (StringUtils.isEmpty(code))
+ if (StringUtils.isEmpty(code)) {
throw new EntranceIllegalParamException(20007, "param executionCode
can not be empty ")
+ }
} else {
// todo check
throw new EntranceIllegalParamException(20010, "Only code with runtype
supported !")
@@ -114,7 +114,7 @@ class CommonEntranceParser(val persistenceManager:
PersistenceManager)
jobRequest.setLabels(new util.ArrayList[Label[_]](labels.values()))
jobRequest.setSource(source)
jobRequest.setStatus(SchedulerEventState.Inited.toString)
- // Entrance指标:任务提交时间
+ // Entry indicator: task submission time
jobRequest.setMetrics(new util.HashMap[String, Object]())
jobRequest.getMetrics.put(
TaskConstant.ENTRANCEJOB_SUBMIT_TIME,
@@ -125,7 +125,7 @@ class CommonEntranceParser(val persistenceManager:
PersistenceManager)
}
private def checkEngineTypeLabel(labels: util.Map[String, Label[_]]): Unit =
{
- val engineTypeLabel = labels.getOrElse(LabelKeyConstant.ENGINE_TYPE_KEY,
null)
+ val engineTypeLabel =
labels.asScala.getOrElse(LabelKeyConstant.ENGINE_TYPE_KEY, null)
if (null == engineTypeLabel) {
val msg = s"You need to specify engineTypeLabel in labels, such as
spark-2.4.3"
throw new EntranceIllegalParamException(
@@ -145,7 +145,7 @@ class CommonEntranceParser(val persistenceManager:
PersistenceManager)
runType: String,
labels: util.Map[String, Label[_]]
): Unit = {
- val engineRunTypeLabel = labels.getOrElse(LabelKeyConstant.CODE_TYPE_KEY,
null)
+ val engineRunTypeLabel =
labels.asScala.getOrElse(LabelKeyConstant.CODE_TYPE_KEY, null)
if (StringUtils.isBlank(runType) && null == engineRunTypeLabel) {
val msg = s"You need to specify runType in execution content, such as
sql"
logger.warn(msg)
@@ -171,7 +171,7 @@ class CommonEntranceParser(val persistenceManager:
PersistenceManager)
executeUser: String,
labels: util.Map[String, Label[_]]
): Unit = {
- var userCreatorLabel = labels
+ var userCreatorLabel = labels.asScala
.getOrElse(LabelKeyConstant.USER_CREATOR_TYPE_KEY, null)
.asInstanceOf[UserCreatorLabel]
if (null == userCreatorLabel) {
@@ -193,8 +193,9 @@ class CommonEntranceParser(val persistenceManager:
PersistenceManager)
if (StringUtils.isBlank(submitUser)) {
jobReq.setSubmitUser(umUser)
}
- if (umUser == null)
+ if (umUser == null) {
throw new EntranceIllegalParamException(20005, "execute user can not be
null")
+ }
jobReq.setExecuteUser(umUser)
var executionCode =
params.get(TaskConstant.EXECUTIONCODE).asInstanceOf[String]
val _params = params.get(TaskConstant.PARAMS)
@@ -209,21 +210,21 @@ class CommonEntranceParser(val persistenceManager:
PersistenceManager)
.asInstanceOf[util.Map[String, String]]
val executeApplicationName =
params.get(TaskConstant.EXECUTEAPPLICATIONNAME).asInstanceOf[String]
- if (StringUtils.isEmpty(creator))
+ if (StringUtils.isEmpty(creator)) {
creator = EntranceConfiguration.DEFAULT_REQUEST_APPLICATION_NAME.getValue
- // if (StringUtils.isEmpty(executeApplicationName)) throw new
EntranceIllegalParamException(20006, "param executeApplicationName can not be
empty or null")
- /* When the execution type is IDE, executionCode and scriptPath cannot be
empty at the same time*/
- /*当执行类型为IDE的时候,executionCode和scriptPath不能同时为空*/
+ }
+ // When the execution type is IDE, executioncode and scriptpath cannot be
empty at the same time
if (
EntranceConfiguration.DEFAULT_REQUEST_APPLICATION_NAME.getValue.equals(
creator
) && StringUtils.isEmpty(source.get(TaskConstant.SCRIPTPATH)) &&
StringUtils.isEmpty(executionCode)
- )
+ ) {
throw new EntranceIllegalParamException(
20007,
"param executionCode and scriptPath can not be empty at the same time"
)
+ }
var runType: String = null
if (StringUtils.isNotEmpty(executionCode)) {
runType = params.get(TaskConstant.RUNTYPE).asInstanceOf[String]
@@ -254,9 +255,9 @@ class CommonEntranceParser(val persistenceManager:
PersistenceManager)
}
jobReq.setProgress("0.0")
jobReq.setSource(source)
- // 为了兼容代码,让engineType和runType都有同一个属性
+ // In order to be compatible with the code, let enginetype and runtype
have the same attribute
jobReq.setStatus(SchedulerEventState.Inited.toString)
- // 封装Labels
+ // Package labels
jobReq.setLabels(labelList)
jobReq.setMetrics(new util.HashMap[String, Object]())
jobReq.getMetrics.put(TaskConstant.ENTRANCEJOB_SUBMIT_TIME, new
Date(System.currentTimeMillis))
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala
index b680f7537..1ffb3ffa2 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala
@@ -45,7 +45,7 @@ import java.util
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import com.google.common.cache.{Cache, CacheBuilder}
@@ -188,8 +188,9 @@ object EntranceGroupFactory {
runtime.get(TaskConstant.READ_FROM_CACHE) != null && runtime
.get(TaskConstant.READ_FROM_CACHE)
.asInstanceOf[Boolean]
- ) CACHE
- else ""
+ ) {
+ CACHE
+ } else ""
if (StringUtils.isNotEmpty(creator)) creator + "_" + user + cache
else EntranceConfiguration.DEFAULT_REQUEST_APPLICATION_NAME.getValue + "_"
+ user + cache
}
@@ -199,9 +200,9 @@ object EntranceGroupFactory {
params: util.Map[String, Any] = new util.HashMap[String, Any]
): String = {
- val userCreator = labels.find(_.isInstanceOf[UserCreatorLabel])
- val engineType = labels.find(_.isInstanceOf[EngineTypeLabel])
- val concurrent = labels.find(_.isInstanceOf[ConcurrentEngineConnLabel])
+ val userCreator = labels.asScala.find(_.isInstanceOf[UserCreatorLabel])
+ val engineType = labels.asScala.find(_.isInstanceOf[EngineTypeLabel])
+ val concurrent =
labels.asScala.find(_.isInstanceOf[ConcurrentEngineConnLabel])
if (userCreator.isEmpty || engineType.isEmpty) {
throw new EntranceErrorException(20001, "userCreator label or engineType
label cannot null")
}
@@ -223,8 +224,9 @@ object EntranceGroupFactory {
runtime.get(TaskConstant.READ_FROM_CACHE) != null && runtime
.get(TaskConstant.READ_FROM_CACHE)
.asInstanceOf[Boolean]
- ) CACHE
- else ""
+ ) {
+ CACHE
+ } else ""
val groupName =
userCreatorLabel.getCreator + "_" + userCreatorLabel.getUser + "_" +
engineTypeLabel.getEngineType + cache
groupName
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/cache/ReadCacheConsumer.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/cache/ReadCacheConsumer.scala
index 881b5a669..6c1ec4fc2 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/cache/ReadCacheConsumer.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/cache/ReadCacheConsumer.scala
@@ -41,7 +41,7 @@ import org.apache.commons.lang3.StringUtils
import java.util.concurrent.ExecutorService
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import com.google.common.collect.Lists
@@ -59,11 +59,13 @@ class ReadCacheConsumer(
job.getJobRequest match {
case jobRequest: JobRequest =>
Utils.tryCatch {
- val engineTpyeLabel = jobRequest.getLabels
+ val engineTpyeLabel = jobRequest.getLabels.asScala
.filter(l =>
l.getLabelKey.equalsIgnoreCase(LabelKeyConstant.ENGINE_TYPE_KEY))
.headOption
.getOrElse(null)
- val labelStrList = jobRequest.getLabels.map { case l =>
l.getStringValue }.toList
+ val labelStrList = jobRequest.getLabels.asScala.map { case l =>
+ l.getStringValue
+ }.toList
if (null == engineTpyeLabel) {
logger.error(
"Invalid engineType null, cannot process. jobReq : " +
BDPJettyServerHelper.gson
@@ -78,13 +80,13 @@ class ReadCacheConsumer(
val cacheResult = JobHistoryHelper.getCache(
jobRequest.getExecutionCode,
jobRequest.getExecuteUser,
- labelStrList,
+ labelStrList.asJava,
readCacheBefore
)
if (cacheResult != null &&
StringUtils.isNotBlank(cacheResult.getResultLocation)) {
val resultSets = listResults(cacheResult.getResultLocation,
job.getUser)
if (resultSets.size() > 0) {
- for (resultSet: FsPath <- resultSets) {
+ for (resultSet: FsPath <- resultSets.asScala) {
val alias = FilenameUtils.getBaseName(resultSet.getPath)
val output = FsPath
.getFsPath(
@@ -137,11 +139,12 @@ class ReadCacheConsumer(
val consumer =
schedulerContext.getOrCreateConsumerManager.getOrCreateConsumer(groupName)
val index = consumer.getConsumeQueue.offer(job)
// index.map(getEventId(_, groupName)).foreach(job.setId)
- if (index.isEmpty)
+ if (index.isEmpty) {
throw new SchedulerErrorException(
12001,
"The submission job failed and the queue is full!(提交作业失败,队列已满!)"
)
+ }
}
}
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/timeout/JobTimeoutManager.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/timeout/JobTimeoutManager.scala
index b50a372dc..af8288d42 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/timeout/JobTimeoutManager.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/timeout/JobTimeoutManager.scala
@@ -31,7 +31,7 @@ import org.apache.linkis.manager.label.entity.entrance.{
import java.util
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, TimeUnit}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
class JobTimeoutManager extends Logging {
@@ -43,9 +43,9 @@ class JobTimeoutManager extends Logging {
def add(jobKey: String, job: EntranceJob): Unit = {
logger.info(s"Adding timeout job: ${job.getId()}")
- if (!timeoutJobByName.contains(jobKey)) {
+ if (!timeoutJobByName.asScala.contains(jobKey)) {
synchronized {
- if (!timeoutJobByName.contains(jobKey)) {
+ if (!timeoutJobByName.asScala.contains(jobKey)) {
timeoutJobByName.put(jobKey, job)
}
}
@@ -65,7 +65,7 @@ class JobTimeoutManager extends Logging {
}
def jobExist(jobKey: String): Boolean = {
- timeoutJobByName.contains(jobKey)
+ timeoutJobByName.asScala.contains(jobKey)
}
def jobCompleteDelete(jobkey: String): Unit = {
@@ -89,7 +89,7 @@ class JobTimeoutManager extends Logging {
if (job.getStartTime > 0) job.getStartTime / 1000 else
currentTimeSeconds
val runningTimeSeconds = currentTimeSeconds -
jobRunningStartTimeSeconds
if (!job.isCompleted) {
- job.jobRequest.getLabels foreach {
+ job.jobRequest.getLabels.asScala foreach {
case queueTimeOutLabel: JobQueuingTimeoutLabel =>
if (
job.isWaiting && queueTimeOutLabel.getQueuingTimeout > 0 &&
queuingTimeSeconds >= queueTimeOutLabel.getQueuingTimeout
@@ -119,7 +119,7 @@ class JobTimeoutManager extends Logging {
}
}
- timeoutJobByName.foreach(item => {
+ timeoutJobByName.asScala.foreach(item => {
logger.info(s"Running timeout detection!")
synchronized {
jobCompleteDelete(item._1)
@@ -129,7 +129,7 @@ class JobTimeoutManager extends Logging {
}
}
- // 线程周期性扫描超时任务
+ // Thread periodic scan timeout task
val woker = Utils.defaultScheduler.scheduleAtFixedRate(
new Runnable() {
@@ -153,8 +153,10 @@ object JobTimeoutManager {
// If the timeout label set by the user is invalid, execution is not allowed
def checkTimeoutLabel(labels: util.Map[String, Label[_]]): Unit = {
- val jobQueuingTimeoutLabel =
labels.getOrElse(LabelKeyConstant.JOB_QUEUING_TIMEOUT_KEY, null)
- val jobRunningTimeoutLabel =
labels.getOrElse(LabelKeyConstant.JOB_RUNNING_TIMEOUT_KEY, null)
+ val jobQueuingTimeoutLabel =
+ labels.asScala.getOrElse(LabelKeyConstant.JOB_QUEUING_TIMEOUT_KEY, null)
+ val jobRunningTimeoutLabel =
+ labels.asScala.getOrElse(LabelKeyConstant.JOB_RUNNING_TIMEOUT_KEY, null)
val posNumPattern = "^[0-9]+$"
if (
(null != jobQueuingTimeoutLabel &&
!jobQueuingTimeoutLabel.getStringValue.matches(
@@ -174,7 +176,7 @@ object JobTimeoutManager {
def hasTimeoutLabel(entranceJob: EntranceJob): Boolean = {
val labels = entranceJob.jobRequest.getLabels
- labels.exists(label =>
+ labels.asScala.exists(label =>
label.getLabelKey == LabelKeyConstant.JOB_QUEUING_TIMEOUT_KEY ||
label.getLabelKey == LabelKeyConstant.JOB_RUNNING_TIMEOUT_KEY
)
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
index 0345120e2..d912e7110 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
@@ -91,7 +91,7 @@ object JobHistoryHelper extends Logging {
}
/**
- * 对于一个在内存中找不到这个任务的话,可以直接干掉
+ * If the task cannot be found in memory, you can directly kill it
*
* @param taskID
*/
@@ -106,7 +106,7 @@ object JobHistoryHelper extends Logging {
}
/**
- * 批量强制kill
+ * Batch forced kill
*
* @param taskIdList
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]