This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new dec3ba078de [FLINK-31136][sql-client] Forbid sql client to read executor config in the gateway mode (#21987) dec3ba078de is described below commit dec3ba078decbdc212a6ea16ad8728aa7409d9c3 Author: Shengkai <33114724+fsk...@users.noreply.github.com> AuthorDate: Thu Feb 23 11:24:04 2023 +0800 [FLINK-31136][sql-client] Forbid sql client to read executor config in the gateway mode (#21987) --- .../table/client/gateway/DefaultContextUtils.java | 3 +- .../apache/flink/table/client/SqlClientTest.java | 37 +++++++++++++++++++ .../org/apache/flink/table/gateway/SqlGateway.java | 1 + .../gateway/service/context/DefaultContext.java | 43 +++++++++++++++------- .../service/utils/SqlGatewayServiceExtension.java | 2 +- 5 files changed, 71 insertions(+), 15 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DefaultContextUtils.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DefaultContextUtils.java index b0be702b15c..e7afce568ec 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DefaultContextUtils.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DefaultContextUtils.java @@ -54,13 +54,14 @@ public class DefaultContextUtils { } Configuration sessionConfig = options.getPythonConfiguration(); sessionConfig.addAll(ConfigurationUtils.createConfiguration(options.getSessionConfig())); - return DefaultContext.load(sessionConfig, discoverDependencies(jars, libDirs), true); + return DefaultContext.load(sessionConfig, discoverDependencies(jars, libDirs), true, true); } public static DefaultContext buildDefaultContext(CliOptions.GatewayCliOptions options) { return DefaultContext.load( ConfigurationUtils.createConfiguration(options.getSessionConfig()), Collections.emptyList(), + false, false); } // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java index 801b6299c4a..e107b73b1ac 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java @@ -18,8 +18,11 @@ package org.apache.flink.table.client; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.table.client.cli.TerminalUtils; +import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension; +import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension; import org.apache.flink.util.FileUtils; import org.apache.flink.util.Preconditions; @@ -27,7 +30,9 @@ import org.jline.terminal.Size; import org.jline.terminal.Terminal; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.io.TempDir; import java.io.ByteArrayInputStream; @@ -35,6 +40,7 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.io.OutputStream; +import java.net.InetSocketAddress; import java.net.URL; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -48,6 +54,7 @@ import java.util.List; import java.util.Map; import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR; +import static org.apache.flink.configuration.DeploymentOptions.TARGET; import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -57,6 +64,21 @@ class SqlClientTest { @TempDir private Path tempFolder; + @RegisterExtension + @Order(1) + public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION = + new SqlGatewayServiceExtension( + () -> { + Configuration configuration = new Configuration(); + configuration.set(TARGET, "yarn-session"); + return configuration; + }); + + @RegisterExtension + @Order(2) + private static final SqlGatewayRestEndpointExtension SQL_GATEWAY_REST_ENDPOINT_EXTENSION = + new SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService); + private Map<String, String> originalEnv; private String historyPath; @@ -120,6 +142,21 @@ class SqlClientTest { assertThat(actual).contains("Command history file path"); } + @Test + void testGatewayMode() throws Exception { + String[] args = + new String[] { + "gateway", + "-e", + InetSocketAddress.createUnresolved( + SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(), + SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort()) + .toString() + }; + String actual = runSqlClient(args, String.join("\n", "SET;", "QUIT;"), false); + assertThat(actual).contains("execution.target", "yarn-session"); + } + @Test void testGatewayModeWithoutAddress() throws Exception { String[] args = new String[] {"gateway"}; diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/SqlGateway.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/SqlGateway.java index 3ea22b99283..7ea06c62766 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/SqlGateway.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/SqlGateway.java @@ -113,6 +113,7 @@ public class SqlGateway { DefaultContext.load( ConfigurationUtils.createConfiguration(cliOptions.getDynamicConfigs()), Collections.emptyList(), + true, true); SqlGateway gateway = new SqlGateway( diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java index ff3562fc951..a48699d737f 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java @@ -125,8 +125,20 @@ public class DefaultContext { // ------------------------------------------------------------------------------------------- + /** + * Build the {@link DefaultContext} from flink-conf.yaml, dynamic configuration and users + * specified jars. + * + * @param dynamicConfig user specified configuration. + * @param dependencies user specified jars + * @param discoverExecutionConfig flag whether to load the execution configuration + * @param discoverPythonJar flag whetehr to load the python jar + */ public static DefaultContext load( - Configuration dynamicConfig, List<URL> dependencies, boolean discoverPythonDependency) { + Configuration dynamicConfig, + List<URL> dependencies, + boolean discoverExecutionConfig, + boolean discoverPythonJar) { // 1. find the configuration directory String flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv(); @@ -142,22 +154,27 @@ public class DefaultContext { FileSystem.initialize( configuration, PluginUtils.createPluginManagerFromRootFolder(configuration)); - if (discoverPythonDependency) { + if (discoverPythonJar) { dependencies = new ArrayList<>(dependencies); dependencies.addAll(discoverPythonDependencies()); } - Options commandLineOptions = collectCommandLineOptions(commandLines); - - try { - CommandLine deploymentCommandLine = - CliFrontendParser.parse(commandLineOptions, new String[] {}, true); - configuration.addAll( - createExecutionConfig( - deploymentCommandLine, commandLineOptions, commandLines, dependencies)); - } catch (Exception e) { - throw new SqlGatewayException( - "Could not load available CLI with Environment Deployment entry.", e); + if (discoverExecutionConfig) { + Options commandLineOptions = collectCommandLineOptions(commandLines); + + try { + CommandLine deploymentCommandLine = + CliFrontendParser.parse(commandLineOptions, new String[] {}, true); + configuration.addAll( + createExecutionConfig( + deploymentCommandLine, + commandLineOptions, + commandLines, + dependencies)); + } catch (Exception e) { + throw new SqlGatewayException( + "Could not load available CLI with Environment Deployment entry.", e); + } } return new DefaultContext(configuration, dependencies); diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java index dc3b437efc8..7455abac7e3 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java @@ -89,7 +89,7 @@ public class SqlGatewayServiceExtension implements BeforeAllCallback, AfterAllCa sessionManager = sessionManagerCreator.apply( DefaultContext.load( - new Configuration(), Collections.emptyList(), false)); + new Configuration(), Collections.emptyList(), true, false)); } finally { CommonTestUtils.setEnv(originalEnv); }