This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
     new 02a0d4633 [Feature][EngineConn][Shell] Shell EC supports concurrent 
execution (#3903)
02a0d4633 is described below

commit 02a0d46334783f1be7ce4038b0091741446c7eb5
Author: peacewong <[email protected]>
AuthorDate: Wed Nov 30 22:22:27 2022 +0800

    [Feature][EngineConn][Shell] Shell EC supports concurrent execution (#3903)
    
    * shell ec Support concurrent mode
    
    * Increase machine performance monitoring
---
 linkis-commons/linkis-common/pom.xml               |   7 ++
 .../linkis/common/utils}/HardwareUtils.scala       |  51 ++++++++--
 .../linkis/common/utils/HardwareUtilsTest.scala    |  49 ++++++++++
 .../linkis-engineconn-manager-server/pom.xml       |   6 --
 .../service/impl/DefaultECMHealthService.scala     |   5 +-
 .../concurrent/monitor/HardwareMonitorService.java |  71 ++++++++++++++
 .../concurrent/monitor/MonitorService.java}        |  32 +-----
 .../concurrent/monitor/TaskMonitorService.java     |  67 +++++++++++++
 .../concurrent/monitor/TimingMonitorService.java   | 107 +++++++++++++++++++++
 .../execute/ConcurrentComputationExecutor.scala    |  27 ------
 .../conf/AccessibleExecutorConfiguration.scala     |   4 +-
 .../AccessibleExecutorSpringConfiguration.scala    |   2 +-
 .../service/EngineConnTimedLockService.scala       |   2 +-
 .../shell/conf/ShellEngineConnConf.scala           |  33 ++-----
 .../engineplugin/shell/executor/ReaderThread.scala |  21 ++--
 .../shell/executor/ShellECTaskInfo.scala           |  33 +------
 ...ala => ShellEngineConnConcurrentExecutor.scala} |  95 +++++++++++-------
 .../shell/executor/ShellEngineConnExecutor.scala   |  21 ++--
 .../shell/executor/YarnAppIdExtractor.scala        |  87 +++--------------
 .../shell/factory/ShellEngineConnFactory.scala     |  19 +++-
 .../cache/impl/QueryCacheServiceImpl.java          |  64 ------------
 21 files changed, 468 insertions(+), 335 deletions(-)

diff --git a/linkis-commons/linkis-common/pom.xml 
b/linkis-commons/linkis-common/pom.xml
index ad8e53fe2..b71113c2a 100644
--- a/linkis-commons/linkis-common/pom.xml
+++ b/linkis-commons/linkis-common/pom.xml
@@ -146,6 +146,13 @@
       <version>1.1.2</version>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>com.github.oshi</groupId>
+      <artifactId>oshi-core</artifactId>
+      <version>6.2.1</version>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
 
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/HardwareUtils.scala
similarity index 52%
copy from 
linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
copy to 
linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/HardwareUtils.scala
index 77db299ca..f87a6f94c 100644
--- 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
+++ 
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/HardwareUtils.scala
@@ -15,35 +15,72 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.ecm.server.util
+package org.apache.linkis.common.utils
+
+import java.math.RoundingMode
+import java.text.DecimalFormat
 
 import oshi.SystemInfo
 
 object HardwareUtils {
 
+  private val systemInfo = new SystemInfo
+
+  private val hardware = systemInfo.getHardware
+
+  private val THREE_DECIMAL = "0.000"
+
   def getAvailableMemory(): Long = {
-    val systemInfo = new SystemInfo
-    val hardware = systemInfo.getHardware
     val globalMemory = hardware.getMemory
     globalMemory.getAvailable
   }
 
   def getMaxMemory(): Long = {
-    val systemInfo = new SystemInfo
-    val hardware = systemInfo.getHardware
     val globalMemory = hardware.getMemory
     globalMemory.getTotal
   }
 
   /**
    * 1 total 2 available
+   *
    * @return
    */
   def getTotalAndAvailableMemory(): (Long, Long) = {
-    val systemInfo = new SystemInfo
-    val hardware = systemInfo.getHardware
     val globalMemory = hardware.getMemory
     (globalMemory.getTotal, globalMemory.getAvailable)
   }
 
+  /**
+   * Ratio of used physical memory, (total - available) / total, rounded HALF_UP to 3 decimals.
+   *
+   * @return
+   *   used-memory ratio (0.95 means 95% used); 0 when no positive total is reported
+   */
+  def memoryUsage(): Double = {
+    val memory = hardware.getMemory
+    val total = memory.getTotal
+    // BigDecimal, not DecimalFormat: a "," decimal separator (e.g. de_DE) broke `.toDouble`.
+    val ratio = if (total <= 0) 0d else (total - memory.getAvailable) * 1.0 / total
+    java.math.BigDecimal.valueOf(ratio).setScale(3, RoundingMode.HALF_UP).doubleValue
+  }
+
+  /**
+   * System load average divided by 100 and rounded HALF_UP to 3 decimals. NOTE(review): a load
+   * average is not a percentage; confirm /100 (not / processor count) is the intended scaling.
+   * @return
+   *   scaled load average, or 0 when no positive load average could be read
+   */
+  def loadAverageUsage(): Double = {
+    val loadAverage = Utils.tryCatch {
+      OverloadUtils.getOSBean.getSystemLoadAverage
+    } { case _: Exception =>
+      val fallback = hardware.getProcessor.getSystemLoadAverage(1)(0)
+      if (fallback.isNaN) -1 else fallback
+    }
+    // Round with BigDecimal rather than DecimalFormat: DecimalFormat renders with the default
+    // locale, and a "," decimal separator (e.g. de_DE) made the old `.toDouble` parse throw.
+    def round3(v: Double) = java.math.BigDecimal.valueOf(v).setScale(3, RoundingMode.HALF_UP)
+    if (loadAverage <= 0) 0d else round3(loadAverage / 100d).doubleValue
+  }
+
 }
diff --git 
a/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/HardwareUtilsTest.scala
 
b/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/HardwareUtilsTest.scala
new file mode 100644
index 000000000..f6ba4de6f
--- /dev/null
+++ 
b/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/HardwareUtilsTest.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.common.utils
+
+import org.junit.jupiter.api.Test
+
+class HardwareUtilsTest {
+
+  @Test private[utils] def testGetAvailableMemory() = {
+    // Sanity bound only: the host must report more than 1000 bytes available.
+    assert(HardwareUtils.getAvailableMemory() > 1000L)
+  }
+
+  @Test private[utils] def testGetMaxMemory() = {
+    // Sanity bound only: the host must report more than 1000 bytes in total.
+    assert(HardwareUtils.getMaxMemory() > 1000L)
+  }
+
+  @Test private[utils] def testGetTotalAndAvailableMemory() = {
+    val memory = HardwareUtils.getTotalAndAvailableMemory() // (total, available)
+    assert(memory._1 >= memory._2)
+  }
+
+  @Test private[utils] def testMemoryUsage() = {
+    // The used-memory ratio must never be negative.
+    assert(HardwareUtils.memoryUsage() >= 0d)
+  }
+
+  @Test private[utils] def testloadAverageUsage() = {
+    // The scaled load average must never be negative.
+    assert(HardwareUtils.loadAverageUsage() >= 0d)
+  }
+
+}
diff --git 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/pom.xml
 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/pom.xml
index 3ac7a4935..07444de68 100644
--- 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/pom.xml
+++ 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/pom.xml
@@ -80,12 +80,6 @@
       <scope>provided</scope>
     </dependency>
 
-    <dependency>
-      <groupId>com.github.oshi</groupId>
-      <artifactId>oshi-core</artifactId>
-      <version>6.2.1</version>
-    </dependency>
-
   </dependencies>
 
   <build>
diff --git 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMHealthService.scala
 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMHealthService.scala
index c7fde661d..f6c9c4cb4 100644
--- 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMHealthService.scala
+++ 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMHealthService.scala
@@ -17,7 +17,7 @@
 
 package org.apache.linkis.ecm.server.service.impl
 
-import org.apache.linkis.common.utils.{ByteTimeUtils, OverloadUtils, Utils}
+import org.apache.linkis.common.utils.{ByteTimeUtils, HardwareUtils, 
OverloadUtils, Utils}
 import org.apache.linkis.ecm.core.listener.{ECMEvent, ECMEventListener}
 import org.apache.linkis.ecm.core.report.ECMHealthReport
 import org.apache.linkis.ecm.server.LinkisECMApplication
@@ -26,7 +26,6 @@ import org.apache.linkis.ecm.server.conf.ECMConfiguration._
 import org.apache.linkis.ecm.server.listener.{ECMClosedEvent, ECMReadyEvent}
 import org.apache.linkis.ecm.server.report.DefaultECMHealthReport
 import org.apache.linkis.ecm.server.service.{ECMHealthService, 
EngineConnListService}
-import org.apache.linkis.ecm.server.util.HardwareUtils
 import org.apache.linkis.manager.common.entity.enumeration.{NodeHealthy, 
NodeStatus}
 import org.apache.linkis.manager.common.entity.metrics.{NodeHealthyInfo, 
NodeOverLoadInfo}
 import org.apache.linkis.manager.common.entity.resource.{CommonNodeResource, 
LoadInstanceResource}
@@ -54,8 +53,6 @@ class DefaultECMHealthService extends ECMHealthService with 
ECMEventListener {
   private val minResource =
     new LoadInstanceResource(ECM_PROTECTED_MEMORY, ECM_PROTECTED_CORES, 
ECM_PROTECTED_INSTANCES)
 
-  private val runtime: Runtime = Runtime.getRuntime
-
   private var status: NodeStatus = NodeStatus.Starting
 
   private var healthy: NodeHealthy = NodeHealthy.Healthy
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/HardwareMonitorService.java
 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/HardwareMonitorService.java
new file mode 100644
index 000000000..20e3946f2
--- /dev/null
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/HardwareMonitorService.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconn.computation.concurrent.monitor;
+
+import org.apache.linkis.common.conf.CommonVars;
+import org.apache.linkis.common.utils.HardwareUtils;
+
+import org.springframework.stereotype.Component;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component
+public class HardwareMonitorService implements MonitorService {
+  // Reports unavailable while host memory usage or load average stays above its limit.
+  private static Logger LOG = 
LoggerFactory.getLogger(HardwareMonitorService.class);
+
+  private static CommonVars<Double> MEMORY_MAX_USAGE =
+      CommonVars.apply("linkis.engineconn.concurrent.max.memory.usage", 0.95);
+
+  private static CommonVars<Double> CPU_MAX_USAGE =
+      CommonVars.apply("linkis.engineconn.concurrent.max.cpu.usage", 0.99);
+
+  private Double lastLoadAverageUsage = 0D;
+
+  private Double lastMemoryUsage = 0D;
+
+  @Override
+  public boolean isAvailable() {
+    // A metric only counts as exceeded when this sample and the previous one are both over.
+    double memoryUsage = HardwareUtils.memoryUsage();
+    // Load is sampled separately from memory so the CPU threshold is actually enforced.
+    double loadAverageUsage = HardwareUtils.loadAverageUsage();
+
+    Double maxMemoryUsage = MEMORY_MAX_USAGE.getValue();
+    Double maxCpuUsage = CPU_MAX_USAGE.getValue();
+
+    boolean isUnavailable =
+        (memoryUsage > maxMemoryUsage && lastMemoryUsage > maxMemoryUsage)
+            || (loadAverageUsage > maxCpuUsage && lastLoadAverageUsage > 
maxCpuUsage);
+    if (isUnavailable) {
+      LOG.warn(
+          "current load average {} is too high or memory usage {} is too high, 
over max cpu load avg={} and memory usage {}",
+          loadAverageUsage,
+          memoryUsage,
+          maxCpuUsage,
+          maxMemoryUsage);
+    }
+
+    lastLoadAverageUsage = loadAverageUsage;
+
+    lastMemoryUsage = memoryUsage;
+
+    return !isUnavailable;
+  }
+}
diff --git 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/MonitorService.java
similarity index 51%
copy from 
linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
copy to 
linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/MonitorService.java
index 77db299ca..ce39f3600 100644
--- 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/MonitorService.java
@@ -15,35 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.ecm.server.util
+package org.apache.linkis.engineconn.computation.concurrent.monitor;
 
-import oshi.SystemInfo
-
-object HardwareUtils {
-
-  def getAvailableMemory(): Long = {
-    val systemInfo = new SystemInfo
-    val hardware = systemInfo.getHardware
-    val globalMemory = hardware.getMemory
-    globalMemory.getAvailable
-  }
-
-  def getMaxMemory(): Long = {
-    val systemInfo = new SystemInfo
-    val hardware = systemInfo.getHardware
-    val globalMemory = hardware.getMemory
-    globalMemory.getTotal
-  }
-
-  /**
-   * 1 total 2 available
-   * @return
-   */
-  def getTotalAndAvailableMemory(): (Long, Long) = {
-    val systemInfo = new SystemInfo
-    val hardware = systemInfo.getHardware
-    val globalMemory = hardware.getMemory
-    (globalMemory.getTotal, globalMemory.getAvailable)
-  }
+public interface MonitorService {
 
+  boolean isAvailable();
 }
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TaskMonitorService.java
 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TaskMonitorService.java
new file mode 100644
index 000000000..591a9792f
--- /dev/null
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TaskMonitorService.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconn.computation.concurrent.monitor;
+
+import 
org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
+import org.apache.linkis.engineconn.core.EngineConnObject;
+import org.apache.linkis.engineconn.core.executor.ExecutorManager$;
+import org.apache.linkis.engineconn.executor.entity.Executor;
+
+import org.springframework.stereotype.Component;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component
+public class TaskMonitorService implements MonitorService {
+  // Reports unavailable while the running task count exceeds the executor's concurrent limit.
+  private static Logger LOG = 
LoggerFactory.getLogger(TaskMonitorService.class);
+
+  private static ConcurrentComputationExecutor concurrentExecutor = null;
+
+  @Override
+  public boolean isAvailable() {
+    // Before the EngineConn is ready there is no executor to inspect, so report available.
+    if (!EngineConnObject.isReady()) {
+      return true;
+    }
+
+    try {
+      if (null == concurrentExecutor) {
+        Executor executor = 
ExecutorManager$.MODULE$.getInstance().getReportExecutor();
+        if (executor instanceof ConcurrentComputationExecutor) {
+          concurrentExecutor = (ConcurrentComputationExecutor) executor;
+        }
+      }
+      if (null == concurrentExecutor) {
+        LOG.warn("report executor is not a ConcurrentComputationExecutor, skip task monitor");
+        return true;
+      }
+      if (concurrentExecutor.getRunningTask() > 
concurrentExecutor.getConcurrentLimit()) {
+        LOG.info(
+            "running task({}) > concurrent limit ({}) , now to mark ec to busy 
",
+            concurrentExecutor.getRunningTask(),
+            concurrentExecutor.getConcurrentLimit());
+        return false;
+      }
+    } catch (Exception e) {
+      LOG.warn("Task Monitor failed", e);
+    }
+    return true;
+  }
+}
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java
 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java
new file mode 100644
index 000000000..de6bb440d
--- /dev/null
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconn.computation.concurrent.monitor;
+
+import org.apache.linkis.common.conf.CommonVars;
+import org.apache.linkis.common.conf.TimeType;
+import org.apache.linkis.common.utils.Utils;
+import 
org.apache.linkis.engineconn.acessible.executor.conf.AccessibleExecutorConfiguration;
+import 
org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor;
+import org.apache.linkis.engineconn.core.EngineConnObject;
+import org.apache.linkis.engineconn.core.executor.ExecutorManager$;
+import org.apache.linkis.engineconn.executor.entity.Executor;
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus;
+
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component
+public class TimingMonitorService implements InitializingBean, Runnable {
+  // Polls all MonitorService beans on a schedule and toggles the executor Busy/Unlock.
+  private static Logger LOG = 
LoggerFactory.getLogger(TimingMonitorService.class);
+
+  private static CommonVars<TimeType> MONITOR_INTERVAL =
+      CommonVars.apply("linkis.engineconn.concurrent.monitor.interval", new 
TimeType("30s"));
+
+  @Autowired private List<MonitorService> monitorServiceList;
+
+  private boolean isAvailable = true;
+
+  private AccessibleExecutor concurrentExecutor = null;
+
+  private static final Object EXECUTOR_STATUS_LOCKER = new Object();
+  // Schedules this monitor (first run after 3 minutes) only when parallelism is enabled.
+  @Override
+  public void afterPropertiesSet() throws Exception {
+    if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM()) {
+      Utils.defaultScheduler()
+          .scheduleAtFixedRate(
+              this, 3 * 60 * 1000, MONITOR_INTERVAL.getValue().toLong(), 
TimeUnit.MILLISECONDS);
+    }
+  }
+
+  @Override
+  public void run() {
+    // Nothing to monitor until the EngineConn reports ready.
+    if (!EngineConnObject.isReady()) {
+      return;
+    }
+
+    try {
+      if (null == concurrentExecutor) {
+        Executor executor = 
ExecutorManager$.MODULE$.getInstance().getReportExecutor();
+        if (executor instanceof AccessibleExecutor) {
+          concurrentExecutor = (AccessibleExecutor) executor;
+        }
+      }
+      if (null == concurrentExecutor) {
+        LOG.warn("report executor is not an AccessibleExecutor, skip monitor");
+        return;
+      }
+      isAvailable = true; // reset, then let every monitor vote (no short-circuit)
+      monitorServiceList.forEach(
+          monitorService -> {
+            if (!monitorService.isAvailable()) {
+              isAvailable = false;
+            }
+          });
+      if (isAvailable) {
+        if (concurrentExecutor.isBusy())
+          synchronized (EXECUTOR_STATUS_LOCKER) {
+            LOG.info("monitor turn to executor status from busy to unlock");
+            concurrentExecutor.transition(NodeStatus.Unlock);
+          }
+      } else {
+        if (concurrentExecutor.isIdle())
+          synchronized (EXECUTOR_STATUS_LOCKER) {
+            LOG.info("monitor turn to executor status from idle to busy");
+            concurrentExecutor.transition(NodeStatus.Busy);
+          }
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to executor monitor ", e);
+    }
+  }
+}
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ConcurrentComputationExecutor.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ConcurrentComputationExecutor.scala
index 0a15acf6b..0699ae779 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ConcurrentComputationExecutor.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ConcurrentComputationExecutor.scala
@@ -26,33 +26,6 @@ abstract class ConcurrentComputationExecutor(override val 
outputPrintLimit: Int
     extends ComputationExecutor(outputPrintLimit)
     with ConcurrentExecutor {
 
-  override def execute(engineConnTask: EngineConnTask): ExecuteResponse = {
-    if (isBusy) {
-      logger.error(
-        s"Executor is busy but still got new task ! Running task num : 
${getRunningTask}"
-      )
-    }
-    if (getRunningTask >= getConcurrentLimit) synchronized {
-      if (getRunningTask >= getConcurrentLimit && 
NodeStatus.isIdle(getStatus)) {
-        logger.info(
-          s"running task($getRunningTask) > concurrent limit 
$getConcurrentLimit, now to mark engine to busy "
-        )
-        transition(NodeStatus.Busy)
-      }
-    }
-    logger.info(s"engineConnTask(${engineConnTask.getTaskId}) running task is 
($getRunningTask) ")
-    val response = super.execute(engineConnTask)
-    if (getStatus == NodeStatus.Busy && getConcurrentLimit > getRunningTask) 
synchronized {
-      if (getStatus == NodeStatus.Busy && getConcurrentLimit > getRunningTask) 
{
-        logger.info(
-          s"running task($getRunningTask) < concurrent limit 
$getConcurrentLimit, now to mark engine to Unlock "
-        )
-        transition(NodeStatus.Unlock)
-      }
-    }
-    response
-  }
-
   protected override def ensureOp[A](f: => A): A = f
 
   override def afterExecute(
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
index 19dc900f3..0eb211f73 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
@@ -42,8 +42,8 @@ object AccessibleExecutorConfiguration {
   val ENGINECONN_LOCK_CHECK_INTERVAL =
     CommonVars("wds.linkis.engineconn.lock.free.interval", new TimeType("3m"))
 
-  val ENGINECONN_SUPPORT_PARALLELISM =
-    CommonVars("wds.linkis.engineconn.support.parallelism", false)
+  val ENGINECONN_SUPPORT_PARALLELISM: Boolean =
+    CommonVars("wds.linkis.engineconn.support.parallelism", false).getValue
 
   val ENGINECONN_HEARTBEAT_TIME =
     CommonVars("wds.linkis.engineconn.heartbeat.time", new TimeType("2m"))
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
index 475873c9d..53cdd44b0 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
@@ -43,7 +43,7 @@ class AccessibleExecutorSpringConfiguration extends Logging {
   def createLockManager(): LockService = {
 
     val lockService =
-      if 
(AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getValue) {
+      if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM) {
         new EngineConnConcurrentLockService
       } else new EngineConnTimedLockService
     asyncListenerBusContext.addListener(lockService)
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
index 4d85fab48..452c6305b 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
@@ -51,7 +51,7 @@ class EngineConnTimedLockService extends LockService with 
Logging {
   private var lockType: EngineLockType = EngineLockType.Timed
 
   private def isSupportParallelism: Boolean =
-    AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getValue
+    AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM
 
   /**
    * @param lock
diff --git 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
 
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.scala
similarity index 52%
copy from 
linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
copy to 
linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.scala
index 77db299ca..ba74dbbad 100644
--- 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
+++ 
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.scala
@@ -15,35 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.ecm.server.util
+package org.apache.linkis.manager.engineplugin.shell.conf
 
-import oshi.SystemInfo
+import org.apache.linkis.common.conf.CommonVars
 
-object HardwareUtils {
+object ShellEngineConnConf {
 
-  def getAvailableMemory(): Long = {
-    val systemInfo = new SystemInfo
-    val hardware = systemInfo.getHardware
-    val globalMemory = hardware.getMemory
-    globalMemory.getAvailable
-  }
+  val SHELL_ENGINECONN_CONCURRENT_LIMIT: Int =
+    CommonVars[Int]("linkis.engineconn.shell.concurrent.limit", 30).getValue
 
-  def getMaxMemory(): Long = {
-    val systemInfo = new SystemInfo
-    val hardware = systemInfo.getHardware
-    val globalMemory = hardware.getMemory
-    globalMemory.getTotal
-  }
-
-  /**
-   * 1 total 2 available
-   * @return
-   */
-  def getTotalAndAvailableMemory(): (Long, Long) = {
-    val systemInfo = new SystemInfo
-    val hardware = systemInfo.getHardware
-    val globalMemory = hardware.getMemory
-    (globalMemory.getTotal, globalMemory.getAvailable)
-  }
+  val LOG_SERVICE_MAX_THREAD_SIZE: Int =
+    CommonVars("linkis.engineconn.shell.log.max.thread.size", 50).getValue
 
 }
diff --git 
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala
 
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala
index d814dd259..8277cb116 100644
--- 
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala
+++ 
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala
@@ -21,6 +21,7 @@ import org.apache.linkis.common.conf.CommonVars
 import org.apache.linkis.common.utils.{Logging, Utils}
 import 
org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
 
+import org.apache.commons.io.IOUtils
 import org.apache.commons.lang3.StringUtils
 
 import java.io.BufferedReader
@@ -28,6 +29,7 @@ import java.util
 import java.util.concurrent.CountDownLatch
 
 class ReaderThread extends Thread with Logging {
+
   private var engineExecutionContext: EngineExecutionContext = _
   private var inputReader: BufferedReader = _
   private var extractor: YarnAppIdExtractor = _
@@ -35,6 +37,8 @@ class ReaderThread extends Thread with Logging {
   private val logListCount = 
CommonVars[Int]("wds.linkis.engineconn.log.list.count", 50)
   private var counter: CountDownLatch = _
 
+  private var isReaderAlive = true
+
   def this(
       engineExecutionContext: EngineExecutionContext,
       inputReader: BufferedReader,
@@ -51,23 +55,14 @@ class ReaderThread extends Thread with Logging {
   }
 
   def onDestroy(): Unit = {
-    Utils.tryCatch {
-      inputReader synchronized inputReader.close()
-    } { t =>
-      logger.warn("inputReader while closing the error stream", t)
-    }
+    isReaderAlive = false
   }
 
   def startReaderThread(): Unit = {
     Utils.tryCatch {
       this.start()
     } { t =>
-      if (t.isInstanceOf[OutOfMemoryError]) {
-        logger.warn(
-          "Caught " + t + ". One possible reason is that ulimit" + " setting 
of 'max user processes' is too low. If so, do" + " 'ulimit -u <largerNum>' and 
try again."
-        )
-      }
-      logger.warn("Cannot start thread to read from inputReader stream", t)
+      throw t
     }
   }
 
@@ -75,12 +70,11 @@ class ReaderThread extends Thread with Logging {
     Utils.tryCatch {
       var line: String = null
       val logArray: util.List[String] = new util.ArrayList[String]
-      while ({ line = inputReader.readLine(); line != null && !isInterrupted 
}) {
+      while ({ line = inputReader.readLine(); line != null && isReaderAlive }) 
{
         logger.info("read logger line :{}", line)
         logArray.add(line)
         extractor.appendLineToExtractor(line)
         if (isStdout) engineExecutionContext.appendTextResultSet(line)
-
         if (logArray.size > logListCount.getValue) {
           val linelist = StringUtils.join(logArray, "\n")
           engineExecutionContext.appendStdout(linelist)
@@ -95,6 +89,7 @@ class ReaderThread extends Thread with Logging {
     } { t =>
       logger.warn("inputReader reading the input stream", t)
     }
+    IOUtils.closeQuietly(inputReader)
     counter.countDown()
   }
 
diff --git 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
 
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellECTaskInfo.scala
similarity index 51%
rename from 
linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
rename to 
linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellECTaskInfo.scala
index 77db299ca..d8d7edaa3 100644
--- 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
+++ 
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellECTaskInfo.scala
@@ -15,35 +15,6 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.ecm.server.util
+package org.apache.linkis.manager.engineplugin.shell.executor
 
-import oshi.SystemInfo
-
-object HardwareUtils {
-
-  def getAvailableMemory(): Long = {
-    val systemInfo = new SystemInfo
-    val hardware = systemInfo.getHardware
-    val globalMemory = hardware.getMemory
-    globalMemory.getAvailable
-  }
-
-  def getMaxMemory(): Long = {
-    val systemInfo = new SystemInfo
-    val hardware = systemInfo.getHardware
-    val globalMemory = hardware.getMemory
-    globalMemory.getTotal
-  }
-
-  /**
-   * 1 total 2 available
-   * @return
-   */
-  def getTotalAndAvailableMemory(): (Long, Long) = {
-    val systemInfo = new SystemInfo
-    val hardware = systemInfo.getHardware
-    val globalMemory = hardware.getMemory
-    (globalMemory.getTotal, globalMemory.getAvailable)
-  }
-
-}
+case class ShellECTaskInfo(taskId: String, process: Process, 
yarnAppIdExtractor: YarnAppIdExtractor)
diff --git 
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
 
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.scala
similarity index 80%
copy from 
linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
copy to 
linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.scala
index 6dc5ac749..e127f7abb 100644
--- 
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
+++ 
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.scala
@@ -20,6 +20,7 @@ package org.apache.linkis.manager.engineplugin.shell.executor
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.engineconn.computation.executor.execute.{
   ComputationExecutor,
+  ConcurrentComputationExecutor,
   EngineExecutionContext
 }
 import org.apache.linkis.engineconn.core.EngineConnObject
@@ -31,6 +32,7 @@ import org.apache.linkis.manager.common.entity.resource.{
 }
 import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
 import 
org.apache.linkis.manager.engineplugin.shell.common.ShellEngineConnPluginConst
+import org.apache.linkis.manager.engineplugin.shell.conf.ShellEngineConnConf
 import 
org.apache.linkis.manager.engineplugin.shell.exception.ShellCodeErrorException
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.protocol.engine.JobProgressInfo
@@ -44,23 +46,30 @@ import org.apache.linkis.scheduler.executer.{
 import org.apache.commons.io.IOUtils
 import org.apache.commons.lang3.StringUtils
 
-import java.io.{BufferedReader, File, FileReader, InputStreamReader, 
IOException}
+import java.io.{BufferedReader, File, InputStreamReader}
 import java.util
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
 import java.util.concurrent.atomic.AtomicBoolean
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.ExecutionContextExecutorService
 
-class ShellEngineConnExecutor(id: Int) extends ComputationExecutor with 
Logging {
+class ShellEngineConnConcurrentExecutor(id: Int, maxRunningNumber: Int)
+    extends ConcurrentComputationExecutor
+    with Logging {
 
   private var engineExecutionContext: EngineExecutionContext = _
 
   private val executorLabels: util.List[Label[_]] = new 
util.ArrayList[Label[_]]()
 
-  private var process: Process = _
+  private val shellECTaskInfoCache: util.Map[String, ShellECTaskInfo] =
+    new ConcurrentHashMap[String, ShellECTaskInfo]()
 
-  private var extractor: YarnAppIdExtractor = _
+  private implicit val logAsyncService: ExecutionContextExecutorService =
+    Utils.newCachedExecutionContext(
+      ShellEngineConnConf.LOG_SERVICE_MAX_THREAD_SIZE,
+      "ShelLogService-Thread-"
+    )
 
   override def init(): Unit = {
     logger.info(s"Ready to change engine state!")
@@ -87,6 +96,11 @@ class ShellEngineConnExecutor(id: Int) extends 
ComputationExecutor with Logging
       logger.info("Shell executor reset new engineExecutionContext!")
     }
 
+    if (engineExecutionContext.getJobId.isEmpty) {
+      return ErrorExecuteResponse("taskID is null", null)
+    }
+
+    val taskId = engineExecutionContext.getJobId.get
     var bufferedReader: BufferedReader = null
     var errorsReader: BufferedReader = null
 
@@ -145,7 +159,8 @@ class ShellEngineConnExecutor(id: Int) extends 
ComputationExecutor with Logging
               wdStr
             } else {
               logger.warn(
-                "User-specified working-directory: \'" + wdStr + "\' does not 
exist or user does not have access permission. Will execute shell task under 
default working-directory. Please contact BDP!"
+                "User-specified working-directory: \'" + wdStr + "\' does not 
exist or user does not have access permission. " +
+                  "Will execute shell task under default working-directory. 
Please contact the administrator!"
               )
               null
             }
@@ -160,7 +175,7 @@ class ShellEngineConnExecutor(id: Int) extends 
ComputationExecutor with Logging
           null
         }
 
-      val generatedCode = if (argsArr == null || argsArr.length == 0) {
+      val generatedCode = if (argsArr == null || argsArr.isEmpty) {
         generateRunCode(code)
       } else {
         generateRunCodeWithArgs(code, argsArr)
@@ -172,20 +187,21 @@ class ShellEngineConnExecutor(id: Int) extends 
ComputationExecutor with Logging
       }
 
       processBuilder.redirectErrorStream(false)
-      extractor = new YarnAppIdExtractor
-      extractor.startExtraction()
-      process = processBuilder.start()
-
+      val extractor = new YarnAppIdExtractor
+      val process = processBuilder.start()
       bufferedReader = new BufferedReader(new 
InputStreamReader(process.getInputStream))
       errorsReader = new BufferedReader(new 
InputStreamReader(process.getErrorStream))
+      // add task id and task Info cache
+      shellECTaskInfoCache.put(taskId, ShellECTaskInfo(taskId, process, 
extractor))
+
       val counter: CountDownLatch = new CountDownLatch(2)
       inputReaderThread =
         new ReaderThread(engineExecutionContext, bufferedReader, extractor, 
true, counter)
       errReaderThread =
         new ReaderThread(engineExecutionContext, errorsReader, extractor, 
false, counter)
 
-      inputReaderThread.start()
-      errReaderThread.start()
+      logAsyncService.execute(inputReaderThread)
+      logAsyncService.execute(errReaderThread)
 
       val exitCode = process.waitFor()
       counter.await()
@@ -195,28 +211,25 @@ class ShellEngineConnExecutor(id: Int) extends 
ComputationExecutor with Logging
       if (exitCode != 0) {
         ErrorExecuteResponse("run shell failed", ShellCodeErrorException())
       } else SuccessExecuteResponse()
+
     } catch {
       case e: Exception =>
         logger.error("Execute shell code failed, reason:", e)
         ErrorExecuteResponse("run shell failed", e)
     } finally {
-      if (!completed.get()) {
-        Utils.tryAndWarn(errReaderThread.interrupt())
-        Utils.tryAndWarn(inputReaderThread.interrupt())
-      }
-      Utils.tryAndWarn {
-        extractor.onDestroy()
+      if (null != errorsReader) {
         inputReaderThread.onDestroy()
+      }
+      if (null != inputReaderThread) {
         errReaderThread.onDestroy()
       }
-      IOUtils.closeQuietly(bufferedReader)
-      IOUtils.closeQuietly(errorsReader)
+      shellECTaskInfoCache.remove(taskId)
     }
   }
 
   private def isExecutePathExist(executePath: String): Boolean = {
     val etlHomeDir: File = new File(executePath)
-    (etlHomeDir.exists() && etlHomeDir.isDirectory())
+    (etlHomeDir.exists() && etlHomeDir.isDirectory)
   }
 
   private def generateRunCode(code: String): Array[String] = {
@@ -293,18 +306,22 @@ class ShellEngineConnExecutor(id: Int) extends 
ComputationExecutor with Logging
   }
 
   override def killTask(taskID: String): Unit = {
+    val shellECTaskInfo = shellECTaskInfoCache.remove(taskID)
+    if (null == shellECTaskInfo) {
+      return
+    }
     /*
       Kill sub-processes
      */
-    val pid = getPid(process)
+    val pid = getPid(shellECTaskInfo.process)
     GovernanceUtils.killProcess(String.valueOf(pid), s"kill task $taskID 
process", false)
     /*
       Kill yarn-applications
      */
-    val yarnAppIds = extractor.getExtractedYarnAppIds()
-    GovernanceUtils.killYarnJobApp(yarnAppIds.toList.asJava)
+    val yarnAppIds = 
shellECTaskInfo.yarnAppIdExtractor.getExtractedYarnAppIds()
+    GovernanceUtils.killYarnJobApp(yarnAppIds)
     logger.info(
-      s"Finished kill yarn app ids in the engine of (${getId()}). The yarn app 
ids are ${yarnAppIds.mkString(",")}"
+      s"Finished kill yarn app ids in the engine of (${getId()}). The yarn app 
ids are ${yarnAppIds}"
     )
     super.killTask(taskID)
 
@@ -323,15 +340,25 @@ class ShellEngineConnExecutor(id: Int) extends 
ComputationExecutor with Logging
   }
 
   override def close(): Unit = {
-    try {
-      process.destroy()
-    } catch {
-      case e: Exception =>
-        logger.error(s"kill process ${process.toString} failed ", e)
-      case t: Throwable =>
-        logger.error(s"kill process ${process.toString} failed ", t)
+    Utils.tryCatch {
+      killAll()
+      logAsyncService.shutdown()
+    } { t: Throwable =>
+      logger.error(s"Shell ec failed to close ", t)
     }
     super.close()
   }
 
+  override def killAll(): Unit = {
+    val iterator = shellECTaskInfoCache.values().iterator()
+    while (iterator.hasNext) {
+      val shellECTaskInfo = iterator.next()
+      Utils.tryAndWarn(killTask(shellECTaskInfo.taskId))
+    }
+  }
+
+  override def getConcurrentLimit: Int = {
+    maxRunningNumber
+  }
+
 }
diff --git 
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
 
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
index 6dc5ac749..88d385f82 100644
--- 
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
+++ 
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
@@ -145,7 +145,8 @@ class ShellEngineConnExecutor(id: Int) extends 
ComputationExecutor with Logging
               wdStr
             } else {
               logger.warn(
-                "User-specified working-directory: \'" + wdStr + "\' does not 
exist or user does not have access permission. Will execute shell task under 
default working-directory. Please contact BDP!"
+                "User-specified working-directory: \'" + wdStr + "\' does not 
exist or user does not have access permission. " +
+                  "Will execute shell task under default working-directory. 
Please contact the administrator!"
               )
               null
             }
@@ -160,7 +161,7 @@ class ShellEngineConnExecutor(id: Int) extends 
ComputationExecutor with Logging
           null
         }
 
-      val generatedCode = if (argsArr == null || argsArr.length == 0) {
+      val generatedCode = if (argsArr == null || argsArr.isEmpty) {
         generateRunCode(code)
       } else {
         generateRunCodeWithArgs(code, argsArr)
@@ -173,7 +174,6 @@ class ShellEngineConnExecutor(id: Int) extends 
ComputationExecutor with Logging
 
       processBuilder.redirectErrorStream(false)
       extractor = new YarnAppIdExtractor
-      extractor.startExtraction()
       process = processBuilder.start()
 
       bufferedReader = new BufferedReader(new 
InputStreamReader(process.getInputStream))
@@ -200,13 +200,10 @@ class ShellEngineConnExecutor(id: Int) extends 
ComputationExecutor with Logging
         logger.error("Execute shell code failed, reason:", e)
         ErrorExecuteResponse("run shell failed", e)
     } finally {
-      if (!completed.get()) {
-        Utils.tryAndWarn(errReaderThread.interrupt())
-        Utils.tryAndWarn(inputReaderThread.interrupt())
-      }
-      Utils.tryAndWarn {
-        extractor.onDestroy()
+      if (null != errorsReader) {
         inputReaderThread.onDestroy()
+      }
+      if (null != inputReaderThread) {
         errReaderThread.onDestroy()
       }
       IOUtils.closeQuietly(bufferedReader)
@@ -302,10 +299,8 @@ class ShellEngineConnExecutor(id: Int) extends 
ComputationExecutor with Logging
       Kill yarn-applications
      */
     val yarnAppIds = extractor.getExtractedYarnAppIds()
-    GovernanceUtils.killYarnJobApp(yarnAppIds.toList.asJava)
-    logger.info(
-      s"Finished kill yarn app ids in the engine of (${getId()}). The yarn app 
ids are ${yarnAppIds.mkString(",")}"
-    )
+    GovernanceUtils.killYarnJobApp(yarnAppIds)
+    logger.info(s"Finished kill yarn app ids in the engine of (${getId()}).}")
     super.killTask(taskID)
 
   }
diff --git 
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala
 
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala
index 9cc5f29c0..2f5f79663 100644
--- 
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala
+++ 
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala
@@ -17,49 +17,29 @@
 
 package org.apache.linkis.manager.engineplugin.shell.executor
 
-import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.common.utils.Logging
 import org.apache.linkis.engineconn.common.conf.EngineConnConf
 
 import org.apache.commons.lang3.StringUtils
 
 import java.io._
 import java.util
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.Collections
 import java.util.regex.Pattern
 
-class YarnAppIdExtractor extends Thread with Logging {
-  val MAX_BUFFER: Int = 32 * 1024 * 1024 // 32MB
+class YarnAppIdExtractor extends Logging {
 
-  val buff: StringBuilder = new StringBuilder
+  private val appIdList: util.Set[String] = Collections.synchronizedSet(new 
util.HashSet[String]())
 
-  val appIdList: util.List[String] = new util.ArrayList[String]()
-
-  val shouldStop: AtomicBoolean = new AtomicBoolean(false)
+  private val regex = 
EngineConnConf.SPARK_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX.getValue
+  private val pattern = Pattern.compile(regex)
 
   def appendLineToExtractor(content: String): Unit = {
-    buff.synchronized {
-      if (content.length + buff.length > MAX_BUFFER) {
-        logger.warn(
-          s"input exceed max-buffer-size, will abandon some part of input. 
maybe will lost some yarn-app-id."
-        )
-        buff
-          .append(StringUtils.substring(content, 0, MAX_BUFFER - buff.length))
-          .append(System.lineSeparator())
-      } else {
-        buff.append(content).append(System.lineSeparator())
-      }
-    }
-  }
-
-  def startExtraction(): Unit = {
-    this.start()
-  }
-
-  def onDestroy(): Unit = {
-    shouldStop.set(true)
-    this.interrupt()
-    buff.synchronized {
-      buff.setLength(0)
+    if (StringUtils.isBlank(content)) return
+    val yarnAppIDMatcher = pattern.matcher(content)
+    if (yarnAppIDMatcher.find) {
+      val yarnAppID = yarnAppIDMatcher.group(2)
+      appIdList.add(yarnAppID)
     }
   }
 
@@ -68,8 +48,6 @@ class YarnAppIdExtractor extends Thread with Logging {
     if (StringUtils.isBlank(content)) return new Array[String](0)
     // spark: Starting|Submitted|Activating.{1,100}(application_\d{13}_\d+)
     // sqoop, importtsv: Submitted application application_1609166102854_970911
-    val regex = 
EngineConnConf.SPARK_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX.getValue
-    val pattern = Pattern.compile(regex)
 
     val stringReader = new StringReader(content)
 
@@ -94,48 +72,9 @@ class YarnAppIdExtractor extends Thread with Logging {
     ret.toArray(new Array[String](ret.size()))
   }
 
-  override def run(): Unit = {
-    while (!shouldStop.get()) {
-      var content: String = ""
-      buff.synchronized {
-        content = buff.toString()
-        buff.setLength(0)
-      }
-      if (StringUtils.isNotBlank(content)) {
-        val appIds = doExtractYarnAppId(content)
-        if (appIds != null && appIds.length != 0) {
-          logger.info(s"Retrieved new yarn application Id:" + 
appIds.mkString(" "))
-          addYarnAppIds(appIds)
-        }
-        logger.debug(s"Yarn-appid-extractor is running")
-      }
-      Utils.sleepQuietly(200L)
-    }
-  }
-
-  def addYarnAppId(yarnAppId: String): Unit = {
-    appIdList.synchronized {
-      appIdList.add(yarnAppId)
-    }
-  }
-
-  def addYarnAppIds(yarnAppIds: Array[String]): Unit = {
-    if (yarnAppIds != null && !yarnAppIds.isEmpty) {
-      appIdList.synchronized {
-        yarnAppIds.foreach(id =>
-          if (!appIdList.contains(id)) {
-            appIdList.add(id)
-            // input application id to logs/stderr
-            logger.info(s"Submitted application $id")
-          }
-        )
-      }
-    }
-  }
-
-  def getExtractedYarnAppIds(): Array[String] = {
+  def getExtractedYarnAppIds(): util.List[String] = {
     appIdList.synchronized {
-      appIdList.toArray(new Array[String](appIdList.size()))
+      new util.ArrayList[String](appIdList)
     }
   }
 
diff --git 
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala
 
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala
index 05012bf62..cbfffc244 100644
--- 
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala
+++ 
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala
@@ -17,11 +17,16 @@
 
 package org.apache.linkis.manager.engineplugin.shell.factory
 
+import 
org.apache.linkis.engineconn.acessible.executor.conf.AccessibleExecutorConfiguration
 import org.apache.linkis.engineconn.common.creation.EngineCreationContext
 import org.apache.linkis.engineconn.common.engineconn.EngineConn
 import 
org.apache.linkis.engineconn.computation.executor.creation.ComputationSingleExecutorEngineConnFactory
 import org.apache.linkis.engineconn.executor.entity.LabelExecutor
-import 
org.apache.linkis.manager.engineplugin.shell.executor.ShellEngineConnExecutor
+import org.apache.linkis.manager.engineplugin.shell.conf.ShellEngineConnConf
+import org.apache.linkis.manager.engineplugin.shell.executor.{
+  ShellEngineConnConcurrentExecutor,
+  ShellEngineConnExecutor
+}
 import org.apache.linkis.manager.label.entity.engine.{EngineType, RunType}
 import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
 import org.apache.linkis.manager.label.entity.engine.RunType.RunType
@@ -32,8 +37,16 @@ class ShellEngineConnFactory extends 
ComputationSingleExecutorEngineConnFactory
       id: Int,
       engineCreationContext: EngineCreationContext,
       engineConn: EngineConn
-  ): LabelExecutor =
-    new ShellEngineConnExecutor(id)
+  ): LabelExecutor = {
+    if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM) {
+      new ShellEngineConnConcurrentExecutor(
+        id,
+        ShellEngineConnConf.SHELL_ENGINECONN_CONCURRENT_LIMIT
+      )
+    } else {
+      new ShellEngineConnExecutor(id)
+    }
+  }
 
   override protected def getEngineConnType: EngineType = EngineType.SHELL
 
diff --git 
a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/cache/impl/QueryCacheServiceImpl.java
 
b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/cache/impl/QueryCacheServiceImpl.java
deleted file mode 100644
index 5e83e1f5b..000000000
--- 
a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/cache/impl/QueryCacheServiceImpl.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.jobhistory.cache.impl;
-/**
- * package org.apache.linkis.jobhistory.cache.impl;
- *
- * <p>import 
org.apache.linkis.governance.common.entity.task.RequestPersistTask; import
- * org.apache.linkis.jobhistory.cache.QueryCacheManager; import
- * org.apache.linkis.jobhistory.cache.QueryCacheService; import
- * org.apache.linkis.jobhistory.cache.domain.TaskResult; import
- * org.apache.linkis.protocol.constants.TaskConstant; import
- * org.apache.linkis.protocol.query.cache.RequestDeleteCache; import
- * org.apache.linkis.protocol.query.cache.RequestReadCache; import
- * org.apache.linkis.protocol.utils.TaskUtils; import 
org.apache.commons.collections.MapUtils;
- * import org.springframework.beans.factory.annotation.Autowired; import
- * org.springframework.stereotype.Component;
- *
- * <p>import java.util.Map; @Component public class QueryCacheServiceImpl 
implements
- * QueryCacheService { @Autowired QueryCacheManager queryCacheManager;
- *
- * <p>public Boolean needCache(RequestPersistTask requestPersistTask) { 
Map<String, Object>
- * runtimeMap = TaskUtils.getRuntimeMap(requestPersistTask.getParams()); if
- * (MapUtils.isEmpty(runtimeMap) || runtimeMap.get(TaskConstant.CACHE) == 
null) { return false; }
- * return (Boolean) runtimeMap.get(TaskConstant.CACHE); }
- *
- * <p>public void writeCache(RequestPersistTask requestPersistTask) { 
Map<String, Object> runtimeMap
- * = TaskUtils.getRuntimeMap(requestPersistTask.getParams()); Long 
cacheExpireAfter = ((Double)
- * runtimeMap.getOrDefault(TaskConstant.CACHE_EXPIRE_AFTER, 
300.0d)).longValue(); TaskResult
- * taskResult = new TaskResult( requestPersistTask.getExecutionCode(),
- * requestPersistTask.getExecuteApplicationName(), 
requestPersistTask.getUmUser(),
- * requestPersistTask.getResultLocation(), cacheExpireAfter );
- *
- * <p>UserTaskResultCache userTaskResultCache =
- * queryCacheManager.getCache(requestPersistTask.getUmUser(),
- * requestPersistTask.getExecuteApplicationName()); 
userTaskResultCache.put(taskResult); }
- *
- * <p>public TaskResult readCache(RequestReadCache requestReadCache) { 
UserTaskResultCache
- * userTaskResultCache = queryCacheManager.getCache(requestReadCache.getUser(),
- * requestReadCache.getEngineType()); return
- * userTaskResultCache.get(requestReadCache.getExecutionCode(),
- * requestReadCache.getReadCacheBefore()); }
- *
- * <p>public void deleteCache(RequestDeleteCache requestDeleteCache) { 
UserTaskResultCache
- * userTaskResultCache = 
queryCacheManager.getCache(requestDeleteCache.getUser(),
- * requestDeleteCache.getEngineType());
- * userTaskResultCache.remove(requestDeleteCache.getExecutionCode()); }
- *
- * <p>}
- */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to