This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
new 02a0d4633 [Feature][EngineConn][Shell] Shell EC supports concurrent
execution (#3903)
02a0d4633 is described below
commit 02a0d46334783f1be7ce4038b0091741446c7eb5
Author: peacewong <[email protected]>
AuthorDate: Wed Nov 30 22:22:27 2022 +0800
[Feature][EngineConn][Shell] Shell EC supports concurrent execution (#3903)
* Shell EC supports concurrent execution mode
* Add machine performance monitoring (memory usage and CPU load)
---
linkis-commons/linkis-common/pom.xml | 7 ++
.../linkis/common/utils}/HardwareUtils.scala | 51 ++++++++--
.../linkis/common/utils/HardwareUtilsTest.scala | 49 ++++++++++
.../linkis-engineconn-manager-server/pom.xml | 6 --
.../service/impl/DefaultECMHealthService.scala | 5 +-
.../concurrent/monitor/HardwareMonitorService.java | 71 ++++++++++++++
.../concurrent/monitor/MonitorService.java} | 32 +-----
.../concurrent/monitor/TaskMonitorService.java | 67 +++++++++++++
.../concurrent/monitor/TimingMonitorService.java | 107 +++++++++++++++++++++
.../execute/ConcurrentComputationExecutor.scala | 27 ------
.../conf/AccessibleExecutorConfiguration.scala | 4 +-
.../AccessibleExecutorSpringConfiguration.scala | 2 +-
.../service/EngineConnTimedLockService.scala | 2 +-
.../shell/conf/ShellEngineConnConf.scala | 33 ++-----
.../engineplugin/shell/executor/ReaderThread.scala | 21 ++--
.../shell/executor/ShellECTaskInfo.scala | 33 +------
...ala => ShellEngineConnConcurrentExecutor.scala} | 95 +++++++++++-------
.../shell/executor/ShellEngineConnExecutor.scala | 21 ++--
.../shell/executor/YarnAppIdExtractor.scala | 87 +++--------------
.../shell/factory/ShellEngineConnFactory.scala | 19 +++-
.../cache/impl/QueryCacheServiceImpl.java | 64 ------------
21 files changed, 468 insertions(+), 335 deletions(-)
diff --git a/linkis-commons/linkis-common/pom.xml
b/linkis-commons/linkis-common/pom.xml
index ad8e53fe2..b71113c2a 100644
--- a/linkis-commons/linkis-common/pom.xml
+++ b/linkis-commons/linkis-common/pom.xml
@@ -146,6 +146,13 @@
<version>1.1.2</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.github.oshi</groupId>
+ <artifactId>oshi-core</artifactId>
+ <version>6.2.1</version>
+ </dependency>
+
</dependencies>
<build>
diff --git
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/HardwareUtils.scala
similarity index 52%
copy from
linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
copy to
linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/HardwareUtils.scala
index 77db299ca..f87a6f94c 100644
---
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
+++
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/HardwareUtils.scala
@@ -15,35 +15,98 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.ecm.server.util
+package org.apache.linkis.common.utils
+
+import java.math.{BigDecimal => JBigDecimal, RoundingMode}
 
 import oshi.SystemInfo
 
+/**
+ * Host-level hardware metrics (physical memory and CPU load average) read through OSHI, with the
+ * JVM OperatingSystemMXBean as the preferred source of the load average.
+ */
 object HardwareUtils {
 
+  private val systemInfo = new SystemInfo
+
+  private val hardware = systemInfo.getHardware
+
+  /** Number of decimal places kept by [[memoryUsage]] and [[loadAverageUsage]]. */
+  private val USAGE_SCALE = 3
+
+  /** @return physical memory currently available on the host, in bytes as reported by OSHI */
   def getAvailableMemory(): Long = {
-    val systemInfo = new SystemInfo
-    val hardware = systemInfo.getHardware
     val globalMemory = hardware.getMemory
     globalMemory.getAvailable
   }
 
+  /** @return total physical memory of the host, in bytes as reported by OSHI */
   def getMaxMemory(): Long = {
-    val systemInfo = new SystemInfo
-    val hardware = systemInfo.getHardware
     val globalMemory = hardware.getMemory
     globalMemory.getTotal
   }
 
   /**
-   * 1 total 2 available
-   * @return
+   * Reads total and available physical memory in one call.
+   *
+   * @return
+   *   `(total, available)`, in bytes as reported by OSHI
    */
   def getTotalAndAvailableMemory(): (Long, Long) = {
-    val systemInfo = new SystemInfo
-    val hardware = systemInfo.getHardware
     val globalMemory = hardware.getMemory
     (globalMemory.getTotal, globalMemory.getAvailable)
   }
 
+  /**
+   * Fraction of physical memory in use, `(total - available) / total`, rounded half-up to three
+   * decimals (0.875 means 87.5%).
+   *
+   * @return
+   *   the usage ratio (normally within [0, 1]), or 0 when the total memory is reported as 0
+   */
+  def memoryUsage(): Double = {
+    val memory = hardware.getMemory
+    val total = memory.getTotal
+    if (total <= 0) 0d
+    else roundUsage((total - memory.getAvailable) * 1.0 / total)
+  }
+
+  /**
+   * One-minute system load average divided by the number of logical processors, rounded half-up
+   * to three decimals, so 1.0 corresponds to a load equal to the CPU count.
+   *
+   * @return
+   *   the normalized load, or 0 when no load average is available on this platform
+   */
+  def loadAverageUsage(): Double = {
+    val loadAverage = systemLoadAverage()
+    val processors = hardware.getProcessor.getLogicalProcessorCount
+    if (loadAverage <= 0 || processors <= 0) 0d
+    else roundUsage(loadAverage / processors)
+  }
+
+  /**
+   * Reads the one-minute load average from the JVM MXBean, falling back to OSHI when the MXBean
+   * throws or returns a negative ("not available") value.
+   *
+   * @return
+   *   the load average, or -1 if neither source provides one
+   */
+  private def systemLoadAverage(): Double = {
+    val fromJvm = Utils.tryCatch(OverloadUtils.getOSBean.getSystemLoadAverage)(_ => -1d)
+    if (fromJvm >= 0) fromJvm
+    else {
+      val fromOshi = hardware.getProcessor.getSystemLoadAverage(1)(0)
+      if (fromOshi.isNaN) -1d else fromOshi
+    }
+  }
+
+  /**
+   * Rounds half-up to [[USAGE_SCALE]] decimals. BigDecimal is used instead of a DecimalFormat
+   * round trip because DecimalFormat follows the default locale, whose decimal separator may be a
+   * comma that the following `toDouble` parse rejects.
+   */
+  private def roundUsage(value: Double): Double =
+    new JBigDecimal(value).setScale(USAGE_SCALE, RoundingMode.HALF_UP).doubleValue()
+
 }
diff --git
a/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/HardwareUtilsTest.scala
b/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/HardwareUtilsTest.scala
new file mode 100644
index 000000000..f6ba4de6f
--- /dev/null
+++
b/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/HardwareUtilsTest.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.common.utils
+
+import org.junit.jupiter.api.Test
+
+/**
+ * Smoke tests for [[HardwareUtils]] against the real host: each test only checks that a metric can
+ * be read and lies in a plausible range.
+ */
+class HardwareUtilsTest {
+
+  @Test private[utils] def testGetAvailableMemory(): Unit = {
+    val availableMemory = HardwareUtils.getAvailableMemory()
+    assert(availableMemory > 1000L)
+  }
+
+  @Test private[utils] def testGetMaxMemory(): Unit = {
+    val maxMemory = HardwareUtils.getMaxMemory()
+    assert(maxMemory > 1000L)
+  }
+
+  @Test private[utils] def testGetTotalAndAvailableMemory(): Unit = {
+    val (maxMemory, availableMemory) = HardwareUtils.getTotalAndAvailableMemory()
+    assert(availableMemory >= 0L)
+    assert(maxMemory >= availableMemory)
+  }
+
+  @Test private[utils] def testMemoryUsage(): Unit = {
+    val usage = HardwareUtils.memoryUsage()
+    // memoryUsage() is a ratio, so a percentage-scaled value (e.g. 87.5) must fail here
+    assert(usage >= 0.0 && usage <= 1.0, s"memory usage $usage should be within [0, 1]")
+  }
+
+  @Test private[utils] def testLoadAverageUsage(): Unit = {
+    val usage = HardwareUtils.loadAverageUsage()
+    assert(usage >= 0.0, s"load average usage $usage should not be negative")
+  }
+
+}
diff --git
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/pom.xml
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/pom.xml
index 3ac7a4935..07444de68 100644
---
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/pom.xml
+++
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/pom.xml
@@ -80,12 +80,6 @@
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>com.github.oshi</groupId>
- <artifactId>oshi-core</artifactId>
- <version>6.2.1</version>
- </dependency>
-
</dependencies>
<build>
diff --git
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMHealthService.scala
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMHealthService.scala
index c7fde661d..f6c9c4cb4 100644
---
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMHealthService.scala
+++
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMHealthService.scala
@@ -17,7 +17,7 @@
package org.apache.linkis.ecm.server.service.impl
-import org.apache.linkis.common.utils.{ByteTimeUtils, OverloadUtils, Utils}
+import org.apache.linkis.common.utils.{ByteTimeUtils, HardwareUtils,
OverloadUtils, Utils}
import org.apache.linkis.ecm.core.listener.{ECMEvent, ECMEventListener}
import org.apache.linkis.ecm.core.report.ECMHealthReport
import org.apache.linkis.ecm.server.LinkisECMApplication
@@ -26,7 +26,6 @@ import org.apache.linkis.ecm.server.conf.ECMConfiguration._
import org.apache.linkis.ecm.server.listener.{ECMClosedEvent, ECMReadyEvent}
import org.apache.linkis.ecm.server.report.DefaultECMHealthReport
import org.apache.linkis.ecm.server.service.{ECMHealthService,
EngineConnListService}
-import org.apache.linkis.ecm.server.util.HardwareUtils
import org.apache.linkis.manager.common.entity.enumeration.{NodeHealthy,
NodeStatus}
import org.apache.linkis.manager.common.entity.metrics.{NodeHealthyInfo,
NodeOverLoadInfo}
import org.apache.linkis.manager.common.entity.resource.{CommonNodeResource,
LoadInstanceResource}
@@ -54,8 +53,6 @@ class DefaultECMHealthService extends ECMHealthService with
ECMEventListener {
private val minResource =
new LoadInstanceResource(ECM_PROTECTED_MEMORY, ECM_PROTECTED_CORES,
ECM_PROTECTED_INSTANCES)
- private val runtime: Runtime = Runtime.getRuntime
-
private var status: NodeStatus = NodeStatus.Starting
private var healthy: NodeHealthy = NodeHealthy.Healthy
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/HardwareMonitorService.java
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/HardwareMonitorService.java
new file mode 100644
index 000000000..20e3946f2
--- /dev/null
+++
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/HardwareMonitorService.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconn.computation.concurrent.monitor;
+
+import org.apache.linkis.common.conf.CommonVars;
+import org.apache.linkis.common.utils.HardwareUtils;
+
+import org.springframework.stereotype.Component;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reports the engine as unavailable when host memory usage or CPU load exceeds its configured
+ * threshold in two consecutive checks.
+ *
+ * <p>Requiring two consecutive breaches filters out one-off spikes. The previous sample is kept in
+ * unsynchronized instance fields, so this assumes calls come from a single polling thread.
+ */
+@Component
+public class HardwareMonitorService implements MonitorService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HardwareMonitorService.class);
+
+  /** Maximum tolerated fraction of physical memory in use (0.95 = 95%). */
+  private static final CommonVars<Double> MEMORY_MAX_USAGE =
+      CommonVars.apply("linkis.engineconn.concurrent.max.memory.usage", 0.95);
+
+  /** Maximum tolerated value of {@link HardwareUtils#loadAverageUsage()}. */
+  private static final CommonVars<Double> CPU_MAX_USAGE =
+      CommonVars.apply("linkis.engineconn.concurrent.max.cpu.usage", 0.99);
+
+  /** Load value observed by the previous {@link #isAvailable()} call. */
+  private double lastLoadAverageUsage = 0D;
+
+  /** Memory usage observed by the previous {@link #isAvailable()} call. */
+  private double lastMemoryUsage = 0D;
+
+  /**
+   * Samples memory usage and load once and compares each against its threshold.
+   *
+   * @return {@code false} if either metric exceeded its threshold in both this and the previous
+   *     sample, {@code true} otherwise
+   */
+  @Override
+  public boolean isAvailable() {
+    double memoryUsage = HardwareUtils.memoryUsage();
+    // CPU_MAX_USAGE applies to the load metric, so it must not be sampled via memoryUsage().
+    double loadAverageUsage = HardwareUtils.loadAverageUsage();
+
+    double maxMemoryUsage = MEMORY_MAX_USAGE.getValue();
+    double maxCpuUsage = CPU_MAX_USAGE.getValue();
+
+    boolean memoryOverloaded = memoryUsage > maxMemoryUsage && lastMemoryUsage > maxMemoryUsage;
+    boolean cpuOverloaded = loadAverageUsage > maxCpuUsage && lastLoadAverageUsage > maxCpuUsage;
+    boolean isUnavailable = memoryOverloaded || cpuOverloaded;
+    if (isUnavailable) {
+      LOG.warn(
+          "current load average {} is too high or memory usage {} is too high, over max cpu load avg={} and memory usage {}",
+          loadAverageUsage,
+          memoryUsage,
+          maxCpuUsage,
+          maxMemoryUsage);
+    }
+
+    // Remember this sample so the next call can require two consecutive breaches.
+    lastLoadAverageUsage = loadAverageUsage;
+    lastMemoryUsage = memoryUsage;
+
+    return !isUnavailable;
+  }
+}
diff --git
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/MonitorService.java
similarity index 51%
copy from
linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
copy to
linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/MonitorService.java
index 77db299ca..ce39f3600 100644
---
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/MonitorService.java
@@ -15,35 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.ecm.server.util
+package org.apache.linkis.engineconn.computation.concurrent.monitor;
 
-import oshi.SystemInfo
-
-object HardwareUtils {
-
-  def getAvailableMemory(): Long = {
-    val systemInfo = new SystemInfo
-    val hardware = systemInfo.getHardware
-    val globalMemory = hardware.getMemory
-    globalMemory.getAvailable
-  }
-
-  def getMaxMemory(): Long = {
-    val systemInfo = new SystemInfo
-    val hardware = systemInfo.getHardware
-    val globalMemory = hardware.getMemory
-    globalMemory.getTotal
-  }
-
-  /**
-   * 1 total 2 available
-   * @return
-   */
-  def getTotalAndAvailableMemory(): (Long, Long) = {
-    val systemInfo = new SystemInfo
-    val hardware = systemInfo.getHardware
-    val globalMemory = hardware.getMemory
-    (globalMemory.getTotal, globalMemory.getAvailable)
-  }
+/**
+ * A pluggable health check for a concurrent engine.
+ *
+ * <p>Implementations are Spring beans polled periodically by the concurrent-engine monitor, which
+ * marks an idle executor Busy when any check returns {@code false} and moves it back to Unlock
+ * once every check returns {@code true}.
+ */
+public interface MonitorService {
 
+  /**
+   * Evaluates this check once.
+   *
+   * @return {@code true} if this check allows the engine to take more work, {@code false} if the
+   *     engine should be treated as busy
+   */
+  boolean isAvailable();
 }
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TaskMonitorService.java
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TaskMonitorService.java
new file mode 100644
index 000000000..591a9792f
--- /dev/null
+++
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TaskMonitorService.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconn.computation.concurrent.monitor;
+
+import org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
+import org.apache.linkis.engineconn.core.EngineConnObject;
+import org.apache.linkis.engineconn.core.executor.ExecutorManager$;
+import org.apache.linkis.engineconn.executor.entity.Executor;
+
+import org.springframework.stereotype.Component;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reports the engine as unavailable once its running task count reaches the concurrent limit of
+ * the report executor.
+ *
+ * <p>Returns {@code true} (never blocks the engine) before the engine is ready, when the report
+ * executor is not a {@link ConcurrentComputationExecutor}, or when the check itself fails.
+ */
+@Component
+public class TaskMonitorService implements MonitorService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskMonitorService.class);
+
+  /** Report executor, resolved lazily once the engine is ready and then cached. */
+  private ConcurrentComputationExecutor concurrentExecutor = null;
+
+  /**
+   * @return {@code false} while the running task count is at or above the concurrent limit,
+   *     {@code true} otherwise
+   */
+  @Override
+  public boolean isAvailable() {
+    if (!EngineConnObject.isReady()) {
+      return true;
+    }
+
+    try {
+      if (null == concurrentExecutor) {
+        Executor executor = ExecutorManager$.MODULE$.getInstance().getReportExecutor();
+        if (executor instanceof ConcurrentComputationExecutor) {
+          concurrentExecutor = (ConcurrentComputationExecutor) executor;
+        }
+      }
+      if (null == concurrentExecutor) {
+        LOG.warn("shell executor can not is null");
+        return true;
+      }
+      // Sample once so the comparison and the log line report the same numbers.
+      int runningTask = concurrentExecutor.getRunningTask();
+      int concurrentLimit = concurrentExecutor.getConcurrentLimit();
+      // An executor already running limit tasks is full, so report busy at the limit (>=).
+      if (runningTask >= concurrentLimit) {
+        LOG.info(
+            "running task({}) >= concurrent limit ({}) , now to mark ec to busy",
+            runningTask,
+            concurrentLimit);
+        return false;
+      }
+    } catch (Exception e) {
+      LOG.warn("Task Monitor failed", e);
+    }
+    return true;
+  }
+}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java
new file mode 100644
index 000000000..de6bb440d
--- /dev/null
+++
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconn.computation.concurrent.monitor;
+
+import org.apache.linkis.common.conf.CommonVars;
+import org.apache.linkis.common.conf.TimeType;
+import org.apache.linkis.common.utils.Utils;
+import org.apache.linkis.engineconn.acessible.executor.conf.AccessibleExecutorConfiguration;
+import org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor;
+import org.apache.linkis.engineconn.core.EngineConnObject;
+import org.apache.linkis.engineconn.core.executor.ExecutorManager$;
+import org.apache.linkis.engineconn.executor.entity.Executor;
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus;
+
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Periodically polls every {@link MonitorService} bean and moves the report executor between
+ * {@link NodeStatus#Busy} and {@link NodeStatus#Unlock}.
+ *
+ * <p>Scheduled only when {@code wds.linkis.engineconn.support.parallelism} is enabled. If any
+ * monitor reports unavailable, an idle executor is marked Busy; once every monitor reports
+ * available again, a busy executor is moved back to Unlock.
+ */
+@Component
+public class TimingMonitorService implements InitializingBean, Runnable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TimingMonitorService.class);
+
+  private static final CommonVars<TimeType> MONITOR_INTERVAL =
+      CommonVars.apply("linkis.engineconn.concurrent.monitor.interval", new TimeType("30s"));
+
+  /** Delay before the first round (3 minutes), leaving time for the engine to start. */
+  private static final long INITIAL_DELAY_MILLIS = 3 * 60 * 1000L;
+
+  private static final Object EXECUTOR_STATUS_LOCKER = new Object();
+
+  @Autowired private List<MonitorService> monitorServiceList;
+
+  /** Report executor, resolved lazily once the engine is ready and then cached. */
+  private AccessibleExecutor concurrentExecutor = null;
+
+  @Override
+  public void afterPropertiesSet() throws Exception {
+    if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM()) {
+      Utils.defaultScheduler()
+          .scheduleAtFixedRate(
+              this,
+              INITIAL_DELAY_MILLIS,
+              MONITOR_INTERVAL.getValue().toLong(),
+              TimeUnit.MILLISECONDS);
+    }
+  }
+
+  /** Runs one monitoring round; exceptions are logged so the periodic schedule keeps running. */
+  @Override
+  public void run() {
+    if (!EngineConnObject.isReady()) {
+      return;
+    }
+
+    try {
+      if (null == concurrentExecutor) {
+        Executor executor = ExecutorManager$.MODULE$.getInstance().getReportExecutor();
+        if (executor instanceof AccessibleExecutor) {
+          concurrentExecutor = (AccessibleExecutor) executor;
+        }
+      }
+      if (null == concurrentExecutor) {
+        LOG.warn("shell executor can not is null");
+        return;
+      }
+      if (allMonitorsAvailable()) {
+        markUnlockIfBusy();
+      } else {
+        markBusyIfIdle();
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to executor monitor ", e);
+    }
+  }
+
+  /**
+   * Asks every monitor and returns whether all report available. Does not short-circuit, because
+   * a monitor may keep state between calls (HardwareMonitorService stores its previous sample).
+   */
+  private boolean allMonitorsAvailable() {
+    boolean available = true;
+    for (MonitorService monitorService : monitorServiceList) {
+      available &= monitorService.isAvailable();
+    }
+    return available;
+  }
+
+  /** Moves a busy executor back to Unlock. */
+  private void markUnlockIfBusy() {
+    synchronized (EXECUTOR_STATUS_LOCKER) {
+      if (concurrentExecutor.isBusy()) {
+        LOG.info("monitor turn to executor status from busy to unlock");
+        concurrentExecutor.transition(NodeStatus.Unlock);
+      }
+    }
+  }
+
+  /** Marks an idle executor Busy. */
+  private void markBusyIfIdle() {
+    synchronized (EXECUTOR_STATUS_LOCKER) {
+      if (concurrentExecutor.isIdle()) {
+        LOG.info("monitor turn to executor status from idle to busy");
+        concurrentExecutor.transition(NodeStatus.Busy);
+      }
+    }
+  }
+}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ConcurrentComputationExecutor.scala
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ConcurrentComputationExecutor.scala
index 0a15acf6b..0699ae779 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ConcurrentComputationExecutor.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ConcurrentComputationExecutor.scala
@@ -26,33 +26,6 @@ abstract class ConcurrentComputationExecutor(override val
outputPrintLimit: Int
extends ComputationExecutor(outputPrintLimit)
with ConcurrentExecutor {
- override def execute(engineConnTask: EngineConnTask): ExecuteResponse = {
- if (isBusy) {
- logger.error(
- s"Executor is busy but still got new task ! Running task num :
${getRunningTask}"
- )
- }
- if (getRunningTask >= getConcurrentLimit) synchronized {
- if (getRunningTask >= getConcurrentLimit &&
NodeStatus.isIdle(getStatus)) {
- logger.info(
- s"running task($getRunningTask) > concurrent limit
$getConcurrentLimit, now to mark engine to busy "
- )
- transition(NodeStatus.Busy)
- }
- }
- logger.info(s"engineConnTask(${engineConnTask.getTaskId}) running task is
($getRunningTask) ")
- val response = super.execute(engineConnTask)
- if (getStatus == NodeStatus.Busy && getConcurrentLimit > getRunningTask)
synchronized {
- if (getStatus == NodeStatus.Busy && getConcurrentLimit > getRunningTask)
{
- logger.info(
- s"running task($getRunningTask) < concurrent limit
$getConcurrentLimit, now to mark engine to Unlock "
- )
- transition(NodeStatus.Unlock)
- }
- }
- response
- }
-
protected override def ensureOp[A](f: => A): A = f
override def afterExecute(
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
index 19dc900f3..0eb211f73 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
@@ -42,8 +42,8 @@ object AccessibleExecutorConfiguration {
val ENGINECONN_LOCK_CHECK_INTERVAL =
CommonVars("wds.linkis.engineconn.lock.free.interval", new TimeType("3m"))
- val ENGINECONN_SUPPORT_PARALLELISM =
- CommonVars("wds.linkis.engineconn.support.parallelism", false)
+ val ENGINECONN_SUPPORT_PARALLELISM: Boolean =
+ CommonVars("wds.linkis.engineconn.support.parallelism", false).getValue
val ENGINECONN_HEARTBEAT_TIME =
CommonVars("wds.linkis.engineconn.heartbeat.time", new TimeType("2m"))
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
index 475873c9d..53cdd44b0 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
@@ -43,7 +43,7 @@ class AccessibleExecutorSpringConfiguration extends Logging {
def createLockManager(): LockService = {
val lockService =
- if
(AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getValue) {
+ if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM) {
new EngineConnConcurrentLockService
} else new EngineConnTimedLockService
asyncListenerBusContext.addListener(lockService)
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
index 4d85fab48..452c6305b 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
@@ -51,7 +51,7 @@ class EngineConnTimedLockService extends LockService with
Logging {
private var lockType: EngineLockType = EngineLockType.Timed
private def isSupportParallelism: Boolean =
- AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getValue
+ AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM
/**
* @param lock
diff --git
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.scala
similarity index 52%
copy from
linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
copy to
linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.scala
index 77db299ca..ba74dbbad 100644
---
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
+++
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.scala
@@ -15,35 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.ecm.server.util
+package org.apache.linkis.manager.engineplugin.shell.conf
 
-import oshi.SystemInfo
+import org.apache.linkis.common.conf.CommonVars
 
-object HardwareUtils {
+/** Shell engine plugin settings, each read once when this object is first initialized. */
+object ShellEngineConnConf {
 
-  def getAvailableMemory(): Long = {
-    val systemInfo = new SystemInfo
-    val hardware = systemInfo.getHardware
-    val globalMemory = hardware.getMemory
-    globalMemory.getAvailable
-  }
+  /** Concurrent task limit of one shell engine (default 30). */
+  val SHELL_ENGINECONN_CONCURRENT_LIMIT: Int =
+    CommonVars[Int]("linkis.engineconn.shell.concurrent.limit", 30).getValue
 
-  def getMaxMemory(): Long = {
-    val systemInfo = new SystemInfo
-    val hardware = systemInfo.getHardware
-    val globalMemory = hardware.getMemory
-    globalMemory.getTotal
-  }
-
-  /**
-   * 1 total 2 available
-   * @return
-   */
-  def getTotalAndAvailableMemory(): (Long, Long) = {
-    val systemInfo = new SystemInfo
-    val hardware = systemInfo.getHardware
-    val globalMemory = hardware.getMemory
-    (globalMemory.getTotal, globalMemory.getAvailable)
-  }
+  /** Maximum thread count of the shell log service, per its key name (default 50). */
+  val LOG_SERVICE_MAX_THREAD_SIZE: Int =
+    CommonVars[Int]("linkis.engineconn.shell.log.max.thread.size", 50).getValue
 
 }
diff --git
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala
index d814dd259..8277cb116 100644
---
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala
+++
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala
@@ -21,6 +21,7 @@ import org.apache.linkis.common.conf.CommonVars
import org.apache.linkis.common.utils.{Logging, Utils}
import
org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
+import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import java.io.BufferedReader
@@ -28,6 +29,7 @@ import java.util
import java.util.concurrent.CountDownLatch
class ReaderThread extends Thread with Logging {
+
private var engineExecutionContext: EngineExecutionContext = _
private var inputReader: BufferedReader = _
private var extractor: YarnAppIdExtractor = _
@@ -35,6 +37,8 @@ class ReaderThread extends Thread with Logging {
private val logListCount =
CommonVars[Int]("wds.linkis.engineconn.log.list.count", 50)
private var counter: CountDownLatch = _
+ private var isReaderAlive = true
+
def this(
engineExecutionContext: EngineExecutionContext,
inputReader: BufferedReader,
@@ -51,23 +55,14 @@ class ReaderThread extends Thread with Logging {
}
def onDestroy(): Unit = {
- Utils.tryCatch {
- inputReader synchronized inputReader.close()
- } { t =>
- logger.warn("inputReader while closing the error stream", t)
- }
+ isReaderAlive = false
}
def startReaderThread(): Unit = {
Utils.tryCatch {
this.start()
} { t =>
- if (t.isInstanceOf[OutOfMemoryError]) {
- logger.warn(
- "Caught " + t + ". One possible reason is that ulimit" + " setting
of 'max user processes' is too low. If so, do" + " 'ulimit -u <largerNum>' and
try again."
- )
- }
- logger.warn("Cannot start thread to read from inputReader stream", t)
+ throw t
}
}
@@ -75,12 +70,11 @@ class ReaderThread extends Thread with Logging {
Utils.tryCatch {
var line: String = null
val logArray: util.List[String] = new util.ArrayList[String]
- while ({ line = inputReader.readLine(); line != null && !isInterrupted
}) {
+ while ({ line = inputReader.readLine(); line != null && isReaderAlive })
{
logger.info("read logger line :{}", line)
logArray.add(line)
extractor.appendLineToExtractor(line)
if (isStdout) engineExecutionContext.appendTextResultSet(line)
-
if (logArray.size > logListCount.getValue) {
val linelist = StringUtils.join(logArray, "\n")
engineExecutionContext.appendStdout(linelist)
@@ -95,6 +89,7 @@ class ReaderThread extends Thread with Logging {
} { t =>
logger.warn("inputReader reading the input stream", t)
}
+ IOUtils.closeQuietly(inputReader)
counter.countDown()
}
diff --git
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellECTaskInfo.scala
similarity index 51%
rename from
linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
rename to
linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellECTaskInfo.scala
index 77db299ca..d8d7edaa3 100644
---
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/HardwareUtils.scala
+++
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellECTaskInfo.scala
@@ -15,35 +15,6 @@
* limitations under the License.
*/
-package org.apache.linkis.ecm.server.util
+package org.apache.linkis.manager.engineplugin.shell.executor
-import oshi.SystemInfo
-
-object HardwareUtils {
-
- def getAvailableMemory(): Long = {
- val systemInfo = new SystemInfo
- val hardware = systemInfo.getHardware
- val globalMemory = hardware.getMemory
- globalMemory.getAvailable
- }
-
- def getMaxMemory(): Long = {
- val systemInfo = new SystemInfo
- val hardware = systemInfo.getHardware
- val globalMemory = hardware.getMemory
- globalMemory.getTotal
- }
-
- /**
- * 1 total 2 available
- * @return
- */
- def getTotalAndAvailableMemory(): (Long, Long) = {
- val systemInfo = new SystemInfo
- val hardware = systemInfo.getHardware
- val globalMemory = hardware.getMemory
- (globalMemory.getTotal, globalMemory.getAvailable)
- }
-
-}
+case class ShellECTaskInfo(taskId: String, process: Process,
yarnAppIdExtractor: YarnAppIdExtractor)
diff --git
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.scala
similarity index 80%
copy from
linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
copy to
linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.scala
index 6dc5ac749..e127f7abb 100644
---
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
+++
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.scala
@@ -20,6 +20,7 @@ package org.apache.linkis.manager.engineplugin.shell.executor
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.engineconn.computation.executor.execute.{
ComputationExecutor,
+ ConcurrentComputationExecutor,
EngineExecutionContext
}
import org.apache.linkis.engineconn.core.EngineConnObject
@@ -31,6 +32,7 @@ import org.apache.linkis.manager.common.entity.resource.{
}
import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
import
org.apache.linkis.manager.engineplugin.shell.common.ShellEngineConnPluginConst
+import org.apache.linkis.manager.engineplugin.shell.conf.ShellEngineConnConf
import
org.apache.linkis.manager.engineplugin.shell.exception.ShellCodeErrorException
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.protocol.engine.JobProgressInfo
@@ -44,23 +46,30 @@ import org.apache.linkis.scheduler.executer.{
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
-import java.io.{BufferedReader, File, FileReader, InputStreamReader,
IOException}
+import java.io.{BufferedReader, File, InputStreamReader}
import java.util
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
import java.util.concurrent.atomic.AtomicBoolean
-import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.ExecutionContextExecutorService
-class ShellEngineConnExecutor(id: Int) extends ComputationExecutor with
Logging {
+class ShellEngineConnConcurrentExecutor(id: Int, maxRunningNumber: Int)
+ extends ConcurrentComputationExecutor
+ with Logging {
private var engineExecutionContext: EngineExecutionContext = _
private val executorLabels: util.List[Label[_]] = new
util.ArrayList[Label[_]]()
- private var process: Process = _
+ private val shellECTaskInfoCache: util.Map[String, ShellECTaskInfo] =
+ new ConcurrentHashMap[String, ShellECTaskInfo]()
- private var extractor: YarnAppIdExtractor = _
+ private implicit val logAsyncService: ExecutionContextExecutorService =
+ Utils.newCachedExecutionContext(
+ ShellEngineConnConf.LOG_SERVICE_MAX_THREAD_SIZE,
+ "ShelLogService-Thread-"
+ )
override def init(): Unit = {
logger.info(s"Ready to change engine state!")
@@ -87,6 +96,11 @@ class ShellEngineConnExecutor(id: Int) extends
ComputationExecutor with Logging
logger.info("Shell executor reset new engineExecutionContext!")
}
+ if (engineExecutionContext.getJobId.isEmpty) {
+ return ErrorExecuteResponse("taskID is null", null)
+ }
+
+ val taskId = engineExecutionContext.getJobId.get
var bufferedReader: BufferedReader = null
var errorsReader: BufferedReader = null
@@ -145,7 +159,8 @@ class ShellEngineConnExecutor(id: Int) extends
ComputationExecutor with Logging
wdStr
} else {
logger.warn(
- "User-specified working-directory: \'" + wdStr + "\' does not
exist or user does not have access permission. Will execute shell task under
default working-directory. Please contact BDP!"
+ "User-specified working-directory: \'" + wdStr + "\' does not
exist or user does not have access permission. " +
+ "Will execute shell task under default working-directory.
Please contact the administrator!"
)
null
}
@@ -160,7 +175,7 @@ class ShellEngineConnExecutor(id: Int) extends
ComputationExecutor with Logging
null
}
- val generatedCode = if (argsArr == null || argsArr.length == 0) {
+ val generatedCode = if (argsArr == null || argsArr.isEmpty) {
generateRunCode(code)
} else {
generateRunCodeWithArgs(code, argsArr)
@@ -172,20 +187,21 @@ class ShellEngineConnExecutor(id: Int) extends
ComputationExecutor with Logging
}
processBuilder.redirectErrorStream(false)
- extractor = new YarnAppIdExtractor
- extractor.startExtraction()
- process = processBuilder.start()
-
+ val extractor = new YarnAppIdExtractor
+ val process = processBuilder.start()
bufferedReader = new BufferedReader(new
InputStreamReader(process.getInputStream))
errorsReader = new BufferedReader(new
InputStreamReader(process.getErrorStream))
+ // add task id and task Info cache
+ shellECTaskInfoCache.put(taskId, ShellECTaskInfo(taskId, process,
extractor))
+
val counter: CountDownLatch = new CountDownLatch(2)
inputReaderThread =
new ReaderThread(engineExecutionContext, bufferedReader, extractor,
true, counter)
errReaderThread =
new ReaderThread(engineExecutionContext, errorsReader, extractor,
false, counter)
- inputReaderThread.start()
- errReaderThread.start()
+ logAsyncService.execute(inputReaderThread)
+ logAsyncService.execute(errReaderThread)
val exitCode = process.waitFor()
counter.await()
@@ -195,28 +211,25 @@ class ShellEngineConnExecutor(id: Int) extends
ComputationExecutor with Logging
if (exitCode != 0) {
ErrorExecuteResponse("run shell failed", ShellCodeErrorException())
} else SuccessExecuteResponse()
+
} catch {
case e: Exception =>
logger.error("Execute shell code failed, reason:", e)
ErrorExecuteResponse("run shell failed", e)
} finally {
- if (!completed.get()) {
- Utils.tryAndWarn(errReaderThread.interrupt())
- Utils.tryAndWarn(inputReaderThread.interrupt())
- }
- Utils.tryAndWarn {
- extractor.onDestroy()
+ if (null != errorsReader) {
inputReaderThread.onDestroy()
+ }
+ if (null != inputReaderThread) {
errReaderThread.onDestroy()
}
- IOUtils.closeQuietly(bufferedReader)
- IOUtils.closeQuietly(errorsReader)
+ shellECTaskInfoCache.remove(taskId)
}
}
private def isExecutePathExist(executePath: String): Boolean = {
val etlHomeDir: File = new File(executePath)
- (etlHomeDir.exists() && etlHomeDir.isDirectory())
+ (etlHomeDir.exists() && etlHomeDir.isDirectory)
}
private def generateRunCode(code: String): Array[String] = {
@@ -293,18 +306,22 @@ class ShellEngineConnExecutor(id: Int) extends
ComputationExecutor with Logging
}
override def killTask(taskID: String): Unit = {
+ val shellECTaskInfo = shellECTaskInfoCache.remove(taskID)
+ if (null == shellECTaskInfo) {
+ return
+ }
/*
Kill sub-processes
*/
- val pid = getPid(process)
+ val pid = getPid(shellECTaskInfo.process)
GovernanceUtils.killProcess(String.valueOf(pid), s"kill task $taskID
process", false)
/*
Kill yarn-applications
*/
- val yarnAppIds = extractor.getExtractedYarnAppIds()
- GovernanceUtils.killYarnJobApp(yarnAppIds.toList.asJava)
+ val yarnAppIds =
shellECTaskInfo.yarnAppIdExtractor.getExtractedYarnAppIds()
+ GovernanceUtils.killYarnJobApp(yarnAppIds)
logger.info(
- s"Finished kill yarn app ids in the engine of (${getId()}). The yarn app
ids are ${yarnAppIds.mkString(",")}"
+ s"Finished kill yarn app ids in the engine of (${getId()}). The yarn app
ids are ${yarnAppIds}"
)
super.killTask(taskID)
@@ -323,15 +340,25 @@ class ShellEngineConnExecutor(id: Int) extends
ComputationExecutor with Logging
}
override def close(): Unit = {
- try {
- process.destroy()
- } catch {
- case e: Exception =>
- logger.error(s"kill process ${process.toString} failed ", e)
- case t: Throwable =>
- logger.error(s"kill process ${process.toString} failed ", t)
+ Utils.tryCatch {
+ killAll()
+ logAsyncService.shutdown()
+ } { t: Throwable =>
+ logger.error(s"Shell ec failed to close ", t)
}
super.close()
}
+ override def killAll(): Unit = {
+ val iterator = shellECTaskInfoCache.values().iterator()
+ while (iterator.hasNext) {
+ val shellECTaskInfo = iterator.next()
+ Utils.tryAndWarn(killTask(shellECTaskInfo.taskId))
+ }
+ }
+
+ override def getConcurrentLimit: Int = {
+ maxRunningNumber
+ }
+
}
diff --git
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
index 6dc5ac749..88d385f82 100644
---
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
+++
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
@@ -145,7 +145,8 @@ class ShellEngineConnExecutor(id: Int) extends
ComputationExecutor with Logging
wdStr
} else {
logger.warn(
- "User-specified working-directory: \'" + wdStr + "\' does not
exist or user does not have access permission. Will execute shell task under
default working-directory. Please contact BDP!"
+ "User-specified working-directory: \'" + wdStr + "\' does not
exist or user does not have access permission. " +
+ "Will execute shell task under default working-directory.
Please contact the administrator!"
)
null
}
@@ -160,7 +161,7 @@ class ShellEngineConnExecutor(id: Int) extends
ComputationExecutor with Logging
null
}
- val generatedCode = if (argsArr == null || argsArr.length == 0) {
+ val generatedCode = if (argsArr == null || argsArr.isEmpty) {
generateRunCode(code)
} else {
generateRunCodeWithArgs(code, argsArr)
@@ -173,7 +174,6 @@ class ShellEngineConnExecutor(id: Int) extends
ComputationExecutor with Logging
processBuilder.redirectErrorStream(false)
extractor = new YarnAppIdExtractor
- extractor.startExtraction()
process = processBuilder.start()
bufferedReader = new BufferedReader(new
InputStreamReader(process.getInputStream))
@@ -200,13 +200,10 @@ class ShellEngineConnExecutor(id: Int) extends
ComputationExecutor with Logging
logger.error("Execute shell code failed, reason:", e)
ErrorExecuteResponse("run shell failed", e)
} finally {
- if (!completed.get()) {
- Utils.tryAndWarn(errReaderThread.interrupt())
- Utils.tryAndWarn(inputReaderThread.interrupt())
- }
- Utils.tryAndWarn {
- extractor.onDestroy()
+ if (null != errorsReader) {
inputReaderThread.onDestroy()
+ }
+ if (null != inputReaderThread) {
errReaderThread.onDestroy()
}
IOUtils.closeQuietly(bufferedReader)
@@ -302,10 +299,8 @@ class ShellEngineConnExecutor(id: Int) extends
ComputationExecutor with Logging
Kill yarn-applications
*/
val yarnAppIds = extractor.getExtractedYarnAppIds()
- GovernanceUtils.killYarnJobApp(yarnAppIds.toList.asJava)
- logger.info(
- s"Finished kill yarn app ids in the engine of (${getId()}). The yarn app
ids are ${yarnAppIds.mkString(",")}"
- )
+ GovernanceUtils.killYarnJobApp(yarnAppIds)
+ logger.info(s"Finished kill yarn app ids in the engine of (${getId()}).}")
super.killTask(taskID)
}
diff --git
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala
index 9cc5f29c0..2f5f79663 100644
---
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala
+++
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala
@@ -17,49 +17,29 @@
package org.apache.linkis.manager.engineplugin.shell.executor
-import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.common.utils.Logging
import org.apache.linkis.engineconn.common.conf.EngineConnConf
import org.apache.commons.lang3.StringUtils
import java.io._
import java.util
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.Collections
import java.util.regex.Pattern
-class YarnAppIdExtractor extends Thread with Logging {
- val MAX_BUFFER: Int = 32 * 1024 * 1024 // 32MB
+class YarnAppIdExtractor extends Logging {
- val buff: StringBuilder = new StringBuilder
+ private val appIdList: util.Set[String] = Collections.synchronizedSet(new
util.HashSet[String]())
- val appIdList: util.List[String] = new util.ArrayList[String]()
-
- val shouldStop: AtomicBoolean = new AtomicBoolean(false)
+ private val regex =
EngineConnConf.SPARK_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX.getValue
+ private val pattern = Pattern.compile(regex)
def appendLineToExtractor(content: String): Unit = {
- buff.synchronized {
- if (content.length + buff.length > MAX_BUFFER) {
- logger.warn(
- s"input exceed max-buffer-size, will abandon some part of input.
maybe will lost some yarn-app-id."
- )
- buff
- .append(StringUtils.substring(content, 0, MAX_BUFFER - buff.length))
- .append(System.lineSeparator())
- } else {
- buff.append(content).append(System.lineSeparator())
- }
- }
- }
-
- def startExtraction(): Unit = {
- this.start()
- }
-
- def onDestroy(): Unit = {
- shouldStop.set(true)
- this.interrupt()
- buff.synchronized {
- buff.setLength(0)
+ if (StringUtils.isBlank(content)) return
+ val yarnAppIDMatcher = pattern.matcher(content)
+ if (yarnAppIDMatcher.find) {
+ val yarnAppID = yarnAppIDMatcher.group(2)
+ appIdList.add(yarnAppID)
}
}
@@ -68,8 +48,6 @@ class YarnAppIdExtractor extends Thread with Logging {
if (StringUtils.isBlank(content)) return new Array[String](0)
// spark: Starting|Submitted|Activating.{1,100}(application_\d{13}_\d+)
// sqoop, importtsv: Submitted application application_1609166102854_970911
- val regex =
EngineConnConf.SPARK_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX.getValue
- val pattern = Pattern.compile(regex)
val stringReader = new StringReader(content)
@@ -94,48 +72,9 @@ class YarnAppIdExtractor extends Thread with Logging {
ret.toArray(new Array[String](ret.size()))
}
- override def run(): Unit = {
- while (!shouldStop.get()) {
- var content: String = ""
- buff.synchronized {
- content = buff.toString()
- buff.setLength(0)
- }
- if (StringUtils.isNotBlank(content)) {
- val appIds = doExtractYarnAppId(content)
- if (appIds != null && appIds.length != 0) {
- logger.info(s"Retrieved new yarn application Id:" +
appIds.mkString(" "))
- addYarnAppIds(appIds)
- }
- logger.debug(s"Yarn-appid-extractor is running")
- }
- Utils.sleepQuietly(200L)
- }
- }
-
- def addYarnAppId(yarnAppId: String): Unit = {
- appIdList.synchronized {
- appIdList.add(yarnAppId)
- }
- }
-
- def addYarnAppIds(yarnAppIds: Array[String]): Unit = {
- if (yarnAppIds != null && !yarnAppIds.isEmpty) {
- appIdList.synchronized {
- yarnAppIds.foreach(id =>
- if (!appIdList.contains(id)) {
- appIdList.add(id)
- // input application id to logs/stderr
- logger.info(s"Submitted application $id")
- }
- )
- }
- }
- }
-
- def getExtractedYarnAppIds(): Array[String] = {
+ def getExtractedYarnAppIds(): util.List[String] = {
appIdList.synchronized {
- appIdList.toArray(new Array[String](appIdList.size()))
+ new util.ArrayList[String](appIdList)
}
}
diff --git
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala
index 05012bf62..cbfffc244 100644
---
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala
+++
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala
@@ -17,11 +17,16 @@
package org.apache.linkis.manager.engineplugin.shell.factory
+import
org.apache.linkis.engineconn.acessible.executor.conf.AccessibleExecutorConfiguration
import org.apache.linkis.engineconn.common.creation.EngineCreationContext
import org.apache.linkis.engineconn.common.engineconn.EngineConn
import
org.apache.linkis.engineconn.computation.executor.creation.ComputationSingleExecutorEngineConnFactory
import org.apache.linkis.engineconn.executor.entity.LabelExecutor
-import
org.apache.linkis.manager.engineplugin.shell.executor.ShellEngineConnExecutor
+import org.apache.linkis.manager.engineplugin.shell.conf.ShellEngineConnConf
+import org.apache.linkis.manager.engineplugin.shell.executor.{
+ ShellEngineConnConcurrentExecutor,
+ ShellEngineConnExecutor
+}
import org.apache.linkis.manager.label.entity.engine.{EngineType, RunType}
import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
import org.apache.linkis.manager.label.entity.engine.RunType.RunType
@@ -32,8 +37,16 @@ class ShellEngineConnFactory extends
ComputationSingleExecutorEngineConnFactory
id: Int,
engineCreationContext: EngineCreationContext,
engineConn: EngineConn
- ): LabelExecutor =
- new ShellEngineConnExecutor(id)
+ ): LabelExecutor = {
+ if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM) {
+ new ShellEngineConnConcurrentExecutor(
+ id,
+ ShellEngineConnConf.SHELL_ENGINECONN_CONCURRENT_LIMIT
+ )
+ } else {
+ new ShellEngineConnExecutor(id)
+ }
+ }
override protected def getEngineConnType: EngineType = EngineType.SHELL
diff --git
a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/cache/impl/QueryCacheServiceImpl.java
b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/cache/impl/QueryCacheServiceImpl.java
deleted file mode 100644
index 5e83e1f5b..000000000
---
a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/cache/impl/QueryCacheServiceImpl.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.jobhistory.cache.impl;
-/**
- * package org.apache.linkis.jobhistory.cache.impl;
- *
- * <p>import
org.apache.linkis.governance.common.entity.task.RequestPersistTask; import
- * org.apache.linkis.jobhistory.cache.QueryCacheManager; import
- * org.apache.linkis.jobhistory.cache.QueryCacheService; import
- * org.apache.linkis.jobhistory.cache.domain.TaskResult; import
- * org.apache.linkis.protocol.constants.TaskConstant; import
- * org.apache.linkis.protocol.query.cache.RequestDeleteCache; import
- * org.apache.linkis.protocol.query.cache.RequestReadCache; import
- * org.apache.linkis.protocol.utils.TaskUtils; import
org.apache.commons.collections.MapUtils;
- * import org.springframework.beans.factory.annotation.Autowired; import
- * org.springframework.stereotype.Component;
- *
- * <p>import java.util.Map; @Component public class QueryCacheServiceImpl
implements
- * QueryCacheService { @Autowired QueryCacheManager queryCacheManager;
- *
- * <p>public Boolean needCache(RequestPersistTask requestPersistTask) {
Map<String, Object>
- * runtimeMap = TaskUtils.getRuntimeMap(requestPersistTask.getParams()); if
- * (MapUtils.isEmpty(runtimeMap) || runtimeMap.get(TaskConstant.CACHE) ==
null) { return false; }
- * return (Boolean) runtimeMap.get(TaskConstant.CACHE); }
- *
- * <p>public void writeCache(RequestPersistTask requestPersistTask) {
Map<String, Object> runtimeMap
- * = TaskUtils.getRuntimeMap(requestPersistTask.getParams()); Long
cacheExpireAfter = ((Double)
- * runtimeMap.getOrDefault(TaskConstant.CACHE_EXPIRE_AFTER,
300.0d)).longValue(); TaskResult
- * taskResult = new TaskResult( requestPersistTask.getExecutionCode(),
- * requestPersistTask.getExecuteApplicationName(),
requestPersistTask.getUmUser(),
- * requestPersistTask.getResultLocation(), cacheExpireAfter );
- *
- * <p>UserTaskResultCache userTaskResultCache =
- * queryCacheManager.getCache(requestPersistTask.getUmUser(),
- * requestPersistTask.getExecuteApplicationName());
userTaskResultCache.put(taskResult); }
- *
- * <p>public TaskResult readCache(RequestReadCache requestReadCache) {
UserTaskResultCache
- * userTaskResultCache = queryCacheManager.getCache(requestReadCache.getUser(),
- * requestReadCache.getEngineType()); return
- * userTaskResultCache.get(requestReadCache.getExecutionCode(),
- * requestReadCache.getReadCacheBefore()); }
- *
- * <p>public void deleteCache(RequestDeleteCache requestDeleteCache) {
UserTaskResultCache
- * userTaskResultCache =
queryCacheManager.getCache(requestDeleteCache.getUser(),
- * requestDeleteCache.getEngineType());
- * userTaskResultCache.remove(requestDeleteCache.getExecutionCode()); }
- *
- * <p>}
- */
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]