This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new ec627119b [Bug] Fix impala plugin for client creation (#4776)
ec627119b is described below

commit ec627119bad6d3469cffa5503743f73c80b9c2f0
Author: Knypys <[email protected]>
AuthorDate: Mon Jul 17 16:07:38 2023 +0800

    [Bug] Fix impala plugin for client creation (#4776)
    
    * fix usage docs
    
    * fix engine for linkis-storage type change
    
    * fix impala executor bugs
    
    * fix impala client creation
---
 .../client/thrift/ImpalaThriftSessionFactory.java  |  2 +-
 .../impala/executor/ImpalaEngineConnExecutor.scala | 67 ++++++++--------------
 2 files changed, 24 insertions(+), 45 deletions(-)

diff --git 
a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java
 
b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java
index fd6eb0e9f..472564253 100644
--- 
a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java
+++ 
b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java
@@ -90,7 +90,7 @@ public class ImpalaThriftSessionFactory {
     }
 
     if (sessions.isEmpty()) {
-      throw new IllegalArgumentException("invalid hosts: " + 
StringUtils.join(hosts, ','));
+      throw new IllegalArgumentException("Invalid hosts: " + 
StringUtils.join(hosts, ','));
     }
 
     this.socketFactory = socketFactory;
diff --git 
a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala
 
b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala
index d9cf13a3c..23cd1a0e6 100644
--- 
a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala
+++ 
b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala
@@ -17,73 +17,44 @@
 
 package org.apache.linkis.engineplugin.impala.executor
 
+import org.apache.commons.collections.MapUtils
+import org.apache.commons.io.IOUtils
+import org.apache.commons.lang3.StringUtils
+import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.linkis.common.log.LogUtils
 import org.apache.linkis.common.utils.{OverloadUtils, Utils}
-import org.apache.linkis.engineconn.common.password.{
-  CommandPasswordCallback,
-  StaticPasswordCallback
-}
-import org.apache.linkis.engineconn.computation.executor.execute.{
-  ConcurrentComputationExecutor,
-  EngineExecutionContext
-}
+import org.apache.linkis.engineconn.common.password.{CommandPasswordCallback, 
StaticPasswordCallback}
+import 
org.apache.linkis.engineconn.computation.executor.execute.{ConcurrentComputationExecutor,
 EngineExecutionContext}
 import org.apache.linkis.engineconn.core.EngineConnObject
-import org.apache.linkis.engineplugin.impala.client.{
-  ExecutionListener,
-  ImpalaClient,
-  ImpalaResultSet
-}
 import org.apache.linkis.engineplugin.impala.client.ImpalaResultSet.Row
-import org.apache.linkis.engineplugin.impala.client.exception.{
-  ImpalaEngineException,
-  ImpalaErrorCodeSummary
-}
+import 
org.apache.linkis.engineplugin.impala.client.exception.{ImpalaEngineException, 
ImpalaErrorCodeSummary}
 import org.apache.linkis.engineplugin.impala.client.protocol.{ExecProgress, 
ExecStatus}
-import org.apache.linkis.engineplugin.impala.client.thrift.{
-  ImpalaThriftClient,
-  ImpalaThriftSessionFactory
-}
+import 
org.apache.linkis.engineplugin.impala.client.thrift.{ImpalaThriftClient, 
ImpalaThriftSessionFactory}
+import org.apache.linkis.engineplugin.impala.client.{ExecutionListener, 
ImpalaClient, ImpalaResultSet}
 import org.apache.linkis.engineplugin.impala.conf.ImpalaConfiguration._
 import org.apache.linkis.engineplugin.impala.conf.ImpalaEngineConfig
 import org.apache.linkis.governance.common.paser.SQLCodeParser
-import org.apache.linkis.manager.common.entity.resource.{
-  CommonNodeResource,
-  LoadResource,
-  NodeResource
-}
+import org.apache.linkis.manager.common.entity.resource.{CommonNodeResource, 
LoadResource, NodeResource}
 import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, 
UserCreatorLabel}
 import org.apache.linkis.protocol.engine.JobProgressInfo
 import org.apache.linkis.rpc.Sender
-import org.apache.linkis.scheduler.executer.{
-  CompletedExecuteResponse,
-  ErrorExecuteResponse,
-  ExecuteResponse,
-  SuccessExecuteResponse
-}
+import org.apache.linkis.scheduler.executer.{CompletedExecuteResponse, 
ErrorExecuteResponse, ExecuteResponse, SuccessExecuteResponse}
 import org.apache.linkis.storage.domain.Column
 import org.apache.linkis.storage.resultset.ResultSetFactory
 import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
-
-import org.apache.commons.io.IOUtils
-import org.apache.commons.lang3.StringUtils
-import org.apache.commons.lang3.exception.ExceptionUtils
-
 import org.springframework.util.CollectionUtils
 
-import javax.net.SocketFactory
-import javax.net.ssl._
-import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, 
PasswordCallback}
-
 import java.io.FileInputStream
 import java.security.KeyStore
 import java.util
 import java.util.concurrent.ConcurrentHashMap
 import java.util.function.Consumer
-
+import javax.net.SocketFactory
+import javax.net.ssl._
+import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, 
PasswordCallback}
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
 
 class ImpalaEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
     extends ConcurrentComputationExecutor(outputPrintLimit) {
@@ -308,7 +279,7 @@ class ImpalaEngineConnExecutor(override val 
outputPrintLimit: Int, val id: Int)
       
engineExecutionContext.getLabels.find(_.isInstanceOf[EngineTypeLabel]).get
     var configMap: util.Map[String, String] = null
     if (userCreatorLabel != null && engineTypeLabel != null) {
-      configMap = Utils.tryAndError(
+      configMap = Utils.tryAndWarn(
         ImpalaEngineConfig.getCacheMap(
           (
             userCreatorLabel.asInstanceOf[UserCreatorLabel],
@@ -317,6 +288,14 @@ class ImpalaEngineConnExecutor(override val 
outputPrintLimit: Int, val id: Int)
         )
       )
     }
+    if (configMap == null) {
+      configMap = new util.HashMap[String, String]()
+    }
+
+    val properties = 
engineExecutionContext.getProperties.asInstanceOf[util.Map[String, String]]
+    if (MapUtils.isNotEmpty(properties)) {
+      configMap.putAll(properties)
+    }
 
     val impalaServers = IMPALA_SERVERS.getValue(configMap)
     val impalaMaxConnections = IMPALA_MAX_CONNECTIONS.getValue(configMap)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to