This is an automated email from the ASF dual-hosted git repository. peacewong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/linkis.git
commit 50254ac558e1ab26321a67411b6baee402dece57 Author: peacewong <[email protected]> AuthorDate: Fri Sep 22 14:55:54 2023 +0800 optimize consumer code logic --- .../apache/linkis/scheduler/queue/Consumer.scala | 1 - .../linkis/scheduler/queue/GroupFactory.scala | 5 +++ .../linkis/scheduler/queue/LoopArrayQueue.scala | 7 +++- .../queue/fifoqueue/FIFOUserConsumer.scala | 45 +++++++++++++++++++--- 4 files changed, 50 insertions(+), 8 deletions(-) diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Consumer.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Consumer.scala index 50dce2ca1..165a27436 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Consumer.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Consumer.scala @@ -41,7 +41,6 @@ abstract class Consumer(schedulerContext: SchedulerContext, executeService: Exec def start(): Unit def shutdown(): Unit = { - logger.info(s"$toString is ready to stop!") terminate = true logger.info(s"$toString stopped!") } diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/GroupFactory.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/GroupFactory.scala index f3471b07d..be1716f23 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/GroupFactory.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/GroupFactory.scala @@ -19,6 +19,11 @@ package org.apache.linkis.scheduler.queue abstract class GroupFactory { + /** + * Create a Group and set the concurrency limit of the group + * @param event + * @return + */ def getOrCreateGroup(event: SchedulerEvent): Group def getGroup(groupName: String): Group diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala index b0bbfd3c2..8bea7e52b 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala @@ -40,7 +40,12 @@ class LoopArrayQueue(var group: Group) extends ConsumeQueue with Logging { override def getWaitingEvents: Array[SchedulerEvent] = { eventQueue synchronized { - toIndexedSeq.filter(x => x.getState.equals(SchedulerEventState.Inited)).toArray + toIndexedSeq + .filter(x => + x.getState.equals(SchedulerEventState.Inited) || x.getState + .equals(SchedulerEventState.Scheduled) + ) + .toArray } } diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala index 2a40c2517..d541d8a2e 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala @@ -73,6 +73,8 @@ class FIFOUserConsumer( override def getRunningEvents: Array[SchedulerEvent] = getEvents(e => e.isRunning || e.isWaitForRetry) + protected def getSchedulerContext: SchedulerContext = schedulerContext + private def getEvents(op: SchedulerEvent => Boolean): Array[SchedulerEvent] = { val result = ArrayBuffer[SchedulerEvent]() runningJobs.filter(_ != null).filter(x => op(x)).foreach(result += _) @@ -82,16 +84,28 @@ class FIFOUserConsumer( override def run(): Unit = { Thread.currentThread().setName(s"${toString}Thread") logger.info(s"$toString thread started!") - while (!terminate) { - Utils.tryAndError(loop()) - Utils.tryAndError(Thread.sleep(10)) + while (!terminate) Utils.tryAndError { + loop() + Thread.sleep(10) } logger.info(s"$toString thread stopped!") } protected def askExecutorGap(): Unit = {} + /** + * Task scheduling interception is used to judge the rules of task operation, and to judge other + * task rules based on Group. For example, Entrance makes Creator-level task judgment. + */ + protected def runScheduleIntercept(): Boolean = { + true + } + protected def loop(): Unit = { + if (!runScheduleIntercept()) { + Utils.tryQuietly(Thread.sleep(1000)) + return + } var isRetryJob = false def getWaitForRetryEvent: Option[SchedulerEvent] = { val waitForRetryJobs = runningJobs.filter(job => job != null && job.isJobCanRetry) @@ -110,7 +124,7 @@ class FIFOUserConsumer( if (event.isEmpty) { val completedNums = runningJobs.filter(job => job == null || job.isCompleted) if (completedNums.length < 1) { - Utils.tryQuietly(Thread.sleep(1000)) // TODO 还可以优化,通过实现JobListener进行优化 + Utils.tryQuietly(Thread.sleep(1000)) return } while (event.isEmpty) { @@ -119,7 +133,12 @@ class FIFOUserConsumer( if ( takeEvent.exists(e => Utils.tryCatch(e.turnToScheduled()) { t => - takeEvent.get.asInstanceOf[Job].onFailure("Job状态翻转为Scheduled失败!", t) + takeEvent.get + .asInstanceOf[Job] + .onFailure( + "Failed to change the job status to Scheduled(Job状态翻转为Scheduled失败)", + t + ) false } ) @@ -174,7 +193,7 @@ class FIFOUserConsumer( ) ) case error: Throwable => - job.onFailure("请求引擎失败,可能是由于后台进程错误!请联系管理员", error) + job.onFailure("Failed to request EngineConn", error) if (job.isWaitForRetry) { logger.warn(s"Ask executor for Job $job failed, wait for the next retry!", error) if (!isRetryJob) putToRunningJobs(job) @@ -190,6 +209,20 @@ class FIFOUserConsumer( override def shutdown(): Unit = { future.cancel(true) + val waitEvents = queue.getWaitingEvents + if (waitEvents.nonEmpty) { + waitEvents.foreach { + case job: Job => + job.onFailure("Your job will be marked as canceled because the consumer be killed", null) + case _ => + } + } + + this.runningJobs.foreach { job => + if (job != null && !job.isCompleted) { + job.onFailure("Your job will be marked as canceled because the consumer be killed", null) + } + } super.shutdown() } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
