This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6630ce7d6fcb118900a19bc0bb9143b3d388bf1a
Author: luoyuxia <luoyu...@alumni.sjtu.edu.cn>
AuthorDate: Tue Aug 23 12:33:03 2022 +0800

    [FLINK-28938][hive] Fix HiveServer2 Endpoint can not set variable correctly
---
 .../table/endpoint/hive/HiveServer2Endpoint.java   | 21 +++++++++--
 .../endpoint/hive/util/HiveJdbcParameterUtils.java | 44 ++++++----------------
 .../delegation/hive/copy/HiveSetProcessor.java     |  7 ++++
 .../endpoint/hive/HiveServer2EndpointITCase.java   |  7 +++-
 4 files changed, 41 insertions(+), 38 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 6c6335d07b3..f420e9869e6 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -124,7 +124,7 @@ import static 
org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYN
 import static 
org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
 import static 
org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V10;
 import static 
org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.getUsedDefaultDatabase;
-import static 
org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.validateAndNormalize;
+import static 
org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.setVariables;
 import static 
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetCatalogsExecutor;
 import static 
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetColumnsExecutor;
 import static 
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetFunctionsExecutor;
@@ -304,10 +304,10 @@ public class HiveServer2Endpoint implements 
TCLIService.Iface, SqlGatewayEndpoin
             sessionConfig.put(TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name());
             sessionConfig.put(RUNTIME_MODE.key(), 
RuntimeExecutionMode.BATCH.name());
             sessionConfig.put(TABLE_DML_SYNC.key(), "true");
-            sessionConfig.putAll(validateAndNormalize(originSessionConf));
-
             HiveConf conf = HiveCatalog.createHiveConf(hiveConfPath, null);
-            sessionConfig.forEach(conf::set);
+            // set variables to HiveConf or Session's conf
+            setVariables(conf, sessionConfig, originSessionConf);
+
             Catalog hiveCatalog =
                     new HiveCatalog(
                             catalogName,
@@ -401,6 +401,19 @@ public class HiveServer2Endpoint implements 
TCLIService.Iface, SqlGatewayEndpoin
                     tExecuteStatementReq.isSetConfOverlay()
                             ? tExecuteStatementReq.getConfOverlay()
                             : Collections.emptyMap();
+            String loggingOperationEnableVar =
+                    
HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED.varname;
+            if (Boolean.parseBoolean(
+                    executionConfig.getOrDefault(
+                            loggingOperationEnableVar,
+                            
HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED
+                                    .defaultStrVal))) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "SqlGateway doesn't support logging for 
operation. Please disable"
+                                        + " it by setting %s to false.",
+                                loggingOperationEnableVar));
+            }
             long timeout = tExecuteStatementReq.getQueryTimeout();
 
             OperationHandle operationHandle =
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/HiveJdbcParameterUtils.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/HiveJdbcParameterUtils.java
index 03c2a5385df..123e3f5b77c 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/HiveJdbcParameterUtils.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/HiveJdbcParameterUtils.java
@@ -19,17 +19,13 @@
 package org.apache.flink.table.endpoint.hive.util;
 
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.delegation.hive.copy.HiveSetProcessor;
+
+import org.apache.hadoop.hive.conf.HiveConf;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
-import static org.apache.hadoop.hive.conf.SystemVariables.ENV_PREFIX;
-import static org.apache.hadoop.hive.conf.SystemVariables.HIVECONF_PREFIX;
-import static org.apache.hadoop.hive.conf.SystemVariables.HIVEVAR_PREFIX;
-import static org.apache.hadoop.hive.conf.SystemVariables.METACONF_PREFIX;
-import static org.apache.hadoop.hive.conf.SystemVariables.SYSTEM_PREFIX;
-
 /**
  * Utils to normalize and validate hive jdbc conf.
  *
@@ -49,37 +45,21 @@ public class HiveJdbcParameterUtils {
     private static final String USE_PREFIX = "use:";
     private static final String USE_DATABASE = "database";
 
-    public static Map<String, String> validateAndNormalize(Map<String, String> 
parameters) {
-        Map<String, String> normalized = new HashMap<>();
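+    /**
+     * Applies every entry of {@code parameters} to {@code hiveConf} and/or {@code sessionConfigs}:
+     * "set:"-prefixed keys go through {@link HiveSetProcessor}, "use:"-prefixed keys are skipped,
+     * and all other keys are put into {@code sessionConfigs} as-is.
+     */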
+    public static void setVariables(
+            HiveConf hiveConf, Map<String, String> sessionConfigs, Map<String, 
String> parameters) {
         for (Map.Entry<String, String> entry : parameters.entrySet()) {
             String key = entry.getKey();
-            String normalizedKey = key;
             if (key.startsWith(SET_PREFIX)) {
                 String newKey = key.substring(SET_PREFIX.length());
-                // TODO: use HiveParserSetProcessor when FLINK-28096 is fixed
-                if (newKey.startsWith(ENV_PREFIX)) {
-                    throw new ValidationException(
-                            String.format(
-                                    "Can not set env variables %s during the 
session connection.",
-                                    key));
-                } else if (newKey.startsWith(SYSTEM_PREFIX)) {
-                    normalizedKey = newKey.substring(SYSTEM_PREFIX.length());
-                } else if (newKey.startsWith(HIVECONF_PREFIX)) {
-                    normalizedKey = newKey.substring(HIVECONF_PREFIX.length());
-                } else if (newKey.startsWith(HIVEVAR_PREFIX)) {
-                    normalizedKey = newKey.substring(HIVEVAR_PREFIX.length());
-                } else if (newKey.startsWith(METACONF_PREFIX)) {
-                    normalizedKey = newKey.substring(METACONF_PREFIX.length());
-                } else {
-                    normalizedKey = newKey;
-                }
-            } else if (key.startsWith(USE_PREFIX)) {
-                // ignore use parameters
-                continue;
+                HiveSetProcessor.setVariable(hiveConf, sessionConfigs, newKey, 
entry.getValue());
+            } else if (!key.startsWith(USE_PREFIX)) {
+                sessionConfigs.put(key, entry.getValue());
             }
-            normalized.put(normalizedKey, entry.getValue());
         }
-        return normalized;
     }
 
     public static Optional<String> getUsedDefaultDatabase(Map<String, String> 
parameters) {
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveSetProcessor.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveSetProcessor.java
index 51ad1dddd17..ff8b7de2c8b 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveSetProcessor.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveSetProcessor.java
@@ -78,6 +78,13 @@ public class HiveSetProcessor {
                         String.format("'SET %s=%s' FAILED.", varname, 
varvalue), e);
             }
         } else {
+            // This differs slightly from Hive's behavior: if there is no prefix, we also put
+            // the variable into the passed hiveVariables, because Flink may use it as one of
+            // its own configurations. Otherwise, there would be no way to set Flink's
+            // configuration using Hive's set command.
+            hiveVariables.put(
+                    varname,
+                    new VariableSubstitution(() -> 
hiveVariables).substitute(hiveConf, varvalue));
             setConf(hiveConf, hiveVariables, varname, varname, varvalue);
         }
     }
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index 4f474bf4254..62ff42d8364 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -125,7 +125,9 @@ public class HiveServer2EndpointITCase extends TestLogger {
         configs.put(MAX_LENGTH_GENERATED_CODE.key(), "-1");
         // simulate to set config using hive jdbc
         configs.put("set:hiveconf:key", "value");
-        // TODO: set hivevar when FLINK-28096 is fixed
+        configs.put("set:system:ks", "vs");
+        configs.put("set:key1", "value1");
+        configs.put("set:hivevar:key2", "${hiveconf:common-key}");
         openSessionReq.setConfiguration(configs);
         TOpenSessionResp openSessionResp = client.OpenSession(openSessionReq);
         SessionHandle sessionHandle =
@@ -140,7 +142,8 @@ public class HiveServer2EndpointITCase extends TestLogger {
                         new AbstractMap.SimpleEntry<>(TABLE_DML_SYNC.key(), 
"true"),
                         new AbstractMap.SimpleEntry<>(RUNTIME_MODE.key(), 
BATCH.name()),
                         new 
AbstractMap.SimpleEntry<>(MAX_LENGTH_GENERATED_CODE.key(), "-1"),
-                        new AbstractMap.SimpleEntry<>("key", "value"));
+                        new AbstractMap.SimpleEntry<>("key1", "value1"),
+                        new AbstractMap.SimpleEntry<>("key2", "common-val"));
     }
 
     @Test

Reply via email to