This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dec3ba078de [FLINK-31136][sql-client] Forbid sql client to read 
executor config in the gateway mode (#21987)
dec3ba078de is described below

commit dec3ba078decbdc212a6ea16ad8728aa7409d9c3
Author: Shengkai <33114724+fsk...@users.noreply.github.com>
AuthorDate: Thu Feb 23 11:24:04 2023 +0800

    [FLINK-31136][sql-client] Forbid sql client to read executor config in the 
gateway mode (#21987)
---
 .../table/client/gateway/DefaultContextUtils.java  |  3 +-
 .../apache/flink/table/client/SqlClientTest.java   | 37 +++++++++++++++++++
 .../org/apache/flink/table/gateway/SqlGateway.java |  1 +
 .../gateway/service/context/DefaultContext.java    | 43 +++++++++++++++-------
 .../service/utils/SqlGatewayServiceExtension.java  |  2 +-
 5 files changed, 71 insertions(+), 15 deletions(-)

diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DefaultContextUtils.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DefaultContextUtils.java
index b0be702b15c..e7afce568ec 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DefaultContextUtils.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DefaultContextUtils.java
@@ -54,13 +54,14 @@ public class DefaultContextUtils {
         }
         Configuration sessionConfig = options.getPythonConfiguration();
         
sessionConfig.addAll(ConfigurationUtils.createConfiguration(options.getSessionConfig()));
-        return DefaultContext.load(sessionConfig, discoverDependencies(jars, 
libDirs), true);
+        return DefaultContext.load(sessionConfig, discoverDependencies(jars, 
libDirs), true, true);
     }
 
     public static DefaultContext 
buildDefaultContext(CliOptions.GatewayCliOptions options) {
         return DefaultContext.load(
                 
ConfigurationUtils.createConfiguration(options.getSessionConfig()),
                 Collections.emptyList(),
+                false,
                 false);
     }
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java
index 801b6299c4a..e107b73b1ac 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.table.client;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.table.client.cli.TerminalUtils;
+import 
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
+import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -27,7 +30,9 @@ import org.jline.terminal.Size;
 import org.jline.terminal.Terminal;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.ByteArrayInputStream;
@@ -35,6 +40,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.InetSocketAddress;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -48,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
+import static org.apache.flink.configuration.DeploymentOptions.TARGET;
 import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -57,6 +64,21 @@ class SqlClientTest {
 
     @TempDir private Path tempFolder;
 
+    @RegisterExtension
+    @Order(1)
+    public static final SqlGatewayServiceExtension 
SQL_GATEWAY_SERVICE_EXTENSION =
+            new SqlGatewayServiceExtension(
+                    () -> {
+                        Configuration configuration = new Configuration();
+                        configuration.set(TARGET, "yarn-session");
+                        return configuration;
+                    });
+
+    @RegisterExtension
+    @Order(2)
+    private static final SqlGatewayRestEndpointExtension 
SQL_GATEWAY_REST_ENDPOINT_EXTENSION =
+            new 
SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
+
     private Map<String, String> originalEnv;
 
     private String historyPath;
@@ -120,6 +142,21 @@ class SqlClientTest {
         assertThat(actual).contains("Command history file path");
     }
 
+    @Test
+    void testGatewayMode() throws Exception {
+        String[] args =
+                new String[] {
+                    "gateway",
+                    "-e",
+                    InetSocketAddress.createUnresolved(
+                                    
SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
+                                    
SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort())
+                            .toString()
+                };
+        String actual = runSqlClient(args, String.join("\n", "SET;", "QUIT;"), 
false);
+        assertThat(actual).contains("execution.target", "yarn-session");
+    }
+
     @Test
     void testGatewayModeWithoutAddress() throws Exception {
         String[] args = new String[] {"gateway"};
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/SqlGateway.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/SqlGateway.java
index 3ea22b99283..7ea06c62766 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/SqlGateway.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/SqlGateway.java
@@ -113,6 +113,7 @@ public class SqlGateway {
                 DefaultContext.load(
                         
ConfigurationUtils.createConfiguration(cliOptions.getDynamicConfigs()),
                         Collections.emptyList(),
+                        true,
                         true);
         SqlGateway gateway =
                 new SqlGateway(
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
index ff3562fc951..a48699d737f 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
@@ -125,8 +125,20 @@ public class DefaultContext {
 
     // 
-------------------------------------------------------------------------------------------
 
+    /**
+     * Build the {@link DefaultContext} from flink-conf.yaml, dynamic 
configuration and users
+     * specified jars.
+     *
+     * @param dynamicConfig user specified configuration.
+     * @param dependencies user specified jars
+     * @param discoverExecutionConfig flag whether to load the execution 
configuration
+     * @param discoverPythonJar flag whether to load the python jar
+     */
     public static DefaultContext load(
-            Configuration dynamicConfig, List<URL> dependencies, boolean 
discoverPythonDependency) {
+            Configuration dynamicConfig,
+            List<URL> dependencies,
+            boolean discoverExecutionConfig,
+            boolean discoverPythonJar) {
         // 1. find the configuration directory
         String flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();
 
@@ -142,22 +154,27 @@ public class DefaultContext {
         FileSystem.initialize(
                 configuration, 
PluginUtils.createPluginManagerFromRootFolder(configuration));
 
-        if (discoverPythonDependency) {
+        if (discoverPythonJar) {
             dependencies = new ArrayList<>(dependencies);
             dependencies.addAll(discoverPythonDependencies());
         }
 
-        Options commandLineOptions = collectCommandLineOptions(commandLines);
-
-        try {
-            CommandLine deploymentCommandLine =
-                    CliFrontendParser.parse(commandLineOptions, new String[] 
{}, true);
-            configuration.addAll(
-                    createExecutionConfig(
-                            deploymentCommandLine, commandLineOptions, 
commandLines, dependencies));
-        } catch (Exception e) {
-            throw new SqlGatewayException(
-                    "Could not load available CLI with Environment Deployment 
entry.", e);
+        if (discoverExecutionConfig) {
+            Options commandLineOptions = 
collectCommandLineOptions(commandLines);
+
+            try {
+                CommandLine deploymentCommandLine =
+                        CliFrontendParser.parse(commandLineOptions, new 
String[] {}, true);
+                configuration.addAll(
+                        createExecutionConfig(
+                                deploymentCommandLine,
+                                commandLineOptions,
+                                commandLines,
+                                dependencies));
+            } catch (Exception e) {
+                throw new SqlGatewayException(
+                        "Could not load available CLI with Environment 
Deployment entry.", e);
+            }
         }
 
         return new DefaultContext(configuration, dependencies);
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java
index dc3b437efc8..7455abac7e3 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java
@@ -89,7 +89,7 @@ public class SqlGatewayServiceExtension implements 
BeforeAllCallback, AfterAllCa
             sessionManager =
                     sessionManagerCreator.apply(
                             DefaultContext.load(
-                                    new Configuration(), 
Collections.emptyList(), false));
+                                    new Configuration(), 
Collections.emptyList(), true, false));
         } finally {
             CommonTestUtils.setEnv(originalEnv);
         }

Reply via email to