This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
     new 6a47709bb [linkis-engineconn-plugin-flink] Modification of scala file floating red (#3216)
6a47709bb is described below

commit 6a47709bba67bfb7e08d8b5b046796a2dda37083
Author: 成彬彬 <[email protected]>
AuthorDate: Sat Sep 3 13:18:38 2022 +0800

    [linkis-engineconn-plugin-flink] Modification of scala file floating red (#3216)
---
 .../resource/FlinkEngineConnResourceFactory.scala    |  3 ++-
 .../manager/engineplugin/io/IoEngineConnPlugin.scala |  9 +--------
 .../manager/engineplugin/io/domain/FSInfo.scala      |  2 +-
 .../io/executor/IoEngineConnExecutor.scala           | 20 +++++++++++++-------
 .../manager/engineplugin/io/utils/IOHelp.scala       |  6 ++++--
 5 files changed, 21 insertions(+), 19 deletions(-)

diff --git 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/resource/FlinkEngineConnResourceFactory.scala
 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/resource/FlinkEngineConnResourceFactory.scala
index 0bc4e8b51..1ea682f54 100644
--- 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/resource/FlinkEngineConnResourceFactory.scala
+++ 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/resource/FlinkEngineConnResourceFactory.scala
@@ -37,11 +37,12 @@ class FlinkEngineConnResourceFactory extends 
AbstractEngineResourceFactory {
         String.valueOf(containers * 
LINKIS_FLINK_TASK_SLOTS.getValue(properties))
       )
       containers
-    } else
+    } else {
       math.round(
         FLINK_APP_DEFAULT_PARALLELISM.getValue(properties) * 1.0f / 
LINKIS_FLINK_TASK_SLOTS
           .getValue(properties)
       )
+    }
     val yarnMemory = ByteTimeUtils.byteStringAsBytes(
       LINKIS_FLINK_TASK_MANAGER_MEMORY.getValue(properties) * containers + "M"
     ) +
diff --git 
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/IoEngineConnPlugin.scala
 
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/IoEngineConnPlugin.scala
index 56354293b..60f0f1fb9 100644
--- 
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/IoEngineConnPlugin.scala
+++ 
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/IoEngineConnPlugin.scala
@@ -46,14 +46,7 @@ class IoEngineConnPlugin extends EngineConnPlugin {
 
   private val defaultLabels: util.List[Label[_]] = new 
util.ArrayList[Label[_]]()
 
-  override def init(params: util.Map[String, Any]): Unit = {
-    /*val engineTypeLabel = new EngineTypeLabel()
-    engineTypeLabel.setEngineType(EngineType.IO_ENGINE.toString)
-    
engineTypeLabel.setVersion(IOEngineConnConfiguration.DEFAULT_VERSION.getValue)
-    this.defaultLabels.add(engineTypeLabel)
-    val runTypeLabel = new EngineRunTypeLabel()
-    runTypeLabel.setRunType(RunType.IO.toString)*/
-  }
+  override def init(params: util.Map[String, Any]): Unit = {}
 
   override def getEngineResourceFactory: EngineResourceFactory = {
     if (null == engineResourceFactory) resourceLocker synchronized {
diff --git 
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/domain/FSInfo.scala
 
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/domain/FSInfo.scala
index 674a46d06..192d4ddbb 100644
--- 
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/domain/FSInfo.scala
+++ 
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/domain/FSInfo.scala
@@ -25,7 +25,7 @@ import org.apache.linkis.storage.utils.StorageConfiguration
  */
 class FSInfo(val id: Long, val fs: Fs, var lastAccessTime: Long = 
System.currentTimeMillis()) {
 
-  def timeout = System
+  def timeout: Boolean = System
     .currentTimeMillis() - lastAccessTime > 
(StorageConfiguration.IO_FS_EXPIRE_TIME.getValue + 60 * 1000)
 
 }
diff --git 
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
 
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
index 1ae589b38..874fc1630 100644
--- 
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
+++ 
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala
@@ -84,7 +84,7 @@ class IoEngineConnExecutor(val id: Int, val outputLimit: Int 
= 10)
   }
 
   /*
-   * 定时清理空闲的FS
+   * Regularly clean up idle FS
    */
   private val cleanupThread = new Thread("IOEngineExecutor-Cleanup-Scanner") {
     setDaemon(true)
@@ -136,8 +136,9 @@ class IoEngineConnExecutor(val id: Int, val outputLimit: 
Int = 10)
         AliasOutputExecuteResponse(method.id.toString, IOHelp.read(fs, method))
       case "available" =>
         val fs = getUserFS(method)
-        if (method.params == null || method.params.length != 2)
+        if (method.params == null || method.params.length != 2) {
           throw new StorageErrorException(53003, "Unsupported parameter calls")
+        }
         val dest = MethodEntitySerializer.deserializerToJavaObject(
           method.params(0).asInstanceOf[String],
           classOf[FsPath]
@@ -157,8 +158,9 @@ class IoEngineConnExecutor(val id: Int, val outputLimit: 
Int = 10)
         SuccessExecuteResponse()
       case "renameTo" =>
         val fs = getUserFS(method)
-        if (method.params == null || method.params.length != 2)
+        if (method.params == null || method.params.length != 2) {
           throw new StorageErrorException(53003, "Unsupported parameter calls")
+        }
         fs.renameTo(
           MethodEntitySerializer
             .deserializerToJavaObject(method.params(0).asInstanceOf[String], 
classOf[FsPath]),
@@ -169,8 +171,9 @@ class IoEngineConnExecutor(val id: Int, val outputLimit: 
Int = 10)
         )
         SuccessExecuteResponse()
       case "list" =>
-        if (method.params == null || method.params.length != 1)
+        if (method.params == null || method.params.length != 1) {
           throw new StorageErrorException(53003, "Unsupported parameter calls")
+        }
         val fs = getUserFS(method)
         val dest = MethodEntitySerializer.deserializerToJavaObject(
           method.params(0).asInstanceOf[String],
@@ -183,8 +186,9 @@ class IoEngineConnExecutor(val id: Int, val outputLimit: 
Int = 10)
           )
         )
       case "listPathWithError" =>
-        if (method.params == null || method.params.length != 1)
+        if (method.params == null || method.params.length != 1) {
           throw new StorageErrorException(53003, "Unsupported parameter calls")
+        }
         val fs = getUserFS(method).asInstanceOf[FileSystem]
         val dest = MethodEntitySerializer.deserializerToJavaObject(
           method.params(0).asInstanceOf[String],
@@ -355,16 +359,18 @@ class IoEngineConnExecutor(val id: Int, val outputLimit: 
Int = 10)
     val realMethod = fs.getClass.getMethods
       .filter(_.getName == methodName)
       .find(_.getGenericParameterTypes.length == parameterSize)
-    if (realMethod.isEmpty)
+    if (realMethod.isEmpty) {
       throw new StorageErrorException(
         53003,
         s"not exists method $methodName in fs ${fs.getClass.getSimpleName}."
       )
-    if (parameterSize > 0)
+    }
+    if (parameterSize > 0) {
       method.params(0) = MethodEntitySerializer.deserializerToJavaObject(
         method.params(0).asInstanceOf[String],
         methodParamType
       )
+    }
     val res = MethodEntitySerializer.serializerJavaObject(
       ReflectionUtils.invoke(fs, realMethod.get, method.params)
     )
diff --git 
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/utils/IOHelp.scala
 
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/utils/IOHelp.scala
index e42643c6f..b4dcc64f5 100644
--- 
a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/utils/IOHelp.scala
+++ 
b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/utils/IOHelp.scala
@@ -38,11 +38,12 @@ object IOHelp {
    * @return
    */
   def read(fs: Fs, method: MethodEntity): String = {
-    if (method.params == null || method.params.isEmpty)
+    if (method.params == null || method.params.isEmpty) {
       throw new StorageErrorException(
         53002,
         "The read method parameter cannot be empty(read方法参数不能为空)"
       )
+    }
     val dest = MethodEntitySerializer.deserializerToJavaObject(
       method.params(0).asInstanceOf[String],
       classOf[FsPath]
@@ -82,8 +83,9 @@ object IOHelp {
    * @param method
    */
   def write(fs: Fs, method: MethodEntity): Unit = {
-    if (method.params == null || method.params.isEmpty)
+    if (method.params == null || method.params.isEmpty) {
       throw new StorageErrorException(53003, "Unsupported parameter 
call(不支持的参数调用)")
+    }
     val dest = MethodEntitySerializer.deserializerToJavaObject(
       method.params(0).asInstanceOf[String],
       classOf[FsPath]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to