This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
new 6a47709bb [linkis-engineconn-plugin-flink] Modification of scala file
floating red (#3216)
6a47709bb is described below
commit 6a47709bba67bfb7e08d8b5b046796a2dda37083
Author: 成彬彬 <[email protected]>
AuthorDate: Sat Sep 3 13:18:38 2022 +0800
[linkis-engineconn-plugin-flink] Modification of scala file floating red
(#3216)
---
.../resource/FlinkEngineConnResourceFactory.scala | 3 ++-
.../manager/engineplugin/io/IoEngineConnPlugin.scala | 9 +--------
.../manager/engineplugin/io/domain/FSInfo.scala | 2 +-
.../io/executor/IoEngineConnExecutor.scala | 20 +++++++++++++-------
.../manager/engineplugin/io/utils/IOHelp.scala | 6 ++++--
5 files changed, 21 insertions(+), 19 deletions(-)
diff --git
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/resource/FlinkEngineConnResourceFactory.scala
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/resource/FlinkEngineConnResourceFactory.scala
index 0bc4e8b51..1ea682f54 100644
---
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/resource/FlinkEngineConnResourceFactory.scala
+++
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/resource/FlinkEngineConnResourceFactory.scala
@@ -37,11 +37,12 @@ class FlinkEngineConnResourceFactory extends
AbstractEngineResourceFactory {
String.valueOf(containers *
LINKIS_FLINK_TASK_SLOTS.getValue(properties))
)
containers
- } else
+ } else {
math.round(
FLINK_APP_DEFAULT_PARALLELISM.getValue(properties) * 1.0f /
LINKIS_FLINK_TASK_SLOTS
.getValue(properties)
)
+ }
val yarnMemory = ByteTimeUtils.byteStringAsBytes(
LINKIS_FLINK_TASK_MANAGER_MEMORY.getValue(properties) * containers + "M"
) +
diff --git
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/IoEngineConnPlugin.scala
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/IoEngineConnPlugin.scala
index 56354293b..60f0f1fb9 100644
---
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/IoEngineConnPlugin.scala
+++
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/IoEngineConnPlugin.scala
@@ -46,14 +46,7 @@ class IoEngineConnPlugin extends EngineConnPlugin {
private val defaultLabels: util.List[Label[_]] = new
util.ArrayList[Label[_]]()
- override def init(params: util.Map[String, Any]): Unit = {
- /*val engineTypeLabel = new EngineTypeLabel()
- engineTypeLabel.setEngineType(EngineType.IO_ENGINE.toString)
-
engineTypeLabel.setVersion(IOEngineConnConfiguration.DEFAULT_VERSION.getValue)
- this.defaultLabels.add(engineTypeLabel)
- val runTypeLabel = new EngineRunTypeLabel()
- runTypeLabel.setRunType(RunType.IO.toString)*/
- }
+ override def init(params: util.Map[String, Any]): Unit = {}
override def getEngineResourceFactory: EngineResourceFactory = {
if (null == engineResourceFactory) resourceLocker synchronized {
diff --git
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/domain/FSInfo.scala
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/domain/FSInfo.scala
index 674a46d06..192d4ddbb 100644
---
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/domain/FSInfo.scala
+++
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/domain/FSInfo.scala
@@ -25,7 +25,7 @@ import org.apache.linkis.storage.utils.StorageConfiguration
*/
class FSInfo(val id: Long, val fs: Fs, var lastAccessTime: Long =
System.currentTimeMillis()) {
- def timeout = System
+ def timeout: Boolean = System
.currentTimeMillis() - lastAccessTime >
(StorageConfiguration.IO_FS_EXPIRE_TIME.getValue + 60 * 1000)
}
diff --git
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
index 1ae589b38..874fc1630 100644
---
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
+++
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
@@ -84,7 +84,7 @@ class IoEngineConnExecutor(val id: Int, val outputLimit: Int
= 10)
}
/*
- * 定时清理空闲的FS
+ * Regularly clean up idle FS
*/
private val cleanupThread = new Thread("IOEngineExecutor-Cleanup-Scanner") {
setDaemon(true)
@@ -136,8 +136,9 @@ class IoEngineConnExecutor(val id: Int, val outputLimit:
Int = 10)
AliasOutputExecuteResponse(method.id.toString, IOHelp.read(fs, method))
case "available" =>
val fs = getUserFS(method)
- if (method.params == null || method.params.length != 2)
+ if (method.params == null || method.params.length != 2) {
throw new StorageErrorException(53003, "Unsupported parameter calls")
+ }
val dest = MethodEntitySerializer.deserializerToJavaObject(
method.params(0).asInstanceOf[String],
classOf[FsPath]
@@ -157,8 +158,9 @@ class IoEngineConnExecutor(val id: Int, val outputLimit:
Int = 10)
SuccessExecuteResponse()
case "renameTo" =>
val fs = getUserFS(method)
- if (method.params == null || method.params.length != 2)
+ if (method.params == null || method.params.length != 2) {
throw new StorageErrorException(53003, "Unsupported parameter calls")
+ }
fs.renameTo(
MethodEntitySerializer
.deserializerToJavaObject(method.params(0).asInstanceOf[String],
classOf[FsPath]),
@@ -169,8 +171,9 @@ class IoEngineConnExecutor(val id: Int, val outputLimit:
Int = 10)
)
SuccessExecuteResponse()
case "list" =>
- if (method.params == null || method.params.length != 1)
+ if (method.params == null || method.params.length != 1) {
throw new StorageErrorException(53003, "Unsupported parameter calls")
+ }
val fs = getUserFS(method)
val dest = MethodEntitySerializer.deserializerToJavaObject(
method.params(0).asInstanceOf[String],
@@ -183,8 +186,9 @@ class IoEngineConnExecutor(val id: Int, val outputLimit:
Int = 10)
)
)
case "listPathWithError" =>
- if (method.params == null || method.params.length != 1)
+ if (method.params == null || method.params.length != 1) {
throw new StorageErrorException(53003, "Unsupported parameter calls")
+ }
val fs = getUserFS(method).asInstanceOf[FileSystem]
val dest = MethodEntitySerializer.deserializerToJavaObject(
method.params(0).asInstanceOf[String],
@@ -355,16 +359,18 @@ class IoEngineConnExecutor(val id: Int, val outputLimit:
Int = 10)
val realMethod = fs.getClass.getMethods
.filter(_.getName == methodName)
.find(_.getGenericParameterTypes.length == parameterSize)
- if (realMethod.isEmpty)
+ if (realMethod.isEmpty) {
throw new StorageErrorException(
53003,
s"not exists method $methodName in fs ${fs.getClass.getSimpleName}."
)
- if (parameterSize > 0)
+ }
+ if (parameterSize > 0) {
method.params(0) = MethodEntitySerializer.deserializerToJavaObject(
method.params(0).asInstanceOf[String],
methodParamType
)
+ }
val res = MethodEntitySerializer.serializerJavaObject(
ReflectionUtils.invoke(fs, realMethod.get, method.params)
)
diff --git
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/utils/IOHelp.scala
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/utils/IOHelp.scala
index e42643c6f..b4dcc64f5 100644
---
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/utils/IOHelp.scala
+++
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/utils/IOHelp.scala
@@ -38,11 +38,12 @@ object IOHelp {
* @return
*/
def read(fs: Fs, method: MethodEntity): String = {
- if (method.params == null || method.params.isEmpty)
+ if (method.params == null || method.params.isEmpty) {
throw new StorageErrorException(
53002,
"The read method parameter cannot be empty(read方法参数不能为空)"
)
+ }
val dest = MethodEntitySerializer.deserializerToJavaObject(
method.params(0).asInstanceOf[String],
classOf[FsPath]
@@ -82,8 +83,9 @@ object IOHelp {
* @param method
*/
def write(fs: Fs, method: MethodEntity): Unit = {
- if (method.params == null || method.params.isEmpty)
+ if (method.params == null || method.params.isEmpty) {
throw new StorageErrorException(53003, "Unsupported parameter
call(不支持的参数调用)")
+ }
val dest = MethodEntitySerializer.deserializerToJavaObject(
method.params(0).asInstanceOf[String],
classOf[FsPath]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]