This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 27cecdebccc95e9acf6ba38616b14e531b35d1be Author: Shengkai <1059623...@qq.com> AuthorDate: Tue Jul 26 10:15:46 2022 +0800 [FLINK-28152][sql-gateway][hive] Introduce option "thrift.host" for HiveServer2 Endpoint and improve codes --- .../table/endpoint/hive/HiveServer2Endpoint.java | 35 +++++--- .../hive/HiveServer2EndpointConfigOptions.java | 11 ++- .../endpoint/hive/HiveServer2EndpointFactory.java | 23 +++++- .../hive/HiveServer2EndpointFactoryTest.java | 93 ++++++++++++---------- .../endpoint/hive/HiveServer2EndpointITCase.java | 17 +++- .../hive/util/HiveServer2EndpointExtension.java | 2 + .../gateway/service/context/SessionContext.java | 6 ++ .../service/operation/OperationManager.java | 2 + 8 files changed, 131 insertions(+), 58 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java index 4c25f9a2b7d..262177a547e 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java @@ -19,6 +19,7 @@ package org.apache.flink.table.endpoint.hive; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.hive.HiveCatalog; @@ -100,6 +101,8 @@ import java.util.Objects; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE; +import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC; import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT; import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V10; import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.getUsedDefaultDatabase; @@ -126,13 +129,14 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin // -------------------------------------------------------------------------------------------- private final SqlGatewayService service; + private final InetAddress hostAddress; + private final int port; private final int minWorkerThreads; private final int maxWorkerThreads; private final Duration workerKeepAliveTime; private final int requestTimeoutMs; private final int backOffSlotLengthMs; private final long maxMessageSize; - private final int port; private final Thread serverThread = new Thread(this, "HiveServer2 Endpoint"); private ThreadPoolExecutor executor; @@ -155,6 +159,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin public HiveServer2Endpoint( SqlGatewayService service, + InetAddress hostAddress, int port, long maxMessageSize, int requestTimeoutMs, @@ -168,6 +173,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin String moduleName) { this( service, + hostAddress, port, maxMessageSize, requestTimeoutMs, @@ -185,6 +191,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin @VisibleForTesting public HiveServer2Endpoint( SqlGatewayService service, + InetAddress hostAddress, int port, long maxMessageSize, int requestTimeoutMs, @@ -199,6 +206,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin boolean allowEmbedded) { this.service = service; + this.hostAddress = hostAddress; this.port = port; this.maxMessageSize = maxMessageSize; this.requestTimeoutMs = requestTimeoutMs; @@ -228,7 +236,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin } if (executor != null) { - executor.shutdown(); + executor.shutdownNow(); } } @@ -255,6 +263,8 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin : tOpenSessionReq.getConfiguration(); Map<String, String> sessionConfig = new HashMap<>(); sessionConfig.put(TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name()); + sessionConfig.put(RUNTIME_MODE.key(), RuntimeExecutionMode.BATCH.name()); + sessionConfig.put(TABLE_DML_SYNC.key(), "true"); sessionConfig.putAll(validateAndNormalize(originSessionConf)); HiveConf conf = HiveCatalog.createHiveConf(hiveConfPath, null); @@ -284,7 +294,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin resp.setSessionHandle(toTSessionHandle(sessionHandle)); resp.setConfiguration(service.getSessionConfig(sessionHandle)); } catch (Exception e) { - LOG.error("Failed to openSession.", e); + LOG.error("Failed to OpenSession.", e); resp.setStatus(toTStatus(e)); } return resp; @@ -298,7 +308,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin service.closeSession(sessionHandle); resp.setStatus(OK_STATUS); } catch (Throwable t) { - LOG.error("Failed to closeSession.", t); + LOG.error("Failed to CloseSession.", t); resp.setStatus(toTStatus(t)); } return resp; @@ -419,12 +429,13 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin } HiveServer2Endpoint that = (HiveServer2Endpoint) o; - return minWorkerThreads == that.minWorkerThreads + return Objects.equals(hostAddress, that.hostAddress) + && port == that.port + && minWorkerThreads == that.minWorkerThreads && maxWorkerThreads == that.maxWorkerThreads && requestTimeoutMs == that.requestTimeoutMs && backOffSlotLengthMs == that.backOffSlotLengthMs && maxMessageSize == that.maxMessageSize - && port == that.port && Objects.equals(workerKeepAliveTime, that.workerKeepAliveTime) && Objects.equals(catalogName, that.catalogName) && Objects.equals(defaultDatabase, that.defaultDatabase) @@ -436,13 +447,14 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin @Override public int hashCode() { return Objects.hash( + hostAddress, + port, minWorkerThreads, maxWorkerThreads, workerKeepAliveTime, requestTimeoutMs, backOffSlotLengthMs, maxMessageSize, - port, catalogName, defaultDatabase, hiveConfPath, @@ -453,10 +465,14 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin @Override public void run() { try { - LOG.info("HiveServer2 Endpoint begins to listen on {}.", port); + LOG.info( + "HiveServer2 Endpoint begins to listen on {}:{}.", + hostAddress.getHostAddress(), + port); server.serve(); } catch (Throwable t) { LOG.error("Exception caught by " + getClass().getSimpleName() + ". Exiting.", t); + System.exit(-1); } } @@ -472,8 +488,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin new TThreadPoolServer( new TThreadPoolServer.Args( new TServerSocket( - new ServerSocket( - port, -1, InetAddress.getByName(null)))) + new ServerSocket(port, -1, hostAddress))) .processorFactory( new TProcessorFactory( new TCLIService.Processor<>(this))) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java index 22f82ca1fbb..483910a01d3 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointConfigOptions.java @@ -35,10 +35,19 @@ public class HiveServer2EndpointConfigOptions { // Server Options // -------------------------------------------------------------------------------------------- + public static final ConfigOption<String> THRIFT_HOST = + ConfigOptions.key("thrift.host") + .stringType() + .defaultValue("localhost") + .withDescription( + "The server address of HiveServer2 host to be used for communication." + + "Default is empty, which means the to bind to the localhost. " + + "This is only necessary if the host has multiple network addresses."); + public static final ConfigOption<Integer> THRIFT_PORT = ConfigOptions.key("thrift.port") .intType() - .defaultValue(8084) + .defaultValue(10000) .withDescription("The port of the HiveServer2 endpoint"); public static final ConfigOption<Integer> THRIFT_WORKER_THREADS_MIN = diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactory.java index 7a1fd06e596..00ce8d287c9 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactory.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactory.java @@ -25,7 +25,10 @@ import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint; import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactory; import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -33,6 +36,7 @@ import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOpti import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_HIVE_CONF_DIR; import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_NAME; import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.MODULE_NAME; +import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_HOST; import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH; import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_LOGIN_TIMEOUT; import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_MAX_MESSAGE_SIZE; @@ -55,6 +59,7 @@ public class HiveServer2EndpointFactory implements SqlGatewayEndpointFactory { validate(configuration); return new HiveServer2Endpoint( context.getSqlGatewayService(), + getHostAddress(configuration.get(THRIFT_HOST)), configuration.get(THRIFT_PORT), checkNotNull(configuration.get(THRIFT_MAX_MESSAGE_SIZE)), (int) configuration.get(THRIFT_LOGIN_TIMEOUT).toMillis(), @@ -75,8 +80,14 @@ public class HiveServer2EndpointFactory implements SqlGatewayEndpointFactory { @Override public Set<ConfigOption<?>> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { return new HashSet<>( Arrays.asList( + THRIFT_HOST, THRIFT_PORT, THRIFT_MAX_MESSAGE_SIZE, THRIFT_LOGIN_TIMEOUT, @@ -84,12 +95,18 @@ public class HiveServer2EndpointFactory implements SqlGatewayEndpointFactory { THRIFT_WORKER_THREADS_MAX, THRIFT_WORKER_KEEPALIVE_TIME, CATALOG_NAME, + CATALOG_HIVE_CONF_DIR, + CATALOG_DEFAULT_DATABASE, MODULE_NAME)); } - @Override - public Set<ConfigOption<?>> optionalOptions() { - return new HashSet<>(Arrays.asList(CATALOG_HIVE_CONF_DIR, CATALOG_DEFAULT_DATABASE)); + private static InetAddress getHostAddress(String hostName) { + try { + return InetAddress.getByName(hostName); + } catch (UnknownHostException e) { + throw new ValidationException( + String.format("Can not get the address for the host '%s'.", hostName)); + } } private static void validate(ReadableConfig configuration) { diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactoryTest.java index cd2787193db..6fc7879ae4a 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactoryTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointFactoryTest.java @@ -25,7 +25,10 @@ import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtil import org.apache.flink.table.gateway.api.utils.MockedSqlGatewayService; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import java.net.InetAddress; import java.time.Duration; import java.util.Arrays; import java.util.Collections; @@ -38,6 +41,7 @@ import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOpti import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_HIVE_CONF_DIR; import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_NAME; import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.MODULE_NAME; +import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_HOST; import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH; import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_LOGIN_TIMEOUT; import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_MAX_MESSAGE_SIZE; @@ -70,7 +74,7 @@ class HiveServer2EndpointFactoryTest { private final String moduleName = "test-module"; @Test - public void testCreateHiveServer2Endpoint() { + public void testCreateHiveServer2Endpoint() throws Exception { assertThat( SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint( service, Configuration.fromMap(getDefaultConfig()))) @@ -78,6 +82,7 @@ class HiveServer2EndpointFactoryTest { Collections.singletonList( new HiveServer2Endpoint( service, + InetAddress.getByName("localhost"), port, maxMessageSize, (int) loginTimeout.toMillis(), @@ -91,45 +96,47 @@ class HiveServer2EndpointFactoryTest { moduleName))); } - @Test - public void testCreateHiveServer2EndpointWithIllegalArgument() { - List<TestSpec<?>> specs = - Arrays.asList( - new TestSpec<>( - THRIFT_WORKER_THREADS_MIN, - -1, - "The specified min thrift worker thread number is -1, which should be larger than 0."), - new TestSpec<>( - THRIFT_WORKER_THREADS_MAX, - 0, - "The specified max thrift worker thread number is 0, which should be larger than 0."), - new TestSpec<>( - THRIFT_PORT, - 1008668001, - "The specified port is 1008668001, which should range from 0 to 65535."), - new TestSpec<>( - THRIFT_LOGIN_TIMEOUT, - "9223372036854775807 ms", - "The specified login timeout should range from 0 ms to 2147483647 ms but the specified value is 9223372036854775807 ms."), - new TestSpec<>( - THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, - "9223372036854775807 ms", - "The specified binary exponential backoff slot time should range from 0 ms to 2147483647 ms but the specified value is 9223372036854775807 ms.")); - - for (TestSpec<?> spec : specs) { - assertThatThrownBy( - () -> - SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint( - service, - Configuration.fromMap( - getModifiedConfig( - config -> - setEndpointOption( - config, - spec.option, - spec.value))))) - .satisfies(FlinkAssertions.anyCauseMatches(spec.exceptionMessage)); - } + @ParameterizedTest + @MethodSource("getIllegalArgumentTestSpecs") + public void testCreateHiveServer2EndpointWithIllegalArgument(TestSpec<?> spec) { + assertThatThrownBy( + () -> + SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint( + service, + Configuration.fromMap( + getModifiedConfig( + config -> + setEndpointOption( + config, + spec.option, + spec.value))))) + .satisfies(FlinkAssertions.anyCauseMatches(spec.exceptionMessage)); + } + + // -------------------------------------------------------------------------------------------- + + private static List<TestSpec<?>> getIllegalArgumentTestSpecs() { + return Arrays.asList( + new TestSpec<>( + THRIFT_WORKER_THREADS_MIN, + -1, + "The specified min thrift worker thread number is -1, which should be larger than 0."), + new TestSpec<>( + THRIFT_WORKER_THREADS_MAX, + 0, + "The specified max thrift worker thread number is 0, which should be larger than 0."), + new TestSpec<>( + THRIFT_PORT, + 1008668001, + "The specified port is 1008668001, which should range from 0 to 65535."), + new TestSpec<>( + THRIFT_LOGIN_TIMEOUT, + "9223372036854775807 ms", + "The specified login timeout should range from 0 ms to 2147483647 ms but the specified value is 9223372036854775807 ms."), + new TestSpec<>( + THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, + "9223372036854775807 ms", + "The specified binary exponential backoff slot time should range from 0 ms to 2147483647 ms but the specified value is 9223372036854775807 ms.")); } // -------------------------------------------------------------------------------------------- @@ -145,6 +152,11 @@ class HiveServer2EndpointFactoryTest { this.value = value; this.exceptionMessage = exceptionMessage; } + + @Override + public String toString() { + return "TestSpec{option=" + option.key() + '}'; + } } private Map<String, String> getModifiedConfig(Consumer<Map<String, String>> consumer) { @@ -158,6 +170,7 @@ class HiveServer2EndpointFactoryTest { config.put(SQL_GATEWAY_ENDPOINT_TYPE.key(), IDENTIFIER); + setEndpointOption(config, THRIFT_HOST, "localhost"); setEndpointOption(config, THRIFT_PORT, port); setEndpointOption(config, THRIFT_WORKER_THREADS_MIN, minWorkerThreads); setEndpointOption(config, THRIFT_WORKER_THREADS_MAX, maxWorkerThreads); diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java index 7268220cd7c..30951a5765c 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java @@ -41,12 +41,16 @@ import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import java.net.InetAddress; import java.sql.Connection; import java.sql.DriverManager; import java.util.AbstractMap; import java.util.HashMap; import java.util.Map; +import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH; +import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE; +import static org.apache.flink.table.api.config.TableConfigOptions.MAX_LENGTH_GENERATED_CODE; import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC; import static org.assertj.core.api.Assertions.assertThat; @@ -83,7 +87,7 @@ public class HiveServer2EndpointITCase extends TestLogger { TOpenSessionReq openSessionReq = new TOpenSessionReq(); Map<String, String> configs = new HashMap<>(); - configs.put(TABLE_DML_SYNC.key(), "true"); + configs.put(MAX_LENGTH_GENERATED_CODE.key(), "-1"); // simulate to set config using hive jdbc configs.put("set:hiveconf:key", "value"); // TODO: set hivevar when FLINK-28096 is fixed @@ -99,6 +103,8 @@ public class HiveServer2EndpointITCase extends TestLogger { new AbstractMap.SimpleEntry<>( TableConfigOptions.TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name()), new AbstractMap.SimpleEntry<>(TABLE_DML_SYNC.key(), "true"), + new AbstractMap.SimpleEntry<>(RUNTIME_MODE.key(), BATCH.name()), + new AbstractMap.SimpleEntry<>(MAX_LENGTH_GENERATED_CODE.key(), "-1"), new AbstractMap.SimpleEntry<>("key", "value")); } @@ -121,13 +127,16 @@ public class HiveServer2EndpointITCase extends TestLogger { private Connection getConnection() throws Exception { return DriverManager.getConnection( String.format( - "jdbc:hive2://localhost:%s/default;auth=noSasl", - ENDPOINT_EXTENSION.getPort())); + "jdbc:hive2://%s:%s/default;auth=noSasl", + InetAddress.getLocalHost().getHostAddress(), ENDPOINT_EXTENSION.getPort())); } private TCLIService.Client createClient() throws Exception { TTransport transport = - HiveAuthUtils.getSocketTransport("localhost", ENDPOINT_EXTENSION.getPort(), 0); + HiveAuthUtils.getSocketTransport( + InetAddress.getLocalHost().getHostAddress(), + ENDPOINT_EXTENSION.getPort(), + 0); transport.open(); return new TCLIService.Client(new TBinaryProtocol(transport)); } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java index 8b4957bb8c2..fa6093f4451 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/HiveServer2EndpointExtension.java @@ -28,6 +28,7 @@ import org.junit.jupiter.api.extension.AfterAllCallback; import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.ExtensionContext; +import java.net.InetAddress; import java.util.function.Supplier; import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_DEFAULT_DATABASE; @@ -67,6 +68,7 @@ public class HiveServer2EndpointExtension implements BeforeAllCallback, AfterAll endpoint = new HiveServer2Endpoint( serviceSupplier.get(), + InetAddress.getLocalHost(), port.getPort(), checkNotNull(endpointConfig.get(THRIFT_MAX_MESSAGE_SIZE)), (int) endpointConfig.get(THRIFT_LOGIN_TIMEOUT).toMillis(), diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java index 2b5f103460a..44eee43fde9 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java @@ -57,7 +57,9 @@ import java.net.URL; import java.net.URLClassLoader; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayDeque; import java.util.Collections; +import java.util.Deque; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -154,7 +156,11 @@ public class SessionContext { } public void registerModule(String moduleName, Module module) { + Deque<String> moduleNames = new ArrayDeque<>(sessionState.moduleManager.listModules()); + moduleNames.addFirst(moduleName); + sessionState.moduleManager.loadModule(moduleName, module); + sessionState.moduleManager.useModules(moduleNames.toArray(new String[0])); } public void setCurrentCatalog(String catalog) { diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java index ad2abd80698..80ec381eb96 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java @@ -354,6 +354,8 @@ public class OperationManager { } } + // ------------------------------------------------------------------------------------------- + @VisibleForTesting public int getOperationCount() { return submittedOperations.size();