This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6630ce7d6fcb118900a19bc0bb9143b3d388bf1a Author: luoyuxia <luoyu...@alumni.sjtu.edu.cn> AuthorDate: Tue Aug 23 12:33:03 2022 +0800 [FLINK-28938][hive] Fix HiveServer2 Endpoint can not set variable correctly --- .../table/endpoint/hive/HiveServer2Endpoint.java | 21 +++++++++-- .../endpoint/hive/util/HiveJdbcParameterUtils.java | 44 ++++++---------------- .../delegation/hive/copy/HiveSetProcessor.java | 7 ++++ .../endpoint/hive/HiveServer2EndpointITCase.java | 7 +++- 4 files changed, 41 insertions(+), 38 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java index 6c6335d07b3..f420e9869e6 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java @@ -124,7 +124,7 @@ import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYN import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT; import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V10; import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.getUsedDefaultDatabase; -import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.validateAndNormalize; +import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.setVariables; import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetCatalogsExecutor; import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetColumnsExecutor; import static org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetFunctionsExecutor; @@ -304,10 +304,10 @@ public 
class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin sessionConfig.put(TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name()); sessionConfig.put(RUNTIME_MODE.key(), RuntimeExecutionMode.BATCH.name()); sessionConfig.put(TABLE_DML_SYNC.key(), "true"); - sessionConfig.putAll(validateAndNormalize(originSessionConf)); - HiveConf conf = HiveCatalog.createHiveConf(hiveConfPath, null); - sessionConfig.forEach(conf::set); + // set variables to HiveConf or Session's conf + setVariables(conf, sessionConfig, originSessionConf); + Catalog hiveCatalog = new HiveCatalog( catalogName, @@ -401,6 +401,19 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin tExecuteStatementReq.isSetConfOverlay() ? tExecuteStatementReq.getConfOverlay() : Collections.emptyMap(); + String loggingOperationEnableVar = + HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED.varname; + if (Boolean.parseBoolean( + executionConfig.getOrDefault( + loggingOperationEnableVar, + HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED + .defaultStrVal))) { + throw new IllegalArgumentException( + String.format( + "SqlGateway doesn't support logging for operation. 
Please disable" + + " it by setting %s to false.", + loggingOperationEnableVar)); + } long timeout = tExecuteStatementReq.getQueryTimeout(); OperationHandle operationHandle = diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/HiveJdbcParameterUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/HiveJdbcParameterUtils.java index 03c2a5385df..123e3f5b77c 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/HiveJdbcParameterUtils.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/HiveJdbcParameterUtils.java @@ -19,17 +19,13 @@ package org.apache.flink.table.endpoint.hive.util; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.planner.delegation.hive.copy.HiveSetProcessor; + +import org.apache.hadoop.hive.conf.HiveConf; -import java.util.HashMap; import java.util.Map; import java.util.Optional; -import static org.apache.hadoop.hive.conf.SystemVariables.ENV_PREFIX; -import static org.apache.hadoop.hive.conf.SystemVariables.HIVECONF_PREFIX; -import static org.apache.hadoop.hive.conf.SystemVariables.HIVEVAR_PREFIX; -import static org.apache.hadoop.hive.conf.SystemVariables.METACONF_PREFIX; -import static org.apache.hadoop.hive.conf.SystemVariables.SYSTEM_PREFIX; - /** * Utils to normalize and validate hive jdbc conf. * @@ -49,37 +45,21 @@ public class HiveJdbcParameterUtils { private static final String USE_PREFIX = "use:"; private static final String USE_DATABASE = "database"; - public static Map<String, String> validateAndNormalize(Map<String, String> parameters) { - Map<String, String> normalized = new HashMap<>(); + /** + * Uses the given {@code parameters} to populate either the {@code hiveConf} or the {@code sessionConfigs}, + * depending on which kind of parameter each entry is. 
+ */ + public static void setVariables( + HiveConf hiveConf, Map<String, String> sessionConfigs, Map<String, String> parameters) { for (Map.Entry<String, String> entry : parameters.entrySet()) { String key = entry.getKey(); - String normalizedKey = key; if (key.startsWith(SET_PREFIX)) { String newKey = key.substring(SET_PREFIX.length()); - // TODO: use HiveParserSetProcessor when FLINK-28096 is fixed - if (newKey.startsWith(ENV_PREFIX)) { - throw new ValidationException( - String.format( - "Can not set env variables %s during the session connection.", - key)); - } else if (newKey.startsWith(SYSTEM_PREFIX)) { - normalizedKey = newKey.substring(SYSTEM_PREFIX.length()); - } else if (newKey.startsWith(HIVECONF_PREFIX)) { - normalizedKey = newKey.substring(HIVECONF_PREFIX.length()); - } else if (newKey.startsWith(HIVEVAR_PREFIX)) { - normalizedKey = newKey.substring(HIVEVAR_PREFIX.length()); - } else if (newKey.startsWith(METACONF_PREFIX)) { - normalizedKey = newKey.substring(METACONF_PREFIX.length()); - } else { - normalizedKey = newKey; - } - } else if (key.startsWith(USE_PREFIX)) { - // ignore use parameters - continue; + HiveSetProcessor.setVariable(hiveConf, sessionConfigs, newKey, entry.getValue()); + } else if (!key.startsWith(USE_PREFIX)) { + sessionConfigs.put(key, entry.getValue()); } - normalized.put(normalizedKey, entry.getValue()); } - return normalized; } public static Optional<String> getUsedDefaultDatabase(Map<String, String> parameters) { diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveSetProcessor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveSetProcessor.java index 51ad1dddd17..ff8b7de2c8b 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveSetProcessor.java +++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveSetProcessor.java @@ -78,6 +78,13 @@ public class HiveSetProcessor { String.format("'SET %s=%s' FAILED.", varname, varvalue), e); } } else { + // Here we deliberately deviate a little from Hive's behavior: + // when the variable has no prefix, we also put it into the passed hiveVariables, since Flink + // may use it as one of its own configuration options. + // Otherwise, there would be no way to set Flink's configuration via Hive's SET command. + hiveVariables.put( + varname, + new VariableSubstitution(() -> hiveVariables).substitute(hiveConf, varvalue)); setConf(hiveConf, hiveVariables, varname, varname, varvalue); } } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java index 4f474bf4254..62ff42d8364 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java @@ -125,7 +125,9 @@ public class HiveServer2EndpointITCase extends TestLogger { configs.put(MAX_LENGTH_GENERATED_CODE.key(), "-1"); // simulate setting config through hive jdbc configs.put("set:hiveconf:key", "value"); - // TODO: set hivevar when FLINK-28096 is fixed + configs.put("set:system:ks", "vs"); + configs.put("set:key1", "value1"); + configs.put("set:hivevar:key2", "${hiveconf:common-key}"); openSessionReq.setConfiguration(configs); TOpenSessionResp openSessionResp = client.OpenSession(openSessionReq); SessionHandle sessionHandle = @@ -140,7 +142,8 @@ public class HiveServer2EndpointITCase extends TestLogger { new AbstractMap.SimpleEntry<>(TABLE_DML_SYNC.key(), "true"), new AbstractMap.SimpleEntry<>(RUNTIME_MODE.key(), BATCH.name()), new
AbstractMap.SimpleEntry<>(MAX_LENGTH_GENERATED_CODE.key(), "-1"), - new AbstractMap.SimpleEntry<>("key", "value")); + new AbstractMap.SimpleEntry<>("key1", "value1"), + new AbstractMap.SimpleEntry<>("key2", "common-val")); } @Test