This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git

commit dd782c7d4e4e54c26bf6d4680ff844e78c274f86
Author: peacewong <[email protected]>
AuthorDate: Tue Mar 19 22:01:03 2024 +0800

    Relieve memory usage
---
 .../linkis/scheduler/conf/SchedulerConfiguration.scala      |  4 ++--
 .../main/scala/org/apache/linkis/scheduler/queue/Job.scala  | 13 ++++++++++---
 .../linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala |  2 ++
 .../queue/parallelqueue/ParallelConsumerManager.scala       |  4 ++--
 4 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala
index 8fd6f1c6f0..c2829052ac 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala
@@ -25,10 +25,10 @@ object SchedulerConfiguration {
     CommonVars("wds.linkis.fifo.consumer.auto.clear.enabled", true)
 
   val FIFO_CONSUMER_MAX_IDLE_TIME =
-    CommonVars("wds.linkis.fifo.consumer.max.idle.time", new TimeType("1h")).getValue.toLong
+    CommonVars("wds.linkis.fifo.consumer.max.idle.time", new TimeType("10m")).getValue.toLong
 
   val FIFO_CONSUMER_IDLE_SCAN_INTERVAL =
-    CommonVars("wds.linkis.fifo.consumer.idle.scan.interval", new TimeType("2h"))
+    CommonVars("wds.linkis.fifo.consumer.idle.scan.interval", new TimeType("30m"))
 
   val FIFO_CONSUMER_IDLE_SCAN_INIT_TIME =
     CommonVars("wds.linkis.fifo.consumer.idle.scan.init.time", new TimeType("1s"))
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala
index 2087153813..7534f74071 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala
@@ -210,9 +210,6 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
     case _ =>
       jobDaemon.foreach(_.kill())
       jobListener.foreach(_.onJobCompleted(this))
-//      if(getJobInfo != null) logListener.foreach(_.onLogUpdate(this, getJobInfo.getMetric))
-      logListener.foreach(_.onLogUpdate(this, LogUtils.generateInfo("job is completed.")))
-    // TODO job end event
   }
 
   protected def transitionCompleted(executeCompleted: CompletedExecuteResponse): Unit = {
@@ -351,6 +348,16 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
   }
 
   override def toString: String = if (StringUtils.isNotBlank(getName)) getName else getId
+
+  /**
+   * clear job memory
+   */
+  def clear(): Unit = {
+    logger.info(s" clear job base info $getId")
+    this.executor = null
+    this.jobDaemon = null
+  }
+
 }
 
 /**
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
index fcab44a731..df296f8329 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
@@ -232,6 +232,8 @@ class FIFOUserConsumer(
         case _ =>
       }
     }
+    // clear cache
+    queue.clearAll()
 
     this.runningJobs.foreach { job =>
       if (job != null && !job.isCompleted) {
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala
index 396b6fb315..1e753ea866 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala
@@ -126,8 +126,8 @@ class ParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String)
 
   override def destroyConsumer(groupName: String): Unit =
     consumerGroupMap.get(groupName).foreach { tmpConsumer =>
-      tmpConsumer.shutdown()
-      consumerGroupMap.remove(groupName)
+      Utils.tryAndWarn(tmpConsumer.shutdown())
+      Utils.tryAndWarn(consumerGroupMap.remove(groupName))
       consumerListener.foreach(_.onConsumerDestroyed(tmpConsumer))
       logger.warn(s"Consumer of group ($groupName) in $schedulerName is destroyed.")
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to