This is an automated email from the ASF dual-hosted git repository. alexkun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/linkis.git
commit 7704ed1b7079b80549e3282884deac505b9fb791 Author: peacewong <[email protected]> AuthorDate: Wed Nov 29 20:54:59 2023 +0800 Optimize JDBC connection support cache by task id --- .../engineplugin/jdbc/ConnectionManager.java | 18 +++-- .../jdbc/executor/JDBCEngineConnExecutor.scala | 93 +++++++++++----------- 2 files changed, 59 insertions(+), 52 deletions(-) diff --git a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java index b9cd47945..a49613f8d 100644 --- a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java +++ b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java @@ -23,16 +23,19 @@ import org.apache.linkis.manager.engineplugin.jdbc.constant.JDBCEngineConnConsta import org.apache.linkis.manager.engineplugin.jdbc.exception.JDBCParamsIllegalException; import org.apache.linkis.manager.engineplugin.jdbc.utils.JdbcParamUtils; -import org.apache.commons.dbcp.BasicDataSource; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.security.UserGroupInformation; import javax.sql.DataSource; +import java.io.Closeable; import java.security.PrivilegedExceptionAction; -import java.sql.*; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; import java.text.MessageFormat; -import java.util.*; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -42,7 +45,8 @@ import com.alibaba.druid.pool.DruidDataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.linkis.manager.engineplugin.jdbc.JdbcAuthType.*; +import static org.apache.linkis.manager.engineplugin.jdbc.JdbcAuthType.USERNAME; +import static org.apache.linkis.manager.engineplugin.jdbc.JdbcAuthType.of; import static org.apache.linkis.manager.engineplugin.jdbc.errorcode.JDBCErrorCodeSummary.*; public class ConnectionManager { @@ -103,8 +107,10 @@ public class ConnectionManager { } for (DataSource dataSource : this.dataSourceFactories.values()) { try { - ((BasicDataSource) dataSource).close(); - } catch (SQLException e) { + if (dataSource instanceof Closeable) { + ((Closeable) dataSource).close(); + } + } catch (Exception e) { LOG.error("Error while closing datasource...", e); } } diff --git a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala index 336d1197f..8a2d64fa7 100644 --- a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala @@ -19,6 +19,7 @@ package org.apache.linkis.manager.engineplugin.jdbc.executor import org.apache.linkis.common.conf.Configuration import org.apache.linkis.common.utils.{OverloadUtils, Utils} +import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask import org.apache.linkis.engineconn.computation.executor.execute.{ ConcurrentComputationExecutor, EngineExecutionContext @@ -78,6 +79,8 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) private val progressMonitors: util.Map[String, ProgressMonitor[_]] = new ConcurrentHashMap[String, ProgressMonitor[_]]() + private val connectionCache: util.Map[String, Connection] = new util.HashMap[String, Connection]() + override def init(): Unit = { logger.info("jdbc executor start init.") setCodeParser(new SQLCodeParser) @@ -87,49 +90,59 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) } } - override def executeLine( - engineExecutorContext: EngineExecutionContext, - code: String - ): ExecuteResponse = { - val realCode = code.trim() - val taskId = engineExecutorContext.getJobId.get + override def execute(engineConnTask: EngineConnTask): ExecuteResponse = { + val executeResponse = super.execute(engineConnTask) + if (StringUtils.isNotBlank(engineConnTask.getTaskId)) { + val connection = connectionCache.remove(engineConnTask.getTaskId) + logger.info(s"remove task ${engineConnTask.getTaskId} connection") + Utils.tryAndWarn(connection.close()) + } + executeResponse + } - var properties: util.Map[String, String] = Collections.emptyMap() + private def getConnection(engineExecutorContext: EngineExecutionContext): Connection = { - Utils.tryCatch({ - properties = getJDBCRuntimeParams(engineExecutorContext) - }) { e: Throwable => - logger.error(s"try to build JDBC runtime params error! $e") - return ErrorExecuteResponse(e.getMessage, e) + val taskId = engineExecutorContext.getJobId.orNull + if (StringUtils.isNotBlank(taskId) && connectionCache.containsKey(taskId)) { + logger.info( + s"Task ${taskId} paragraph ${engineExecutorContext.getCurrentParagraph} from cache get connection" + ) + return connectionCache.get(taskId) } - + val properties: util.Map[String, String] = getJDBCRuntimeParams(engineExecutorContext) logger.info(s"The jdbc properties is: $properties") val dataSourceName = properties.get(JDBCEngineConnConstant.JDBC_ENGINE_RUN_TIME_DS) val dataSourceMaxVersionId = properties.get(JDBCEngineConnConstant.JDBC_ENGINE_RUN_TIME_DS_MAX_VERSION_ID) logger.info( - s"The data source name is [$dataSourceName], and the jdbc client begins to run jdbc code:\n ${realCode.trim}" + s"The data source name is [$dataSourceName], and the jdbc client begins to run task ${taskId}" ) - var connection: Connection = null - var statement: Statement = null - var resultSet: ResultSet = null logger.info(s"The data source properties is $properties") - Utils.tryCatch({ - /* url + user as the cache key */ - val jdbcUrl: String = properties.get(JDBCEngineConnConstant.JDBC_URL) - val execUser: String = properties.get(JDBCEngineConnConstant.JDBC_SCRIPTS_EXEC_USER) - val proxyUser: String = properties.get(JDBCEngineConnConstant.JDBC_PROXY_USER_PROPERTY) - var dataSourceIdentifier = s"$jdbcUrl-$execUser-$proxyUser" - /* If datasource is used, use datasource name as the cache key */ - if (StringUtils.isNotBlank(dataSourceName)) { - dataSourceIdentifier = s"$dataSourceName-$dataSourceMaxVersionId" - } - connection = connectionManager.getConnection(dataSourceIdentifier, properties) - logger.info("The jdbc connection has created successfully!") - }) { e: Throwable => - logger.error(s"created data source connection error! $e") - return ErrorExecuteResponse("created data source connection error!", e) + /* url + user as the cache key */ + val jdbcUrl: String = properties.get(JDBCEngineConnConstant.JDBC_URL) + val execUser: String = properties.get(JDBCEngineConnConstant.JDBC_SCRIPTS_EXEC_USER) + val proxyUser: String = properties.get(JDBCEngineConnConstant.JDBC_PROXY_USER_PROPERTY) + var dataSourceIdentifier = s"$jdbcUrl-$execUser-$proxyUser" + /* If datasource is used, use datasource name as the cache key */ + if (StringUtils.isNotBlank(dataSourceName)) { + dataSourceIdentifier = s"$dataSourceName-$dataSourceMaxVersionId" + } + val connection = connectionManager.getConnection(dataSourceIdentifier, properties) + if (StringUtils.isNotBlank(taskId)) { + connectionCache.put(taskId, connection) } + connection + } + + override def executeLine( + engineExecutorContext: EngineExecutionContext, + code: String + ): ExecuteResponse = { + + val taskId = engineExecutorContext.getJobId.get + val connection: Connection = getConnection(engineExecutorContext) + var statement: Statement = null + var resultSet: ResultSet = null try { statement = connection.createStatement() @@ -167,14 +180,10 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) } } finally { if (resultSet != null) { - Utils.tryCatch({ resultSet.close() }) { case e: SQLException => - logger.warn(e.getMessage) - } + Utils.tryAndWarn(resultSet.close()) } if (statement != null) { - Utils.tryCatch({ statement.close() }) { case e: SQLException => - logger.warn(e.getMessage) - } + Utils.tryAndWarn(statement.close()) } } } catch { @@ -182,14 +191,6 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) logger.error(s"Cannot run $code", e) return ErrorExecuteResponse(e.getMessage, e) } finally { - if (connection != null) { - try { - if (!connection.getAutoCommit) connection.commit() - connection.close() - } catch { - case e: SQLException => logger.warn("close connection error.", e) - } - } connectionManager.removeStatement(taskId) } SuccessExecuteResponse() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
