This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
new f6ee38fe2 Trino EC Support BackQuoted (#3345)
f6ee38fe2 is described below
commit f6ee38fe22485e81ad3e8de03c55bd059a4f64dd
Author: peacewong <[email protected]>
AuthorDate: Wed Sep 14 10:37:34 2022 +0800
Trino EC Support BackQuoted (#3345)
* optimize trino support BackQuoted
* add statementClientCache clear
---
.../TrinoProcessEngineConnLaunchBuilder.scala | 8 +++++--
.../trino/conf/TrinoConfiguration.scala | 27 +++++++++++-----------
.../trino/executor/TrinoEngineConnExecutor.scala | 26 +++++++++++++--------
.../TrinoSQLHook.scala} | 21 +++++++++--------
4 files changed, 48 insertions(+), 34 deletions(-)
diff --git
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/builder/TrinoProcessEngineConnLaunchBuilder.scala
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/builder/TrinoProcessEngineConnLaunchBuilder.scala
index 3b57d41ec..a79d180f8 100644
---
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/builder/TrinoProcessEngineConnLaunchBuilder.scala
+++
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/builder/TrinoProcessEngineConnLaunchBuilder.scala
@@ -26,8 +26,12 @@ import org.apache.commons.lang3.StringUtils
class TrinoProcessEngineConnLaunchBuilder extends
JavaProcessEngineConnLaunchBuilder {
override def getEngineStartUser(label: UserCreatorLabel): String = {
- /* using user label if user is blank */
- StringUtils.defaultIfBlank(TrinoConfiguration.TRINO_USER.getValue,
label.getUser)
+ if (TrinoConfiguration.TRINO_USER_ISOLATION_MODE.getValue) {
+ /* using user label if user is blank */
+ label.getUser
+ } else {
+ TrinoConfiguration.TRINO_DEFAULT_USER.getValue
+ }
}
}
diff --git
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/conf/TrinoConfiguration.scala
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/conf/TrinoConfiguration.scala
index 043ed1f1e..42ac6e4cb 100644
---
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/conf/TrinoConfiguration.scala
+++
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/conf/TrinoConfiguration.scala
@@ -18,6 +18,7 @@
package org.apache.linkis.engineplugin.trino.conf
import org.apache.linkis.common.conf.{ByteType, CommonVars}
+import org.apache.linkis.storage.utils.StorageConfiguration
import java.lang
@@ -25,31 +26,23 @@ object TrinoConfiguration {
val ENGINE_CONCURRENT_LIMIT =
CommonVars[Int]("wds.linkis.engineconn.concurrent.limit", 100)
- val ENTRANCE_MAX_JOB_INSTANCE =
CommonVars[Int]("wds.linkis.entrance.max.job.instance", 100)
-
- val ENTRANCE_PROTECTED_JOB_INSTANCE =
- CommonVars[Int]("wds.linkis.entrance.protected.job.instance", 0)
-
- val ENTRANCE_RESULTS_MAX_CACHE =
- CommonVars("wds.linkis.trino.resultSet.cache.max", new ByteType("512k"))
-
val DEFAULT_LIMIT = CommonVars[Int]("wds.linkis.trino.default.limit", 5000)
val TRINO_HTTP_CONNECT_TIME_OUT =
- CommonVars[java.lang.Long]("wds.linkis.trino.http.connectTimeout", new
lang.Long(60)) // seconds
+ CommonVars[java.lang.Long]("wds.linkis.trino.http.connectTimeout.seconds",
new lang.Long(60))
val TRINO_HTTP_READ_TIME_OUT =
- CommonVars[java.lang.Long]("wds.linkis.trino.http.readTimeout", new
lang.Long(60))
+ CommonVars[java.lang.Long]("wds.linkis.trino.http.readTimeout.seconds",
new lang.Long(60))
val TRINO_URL = CommonVars[String]("wds.linkis.trino.url",
"http://127.0.0.1:8080")
- val TRINO_USER = CommonVars[String]("wds.linkis.trino.user", null)
+
val TRINO_PASSWORD = CommonVars[String]("wds.linkis.trino.password", null)
- val TRINO_PASSWORD_CMD = CommonVars[String]("wds.linkis.trino.passwordCmd",
null)
+ val TRINO_PASSWORD_CMD = CommonVars[String]("wds.linkis.trino.password.cmd",
null)
val TRINO_CATALOG = CommonVars[String]("wds.linkis.trino.catalog", "system")
val TRINO_SCHEMA = CommonVars[String]("wds.linkis.trino.schema", "")
val TRINO_SOURCE = CommonVars[String]("wds.linkis.trino.source", "global")
- val TRINO_SSL_INSECURED =
CommonVars[Boolean]("wds.linkis.trino.ssl.insecured", false)
+ val TRINO_SSL_INSECURED =
CommonVars[Boolean]("wds.linkis.trino.ssl.insecured", true)
val TRINO_SSL_KEYSTORE = CommonVars[String]("wds.linkis.trino.ssl.keystore",
null)
val TRINO_SSL_KEYSTORE_TYPE =
CommonVars[String]("wds.linkis.trino.ssl.keystore.type", null)
@@ -67,4 +60,12 @@ object TrinoConfiguration {
val TRINO_FORBID_MODIFY_SCHEMA =
CommonVars[Boolean]("wds.linkis.trino.forbid.modifySchema", true)
+ val TRINO_USER_ISOLATION_MODE =
+ CommonVars[Boolean]("linkis.trino.user.isolation.mode", false)
+
+ val TRINO_DEFAULT_USER =
+ CommonVars("linkis.trino.default.start.user",
StorageConfiguration.HDFS_ROOT_USER.getValue)
+
+ val TRINO_SQL_HOOK_ENABLED = CommonVars("linkis.trino.sql.hook.enabled",
true, "trino sql hoook")
+
}
diff --git
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
index 34d826e90..b1103fcd9 100644
---
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
+++
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
@@ -38,7 +38,7 @@ import org.apache.linkis.engineplugin.trino.password.{
StaticPasswordCallback
}
import org.apache.linkis.engineplugin.trino.socket.SocketChannelSocketFactory
-import org.apache.linkis.engineplugin.trino.utils.TrinoCode
+import org.apache.linkis.engineplugin.trino.utils.{TrinoCode, TrinoSQLHook}
import org.apache.linkis.governance.common.paser.SQLCodeParser
import org.apache.linkis.manager.common.entity.resource.{
CommonNodeResource,
@@ -60,8 +60,8 @@ import org.apache.linkis.storage.resultset.ResultSetFactory
import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
import org.apache.commons.io.IOUtils
-import org.apache.commons.lang.exception.ExceptionUtils
import org.apache.commons.lang3.StringUtils
+import org.apache.commons.lang3.exception.ExceptionUtils
import org.springframework.util.CollectionUtils
@@ -168,16 +168,20 @@ class TrinoEngineConnExecutor(override val
outputPrintLimit: Int, val id: Int)
engineExecutorContext: EngineExecutionContext,
code: String
): ExecuteResponse = {
- val trimmedCode = code.trim
- val realCode = getCodeParser
- .map { parser => parser.parse(trimmedCode).head }
- .getOrElse(trimmedCode)
+ val enableSqlHook = TRINO_SQL_HOOK_ENABLED.getValue
+ val realCode = if (StringUtils.isBlank(code)) {
+ "SELECT 1"
+ } else if (enableSqlHook) {
+ TrinoSQLHook.preExecuteHook(code.trim)
+ } else {
+ code.trim
+ }
TrinoCode.checkCode(realCode)
logger.info(s"trino client begins to run psql code:\n $realCode")
val trinoUser = Optional
- .ofNullable(TRINO_USER.getValue)
+ .ofNullable(TRINO_DEFAULT_USER.getValue)
.orElseGet(new Supplier[String] {
override def get(): String =
getCurrentUser(engineExecutorContext.getLabels)
})
@@ -366,7 +370,7 @@ class TrinoEngineConnExecutor(override val
outputPrintLimit: Int, val id: Int)
labels
.find(l => l.isInstanceOf[UserCreatorLabel])
.map(label => label.asInstanceOf[UserCreatorLabel].getUser)
- .getOrElse(TRINO_USER.getValue)
+ .getOrElse(TRINO_DEFAULT_USER.getValue)
}
private def initialStatusUpdates(
@@ -444,7 +448,7 @@ class TrinoEngineConnExecutor(override val
outputPrintLimit: Int, val id: Int)
cause = error.getFailureInfo.toException
}
engineExecutorContext.appendStdout(
- LogUtils.generateERROR(ExceptionUtils.getFullStackTrace(cause))
+ LogUtils.generateERROR(ExceptionUtils.getStackTrace(cause))
)
ErrorExecuteResponse(ExceptionUtils.getMessage(cause), cause)
} else null
@@ -477,8 +481,9 @@ class TrinoEngineConnExecutor(override val
outputPrintLimit: Int, val id: Int)
var builder: ClientSession.Builder = ClientSession.builder(newSession)
- if (statement.getStartedTransactionId != null)
+ if (statement.getStartedTransactionId != null) {
builder = builder.withTransactionId(statement.getStartedTransactionId)
+ }
// update session properties if present
if (
@@ -521,6 +526,7 @@ class TrinoEngineConnExecutor(override val
outputPrintLimit: Int, val id: Int)
Utils.tryAndWarn(statement.cancelLeafStage())
}
}
+ statementClientCache.clear()
}
override def close(): Unit = {
diff --git
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/builder/TrinoProcessEngineConnLaunchBuilder.scala
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/utils/TrinoSQLHook.scala
similarity index 59%
copy from
linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/builder/TrinoProcessEngineConnLaunchBuilder.scala
copy to
linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/utils/TrinoSQLHook.scala
index 3b57d41ec..772ca77eb 100644
---
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/builder/TrinoProcessEngineConnLaunchBuilder.scala
+++
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/utils/TrinoSQLHook.scala
@@ -15,19 +15,22 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.trino.builder
-
-import org.apache.linkis.engineplugin.trino.conf.TrinoConfiguration
-import
org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder
-import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
+package org.apache.linkis.engineplugin.trino.utils
import org.apache.commons.lang3.StringUtils
-class TrinoProcessEngineConnLaunchBuilder extends
JavaProcessEngineConnLaunchBuilder {
+object TrinoSQLHook {
+
+ def preExecuteHook(code: String): String = {
+ replaceBackQuoted(code)
+ }
- override def getEngineStartUser(label: UserCreatorLabel): String = {
- /* using user label if user is blank */
- StringUtils.defaultIfBlank(TrinoConfiguration.TRINO_USER.getValue,
label.getUser)
+ private def replaceBackQuoted(code: String): String = {
+ if (StringUtils.isNotBlank(code)) {
+ code.replaceAll("`", "\"")
+ } else {
+ code
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]