This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
     new f6ee38fe2 Trino EC Support BackQuoted (#3345)
f6ee38fe2 is described below

commit f6ee38fe22485e81ad3e8de03c55bd059a4f64dd
Author: peacewong <[email protected]>
AuthorDate: Wed Sep 14 10:37:34 2022 +0800

    Trino EC Support BackQuoted (#3345)
    
    * optimize trino support BackQuoted
    
    * add statementClientCache clear
---
 .../TrinoProcessEngineConnLaunchBuilder.scala      |  8 +++++--
 .../trino/conf/TrinoConfiguration.scala            | 27 +++++++++++-----------
 .../trino/executor/TrinoEngineConnExecutor.scala   | 26 +++++++++++++--------
 .../TrinoSQLHook.scala}                            | 21 +++++++++--------
 4 files changed, 48 insertions(+), 34 deletions(-)

diff --git 
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/builder/TrinoProcessEngineConnLaunchBuilder.scala
 
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/builder/TrinoProcessEngineConnLaunchBuilder.scala
index 3b57d41ec..a79d180f8 100644
--- 
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/builder/TrinoProcessEngineConnLaunchBuilder.scala
+++ 
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/builder/TrinoProcessEngineConnLaunchBuilder.scala
@@ -26,8 +26,12 @@ import org.apache.commons.lang3.StringUtils
 class TrinoProcessEngineConnLaunchBuilder extends 
JavaProcessEngineConnLaunchBuilder {
 
   override def getEngineStartUser(label: UserCreatorLabel): String = {
-    /* using user label if user is blank */
-    StringUtils.defaultIfBlank(TrinoConfiguration.TRINO_USER.getValue, 
label.getUser)
+    if (TrinoConfiguration.TRINO_USER_ISOLATION_MODE.getValue) {
+      /* using user label if user is blank */
+      label.getUser
+    } else {
+      TrinoConfiguration.TRINO_DEFAULT_USER.getValue
+    }
   }
 
 }
diff --git 
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/conf/TrinoConfiguration.scala
 
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/conf/TrinoConfiguration.scala
index 043ed1f1e..42ac6e4cb 100644
--- 
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/conf/TrinoConfiguration.scala
+++ 
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/conf/TrinoConfiguration.scala
@@ -18,6 +18,7 @@
 package org.apache.linkis.engineplugin.trino.conf
 
 import org.apache.linkis.common.conf.{ByteType, CommonVars}
+import org.apache.linkis.storage.utils.StorageConfiguration
 
 import java.lang
 
@@ -25,31 +26,23 @@ object TrinoConfiguration {
 
   val ENGINE_CONCURRENT_LIMIT = 
CommonVars[Int]("wds.linkis.engineconn.concurrent.limit", 100)
 
-  val ENTRANCE_MAX_JOB_INSTANCE = 
CommonVars[Int]("wds.linkis.entrance.max.job.instance", 100)
-
-  val ENTRANCE_PROTECTED_JOB_INSTANCE =
-    CommonVars[Int]("wds.linkis.entrance.protected.job.instance", 0)
-
-  val ENTRANCE_RESULTS_MAX_CACHE =
-    CommonVars("wds.linkis.trino.resultSet.cache.max", new ByteType("512k"))
-
   val DEFAULT_LIMIT = CommonVars[Int]("wds.linkis.trino.default.limit", 5000)
 
   val TRINO_HTTP_CONNECT_TIME_OUT =
-    CommonVars[java.lang.Long]("wds.linkis.trino.http.connectTimeout", new 
lang.Long(60)) // seconds
+    CommonVars[java.lang.Long]("wds.linkis.trino.http.connectTimeout.seconds", 
new lang.Long(60))
 
   val TRINO_HTTP_READ_TIME_OUT =
-    CommonVars[java.lang.Long]("wds.linkis.trino.http.readTimeout", new 
lang.Long(60))
+    CommonVars[java.lang.Long]("wds.linkis.trino.http.readTimeout.seconds", 
new lang.Long(60))
 
   val TRINO_URL = CommonVars[String]("wds.linkis.trino.url", 
"http://127.0.0.1:8080")
-  val TRINO_USER = CommonVars[String]("wds.linkis.trino.user", null)
+
   val TRINO_PASSWORD = CommonVars[String]("wds.linkis.trino.password", null)
-  val TRINO_PASSWORD_CMD = CommonVars[String]("wds.linkis.trino.passwordCmd", 
null)
+  val TRINO_PASSWORD_CMD = CommonVars[String]("wds.linkis.trino.password.cmd", 
null)
   val TRINO_CATALOG = CommonVars[String]("wds.linkis.trino.catalog", "system")
   val TRINO_SCHEMA = CommonVars[String]("wds.linkis.trino.schema", "")
   val TRINO_SOURCE = CommonVars[String]("wds.linkis.trino.source", "global")
 
-  val TRINO_SSL_INSECURED = 
CommonVars[Boolean]("wds.linkis.trino.ssl.insecured", false)
+  val TRINO_SSL_INSECURED = 
CommonVars[Boolean]("wds.linkis.trino.ssl.insecured", true)
   val TRINO_SSL_KEYSTORE = CommonVars[String]("wds.linkis.trino.ssl.keystore", 
null)
   val TRINO_SSL_KEYSTORE_TYPE = 
CommonVars[String]("wds.linkis.trino.ssl.keystore.type", null)
 
@@ -67,4 +60,12 @@ object TrinoConfiguration {
   val TRINO_FORBID_MODIFY_SCHEMA =
     CommonVars[Boolean]("wds.linkis.trino.forbid.modifySchema", true)
 
+  val TRINO_USER_ISOLATION_MODE =
+    CommonVars[Boolean]("linkis.trino.user.isolation.mode", false)
+
+  val TRINO_DEFAULT_USER =
+    CommonVars("linkis.trino.default.start.user", 
StorageConfiguration.HDFS_ROOT_USER.getValue)
+
+  val TRINO_SQL_HOOK_ENABLED = CommonVars("linkis.trino.sql.hook.enabled", 
true, "trino sql hoook")
+
 }
diff --git 
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
 
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
index 34d826e90..b1103fcd9 100644
--- 
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
+++ 
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
@@ -38,7 +38,7 @@ import org.apache.linkis.engineplugin.trino.password.{
   StaticPasswordCallback
 }
 import org.apache.linkis.engineplugin.trino.socket.SocketChannelSocketFactory
-import org.apache.linkis.engineplugin.trino.utils.TrinoCode
+import org.apache.linkis.engineplugin.trino.utils.{TrinoCode, TrinoSQLHook}
 import org.apache.linkis.governance.common.paser.SQLCodeParser
 import org.apache.linkis.manager.common.entity.resource.{
   CommonNodeResource,
@@ -60,8 +60,8 @@ import org.apache.linkis.storage.resultset.ResultSetFactory
 import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
 
 import org.apache.commons.io.IOUtils
-import org.apache.commons.lang.exception.ExceptionUtils
 import org.apache.commons.lang3.StringUtils
+import org.apache.commons.lang3.exception.ExceptionUtils
 
 import org.springframework.util.CollectionUtils
 
@@ -168,16 +168,20 @@ class TrinoEngineConnExecutor(override val 
outputPrintLimit: Int, val id: Int)
       engineExecutorContext: EngineExecutionContext,
       code: String
   ): ExecuteResponse = {
-    val trimmedCode = code.trim
-    val realCode = getCodeParser
-      .map { parser => parser.parse(trimmedCode).head }
-      .getOrElse(trimmedCode)
+    val enableSqlHook = TRINO_SQL_HOOK_ENABLED.getValue
+    val realCode = if (StringUtils.isBlank(code)) {
+      "SELECT 1"
+    } else if (enableSqlHook) {
+      TrinoSQLHook.preExecuteHook(code.trim)
+    } else {
+      code.trim
+    }
 
     TrinoCode.checkCode(realCode)
     logger.info(s"trino client begins to run psql code:\n $realCode")
 
     val trinoUser = Optional
-      .ofNullable(TRINO_USER.getValue)
+      .ofNullable(TRINO_DEFAULT_USER.getValue)
       .orElseGet(new Supplier[String] {
         override def get(): String = 
getCurrentUser(engineExecutorContext.getLabels)
       })
@@ -366,7 +370,7 @@ class TrinoEngineConnExecutor(override val 
outputPrintLimit: Int, val id: Int)
     labels
       .find(l => l.isInstanceOf[UserCreatorLabel])
       .map(label => label.asInstanceOf[UserCreatorLabel].getUser)
-      .getOrElse(TRINO_USER.getValue)
+      .getOrElse(TRINO_DEFAULT_USER.getValue)
   }
 
   private def initialStatusUpdates(
@@ -444,7 +448,7 @@ class TrinoEngineConnExecutor(override val 
outputPrintLimit: Int, val id: Int)
           cause = error.getFailureInfo.toException
         }
         engineExecutorContext.appendStdout(
-          LogUtils.generateERROR(ExceptionUtils.getFullStackTrace(cause))
+          LogUtils.generateERROR(ExceptionUtils.getStackTrace(cause))
         )
         ErrorExecuteResponse(ExceptionUtils.getMessage(cause), cause)
       } else null
@@ -477,8 +481,9 @@ class TrinoEngineConnExecutor(override val 
outputPrintLimit: Int, val id: Int)
 
     var builder: ClientSession.Builder = ClientSession.builder(newSession)
 
-    if (statement.getStartedTransactionId != null)
+    if (statement.getStartedTransactionId != null) {
       builder = builder.withTransactionId(statement.getStartedTransactionId)
+    }
 
     // update session properties if present
     if (
@@ -521,6 +526,7 @@ class TrinoEngineConnExecutor(override val 
outputPrintLimit: Int, val id: Int)
         Utils.tryAndWarn(statement.cancelLeafStage())
       }
     }
+    statementClientCache.clear()
   }
 
   override def close(): Unit = {
diff --git 
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/builder/TrinoProcessEngineConnLaunchBuilder.scala
 
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/utils/TrinoSQLHook.scala
similarity index 59%
copy from 
linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/builder/TrinoProcessEngineConnLaunchBuilder.scala
copy to 
linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/utils/TrinoSQLHook.scala
index 3b57d41ec..772ca77eb 100644
--- 
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/builder/TrinoProcessEngineConnLaunchBuilder.scala
+++ 
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/utils/TrinoSQLHook.scala
@@ -15,19 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.trino.builder
-
-import org.apache.linkis.engineplugin.trino.conf.TrinoConfiguration
-import 
org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder
-import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
+package org.apache.linkis.engineplugin.trino.utils
 
 import org.apache.commons.lang3.StringUtils
 
-class TrinoProcessEngineConnLaunchBuilder extends 
JavaProcessEngineConnLaunchBuilder {
+object TrinoSQLHook {
+
+  def preExecuteHook(code: String): String = {
+    replaceBackQuoted(code)
+  }
 
-  override def getEngineStartUser(label: UserCreatorLabel): String = {
-    /* using user label if user is blank */
-    StringUtils.defaultIfBlank(TrinoConfiguration.TRINO_USER.getValue, 
label.getUser)
+  private def replaceBackQuoted(code: String): String = {
+    if (StringUtils.isNotBlank(code)) {
+      code.replaceAll("`", "\"")
+    } else {
+      code
+    }
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to