This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a8b8c03e3a [FLINK-36760][sql-client] Supports to deploy script via sql client (#25754)
4a8b8c03e3a is described below

commit 4a8b8c03e3aa2677fe5ab94a09ee8ae754785938
Author: Shengkai <33114724+fsk...@users.noreply.github.com>
AuthorDate: Tue Jan 7 19:15:17 2025 +0800

    [FLINK-36760][sql-client] Supports to deploy script via sql client (#25754)
---
 .../flink/client/program/PackagedProgramUtils.java |  78 ++++++----
 .../flink/table/sql/CreateTableAsITCase.java       |   5 +-
 .../org/apache/flink/table/sql/HdfsITCaseBase.java |   5 +-
 .../flink/table/sql/PlannerScalaFreeITCase.java    |   5 +-
 .../org/apache/flink/table/sql/SqlITCaseBase.java  |  14 +-
 .../flink/table/sql/UsingRemoteJarITCase.java      |  20 +++
 .../test/resources/sql_client_remote_jar_e2e.sql   |  40 +++++
 .../org/apache/flink/table/client/SqlClient.java   |  27 +---
 .../apache/flink/table/client/cli/CliClient.java   |  71 ++++++++-
 .../apache/flink/table/client/cli/CliOptions.java  |  33 ++--
 .../flink/table/client/cli/CliOptionsParser.java   |  53 +++----
 .../apache/flink/table/client/cli/CliStrings.java  |   2 +
 .../apache/flink/table/client/cli/CliUtils.java    |  12 +-
 .../table/client/gateway/DefaultContextUtils.java  |  22 +--
 .../flink/table/client/gateway/Executor.java       |  12 ++
 .../flink/table/client/gateway/ExecutorImpl.java   |  16 ++
 .../table/client/gateway/SingleSessionManager.java |  68 +++++++--
 .../apache/flink/table/client/SqlClientTest.java   |  23 +--
 .../flink/table/client/cli/CliClientTest.java      |  52 ++++++-
 .../table/client/gateway/ExecutorImplITCase.java   |   8 +-
 .../gateway/service/context/DefaultContext.java    |  24 ++-
 .../flink/table/jdbc/FlinkStatementTest.java       |   8 +
 .../apache/flink/test/util/SQLJobSubmission.java   |  13 ++
 .../flink/yarn/SqlYARNApplicationITCase.java       | 170 +++++++++++++++++++++
 .../java/org/apache/flink/yarn/YarnTestBase.java   |  15 +-
 .../flink/yarn/YarnClusterClientFactory.java       |   8 +
 .../apache/flink/yarn/YarnClusterDescriptor.java   |  25 ++-
 27 files changed, 648 insertions(+), 181 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
index 6f105e19215..7833b179bc0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
@@ -53,6 +53,9 @@ import static org.apache.flink.util.Preconditions.checkState;
 public enum PackagedProgramUtils {
     ;
 
+    private static final String SQL_DRIVER_CLASS_NAME =
+            "org.apache.flink.table.runtime.application.SqlDriver";
+
     private static final String PYTHON_GATEWAY_CLASS_NAME =
             "org.apache.flink.client.python.PythonGatewayServer";
 
@@ -193,43 +196,21 @@ public enum PackagedProgramUtils {
     }
 
     public static URL getPythonJar() {
-        String flinkOptPath = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR);
-        final List<Path> pythonJarPath = new ArrayList<>();
-        try {
-            Files.walkFileTree(
-                    FileSystems.getDefault().getPath(flinkOptPath),
-                    new SimpleFileVisitor<Path>() {
-                        @Override
-                        public FileVisitResult visitFile(Path file, 
BasicFileAttributes attrs)
-                                throws IOException {
-                            FileVisitResult result = super.visitFile(file, 
attrs);
-                            if 
(file.getFileName().toString().startsWith("flink-python")) {
-                                pythonJarPath.add(file);
-                            }
-                            return result;
-                        }
-                    });
-        } catch (IOException e) {
-            throw new RuntimeException(
-                    "Exception encountered during finding the flink-python 
jar. This should not happen.",
-                    e);
-        }
-
-        if (pythonJarPath.size() != 1) {
-            throw new RuntimeException("Found " + pythonJarPath.size() + " 
flink-python jar.");
-        }
-
-        try {
-            return pythonJarPath.get(0).toUri().toURL();
-        } catch (MalformedURLException e) {
-            throw new RuntimeException("URL is invalid. This should not 
happen.", e);
-        }
+        return getOptJar("flink-python");
     }
 
     public static String getPythonDriverClassName() {
         return PYTHON_DRIVER_CLASS_NAME;
     }
 
+    public static boolean isSqlApplication(String entryPointClassName) {
+        return (entryPointClassName != null) && 
(entryPointClassName.equals(SQL_DRIVER_CLASS_NAME));
+    }
+
+    public static URL getSqlGatewayJar() {
+        return getOptJar("flink-sql-gateway");
+    }
+
     public static URI resolveURI(String path) throws URISyntaxException {
         final URI uri = new URI(path);
         if (uri.getScheme() != null) {
@@ -260,4 +241,39 @@ public enum PackagedProgramUtils {
                         stderr.length() == 0 ? "(none)" : stderr),
                 cause);
     }
+
+    private static URL getOptJar(String jarName) {
+        String flinkOptPath = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR);
+        final List<Path> optJarPath = new ArrayList<>();
+        try {
+            Files.walkFileTree(
+                    FileSystems.getDefault().getPath(flinkOptPath),
+                    new SimpleFileVisitor<Path>() {
+                        @Override
+                        public FileVisitResult visitFile(Path file, 
BasicFileAttributes attrs)
+                                throws IOException {
+                            FileVisitResult result = super.visitFile(file, 
attrs);
+                            if 
(file.getFileName().toString().startsWith(jarName)) {
+                                optJarPath.add(file);
+                            }
+                            return result;
+                        }
+                    });
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Exception encountered during finding the flink-python 
jar. This should not happen.",
+                    e);
+        }
+
+        if (optJarPath.size() != 1) {
+            throw new RuntimeException(
+                    String.format("Found " + optJarPath.size() + " %s jar.", 
jarName));
+        }
+
+        try {
+            return optJarPath.get(0).toUri().toURL();
+        } catch (MalformedURLException e) {
+            throw new RuntimeException("URL is invalid. This should not 
happen.", e);
+        }
+    }
 }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java
index 35a8ff605d1..c8477deb041 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.tests.util.flink.ClusterController;
 
 import org.junit.Test;
 
+import java.net.URI;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
@@ -69,11 +70,13 @@ public class CreateTableAsITCase extends SqlITCaseBase {
     }
 
     @Override
-    protected void executeSqlStatements(ClusterController clusterController, 
List<String> sqlLines)
+    protected void executeSqlStatements(
+            ClusterController clusterController, List<String> sqlLines, 
List<URI> dependencies)
             throws Exception {
         clusterController.submitSQLJob(
                 new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
                         .addJar(SQL_TOOL_BOX_JAR)
+                        .addJars(dependencies.toArray(new URI[0]))
                         .build(),
                 Duration.ofMinutes(2L));
     }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/HdfsITCaseBase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/HdfsITCaseBase.java
index 4aa7601c03c..8aae8efcc71 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/HdfsITCaseBase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/HdfsITCaseBase.java
@@ -36,6 +36,7 @@ import org.junit.BeforeClass;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.URI;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.List;
@@ -91,10 +92,12 @@ public abstract class HdfsITCaseBase extends SqlITCaseBase {
     }
 
     @Override
-    protected void executeSqlStatements(ClusterController clusterController, 
List<String> sqlLines)
+    protected void executeSqlStatements(
+            ClusterController clusterController, List<String> sqlLines, 
List<URI> dependencies)
             throws Exception {
         clusterController.submitSQLJob(
                 new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
+                        .addJars(dependencies.toArray(new URI[0]))
                         .setEnvProcessor(
                                 map -> map.put("HADOOP_CLASSPATH", 
getHadoopClassPathContent()))
                         .build(),
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java
index b6433e2c76a..2ce332ddb63 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.tests.util.flink.ClusterController;
 
 import org.junit.Test;
 
+import java.net.URI;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
@@ -76,11 +77,13 @@ public class PlannerScalaFreeITCase extends SqlITCaseBase {
     }
 
     @Override
-    protected void executeSqlStatements(ClusterController clusterController, 
List<String> sqlLines)
+    protected void executeSqlStatements(
+            ClusterController clusterController, List<String> sqlLines, 
List<URI> dependencies)
             throws Exception {
         clusterController.submitSQLJob(
                 new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
                         .addJar(SQL_TOOL_BOX_JAR)
+                        .addJars(dependencies.toArray(new URI[0]))
                         .build(),
                 Duration.ofMinutes(2L));
     }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/SqlITCaseBase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/SqlITCaseBase.java
index 2f448f6ec74..41d1f55a1b2 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/SqlITCaseBase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/SqlITCaseBase.java
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.URI;
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -125,23 +126,25 @@ public abstract class SqlITCaseBase extends TestLogger {
         runAndCheckSQL(
                 sqlPath,
                 Collections.singletonMap(result, resultItems),
-                Collections.singletonMap(result, formatter));
+                Collections.singletonMap(result, formatter),
+                Collections.emptyList());
     }
 
     public void runAndCheckSQL(String sqlPath, Map<Path, List<String>> 
resultItems)
             throws Exception {
-        runAndCheckSQL(sqlPath, resultItems, Collections.emptyMap());
+        runAndCheckSQL(sqlPath, resultItems, Collections.emptyMap(), 
Collections.emptyList());
     }
 
     public void runAndCheckSQL(
             String sqlPath,
             Map<Path, List<String>> resultItems,
-            Map<Path, Function<List<String>, List<String>>> formatters)
+            Map<Path, Function<List<String>, List<String>>> formatters,
+            List<URI> dependencies)
             throws Exception {
         try (ClusterController clusterController = flink.startCluster(1)) {
             List<String> sqlLines = initializeSqlLines(sqlPath);
 
-            executeSqlStatements(clusterController, sqlLines);
+            executeSqlStatements(clusterController, sqlLines, dependencies);
 
             // Wait until all the results flushed to the json file.
             LOG.info("Verify the result.");
@@ -163,7 +166,8 @@ public abstract class SqlITCaseBase extends TestLogger {
     }
 
     protected abstract void executeSqlStatements(
-            ClusterController clusterController, List<String> sqlLines) throws 
Exception;
+            ClusterController clusterController, List<String> sqlLines, 
List<URI> dependencies)
+            throws Exception;
 
     private List<String> initializeSqlLines(String sqlPath) throws IOException 
{
         URL url = SqlITCaseBase.class.getClassLoader().getResource(sqlPath);
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java
index f1cceeaf2cf..48dfbb580cf 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java
@@ -30,6 +30,7 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
@@ -90,6 +91,25 @@ public class UsingRemoteJarITCase extends HdfsITCaseBase {
                                 raw, USER_ORDER_SCHEMA, 
USER_ORDER_DESERIALIZATION_SCHEMA));
     }
 
+    @Test
+    public void testCreateFunctionFromRemoteJarViaSqlClient() throws Exception 
{
+        runAndCheckSQL(
+                "sql_client_remote_jar_e2e.sql",
+                Collections.singletonMap(result, Arrays.asList("+I[Bob, 2]", 
"+I[Alice, 1]")),
+                Collections.singletonMap(
+                        result,
+                        raw ->
+                                convertToMaterializedResult(
+                                        raw, USER_ORDER_SCHEMA, 
USER_ORDER_DESERIALIZATION_SCHEMA)),
+                Collections.singletonList(
+                        URI.create(
+                                String.format(
+                                        "hdfs://%s:%s/%s",
+                                        hdfsCluster.getURI().getHost(),
+                                        hdfsCluster.getNameNodePort(),
+                                        hdPath))));
+    }
+
     @Test
     public void testScalarUdfWhenCheckpointEnable() throws Exception {
         runAndCheckSQL(
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/sql_client_remote_jar_e2e.sql b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/sql_client_remote_jar_e2e.sql
new file mode 100644
index 00000000000..4c5735e2fb0
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/sql_client_remote_jar_e2e.sql
@@ -0,0 +1,40 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+CREATE TABLE JsonTable (
+    user_name STRING,
+    order_cnt BIGINT
+) WITH (
+    'connector' = 'filesystem',
+    'path' = '$RESULT',
+    'sink.rolling-policy.rollover-interval' = '2s',
+    'sink.rolling-policy.check-interval' = '2s',
+    'format' = 'debezium-json'
+);
+
+create function count_agg as 'org.apache.flink.table.toolbox.CountAggFunction' 
LANGUAGE JAVA;
+
+SET execution.runtime-mode = $MODE;
+SET table.exec.mini-batch.enabled = true;
+SET table.exec.mini-batch.size = 5;
+SET table.exec.mini-batch.allow-latency = 2s;
+
+INSERT INTO JsonTable
+SELECT user_name, count_agg(order_id)
+FROM (VALUES (1, 'Bob'), (2, 'Bob'), (1, 'Alice')) T(order_id, user_name)
+GROUP BY user_name;
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
index 2ecf977db64..f0b43f85f55 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
@@ -26,14 +26,12 @@ import org.apache.flink.table.client.cli.CliOptionsParser;
 import org.apache.flink.table.client.gateway.DefaultContextUtils;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.SingleSessionManager;
-import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.gateway.SqlGateway;
 import org.apache.flink.table.gateway.rest.SqlGatewayRestEndpointFactory;
 import org.apache.flink.table.gateway.rest.util.SqlGatewayRestOptions;
 import org.apache.flink.table.gateway.service.context.DefaultContext;
 import org.apache.flink.util.NetUtils;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.SystemUtils;
 import org.jline.terminal.Terminal;
 import org.slf4j.Logger;
@@ -42,16 +40,14 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.Closeable;
-import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.function.Supplier;
 
 import static 
org.apache.flink.table.client.cli.CliClient.DEFAULT_TERMINAL_FACTORY;
+import static org.apache.flink.table.client.cli.CliUtils.isApplicationMode;
 import static 
org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.getSqlGatewayOptionPrefix;
 
 /**
@@ -140,7 +136,11 @@ public class SqlClient {
 
         try (CliClient cli = new CliClient(terminalFactory, executor, 
historyFilePath)) {
             if (options.getInitFile() != null) {
-                boolean success = 
cli.executeInitialization(readFromURL(options.getInitFile()));
+                if (isApplicationMode(executor.getSessionConfig())) {
+                    throw new SqlClientException(
+                            "Sql Client doesn't support to run init files when 
deploying script into cluster.");
+                }
+                boolean success = 
cli.executeInitialization(options.getInitFile());
                 if (!success) {
                     System.out.println(
                             String.format(
@@ -158,7 +158,7 @@ public class SqlClient {
             if (!hasSqlFile) {
                 cli.executeInInteractiveMode();
             } else {
-                cli.executeInNonInteractiveMode(readExecutionContent());
+                cli.executeInNonInteractiveMode(options.getSqlFile());
             }
         }
     }
@@ -320,17 +320,4 @@ public class SqlClient {
             System.out.println("done.");
         }
     }
-
-    private String readExecutionContent() {
-        return readFromURL(options.getSqlFile());
-    }
-
-    private String readFromURL(URL file) {
-        try {
-            return IOUtils.toString(file, StandardCharsets.UTF_8);
-        } catch (IOException e) {
-            throw new SqlExecutionException(
-                    String.format("Fail to read content from the %s.", 
file.getPath()), e);
-        }
-    }
 }
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 0acc4d5afd5..c88d64909be 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.client.cli;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.cli.parser.SqlClientSyntaxHighlighter;
 import org.apache.flink.table.client.cli.parser.SqlCommandParserImpl;
@@ -26,7 +28,10 @@ import org.apache.flink.table.client.cli.parser.SqlMultiLineParser;
 import org.apache.flink.table.client.config.SqlClientOptions;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.util.FileUtils;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.jline.reader.EndOfFileException;
 import org.jline.reader.LineReader;
 import org.jline.reader.LineReaderBuilder;
@@ -47,10 +52,17 @@ import java.io.IOError;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.function.Supplier;
 
+import static 
org.apache.flink.table.client.cli.CliStrings.MESSAGE_DEPLOY_SCRIPT;
+import static org.apache.flink.table.client.cli.CliStrings.messageInfo;
+import static org.apache.flink.table.client.cli.CliUtils.isApplicationMode;
+
 /** SQL CLI client. */
 public class CliClient implements AutoCloseable {
 
@@ -125,21 +137,37 @@ public class CliClient implements AutoCloseable {
     }
 
     /** Opens the non-interactive CLI shell. */
-    public void executeInNonInteractiveMode(String content) {
+    public void executeInNonInteractiveMode(URI uri) {
         try {
             terminal = terminalFactory.get();
-            executeFile(content, terminal.output(), 
ExecutionMode.NON_INTERACTIVE_EXECUTION);
+            if (isApplicationMode(executor.getSessionConfig())) {
+                String scheme = StringUtils.lowerCase(uri.getScheme());
+                String clusterId;
+                // local files
+                if (scheme == null || scheme.equals("file")) {
+                    clusterId = executor.deployScript(readFile(uri), null);
+                } else {
+                    clusterId = executor.deployScript(null, uri);
+                }
+                
terminal.writer().println(messageInfo(MESSAGE_DEPLOY_SCRIPT).toAnsi());
+                terminal.writer().println(String.format("Cluster ID: %s\n", 
clusterId));
+                terminal.flush();
+            } else {
+                executeFile(
+                        readFile(uri), terminal.output(), 
ExecutionMode.NON_INTERACTIVE_EXECUTION);
+            }
         } finally {
             closeTerminal();
         }
     }
 
     /** Initialize the Cli Client with the content. */
-    public boolean executeInitialization(String content) {
+    public boolean executeInitialization(URI file) {
         try {
             OutputStream outputStream = new ByteArrayOutputStream(256);
             terminal = TerminalUtils.createDumbTerminal(outputStream);
-            boolean success = executeFile(content, outputStream, 
ExecutionMode.INITIALIZATION);
+            boolean success =
+                    executeFile(readFile(file), outputStream, 
ExecutionMode.INITIALIZATION);
             LOG.info(outputStream.toString());
             return success;
         } finally {
@@ -326,4 +354,39 @@ public class CliClient implements AutoCloseable {
         }
         return lineReader;
     }
+
+    public static String readFile(URI uri) {
+        try {
+            if (uri.getScheme() != null
+                    && (uri.getScheme().equals("http") || 
uri.getScheme().equals("https"))) {
+                return readFromHttp(uri);
+            } else {
+                return readFileUtf8(uri);
+            }
+        } catch (IOException e) {
+            throw new SqlClientException("Failed to read file " + uri, e);
+        }
+    }
+
+    private static String readFromHttp(URI uri) throws IOException {
+        HttpURLConnection conn = (HttpURLConnection) 
uri.toURL().openConnection();
+
+        conn.setRequestMethod("GET");
+
+        try (InputStream inputStream = conn.getInputStream();
+                ByteArrayOutputStream targetFile = new 
ByteArrayOutputStream()) {
+            IOUtils.copy(inputStream, targetFile);
+            return targetFile.toString(StandardCharsets.UTF_8);
+        }
+    }
+
+    private static String readFileUtf8(URI uri) throws IOException {
+        org.apache.flink.core.fs.Path path = new 
org.apache.flink.core.fs.Path(uri.toString());
+        FileSystem fs = path.getFileSystem();
+        try (FSDataInputStream inputStream = fs.open(path)) {
+            return new String(
+                    FileUtils.read(inputStream, (int) 
fs.getFileStatus(path).getLen()),
+                    StandardCharsets.UTF_8);
+        }
+    }
 }
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
index 3d1aa802abd..1177c9cbaba 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
 
 import javax.annotation.Nullable;
 
+import java.net.URI;
 import java.net.URL;
 import java.util.List;
 import java.util.Optional;
@@ -35,16 +36,16 @@ public class CliOptions {
 
     private final boolean isPrintHelp;
     private final String sessionId;
-    private final URL initFile;
-    private final URL sqlFile;
+    private final URI initFile;
+    private final URI sqlFile;
     private final String historyFilePath;
     private final Properties sessionConfig;
 
     private CliOptions(
             boolean isPrintHelp,
             String sessionId,
-            URL initFile,
-            URL sqlFile,
+            URI initFile,
+            URI sqlFile,
             String historyFilePath,
             Properties sessionConfig) {
         this.isPrintHelp = isPrintHelp;
@@ -63,11 +64,11 @@ public class CliOptions {
         return sessionId;
     }
 
-    public @Nullable URL getInitFile() {
+    public @Nullable URI getInitFile() {
         return initFile;
     }
 
-    public @Nullable URL getSqlFile() {
+    public @Nullable URI getSqlFile() {
         return sqlFile;
     }
 
@@ -82,19 +83,19 @@ public class CliOptions {
     /** Command option lines to configure SQL Client in the embedded mode. */
     public static class EmbeddedCliOptions extends CliOptions {
 
-        private final List<URL> jars;
-        private final List<URL> libraryDirs;
+        private final List<URI> jars;
+        private final List<URI> libraryDirs;
 
         private final Configuration pythonConfiguration;
 
         public EmbeddedCliOptions(
                 boolean isPrintHelp,
                 String sessionId,
-                URL initFile,
-                URL sqlFile,
+                URI initFile,
+                URI sqlFile,
                 String historyFilePath,
-                List<URL> jars,
-                List<URL> libraryDirs,
+                List<URI> jars,
+                List<URI> libraryDirs,
                 Configuration pythonConfiguration,
                 Properties sessionConfig) {
             super(isPrintHelp, sessionId, initFile, sqlFile, historyFilePath, 
sessionConfig);
@@ -103,11 +104,11 @@ public class CliOptions {
             this.pythonConfiguration = pythonConfiguration;
         }
 
-        public List<URL> getJars() {
+        public List<URI> getJars() {
             return jars;
         }
 
-        public List<URL> getLibraryDirs() {
+        public List<URI> getLibraryDirs() {
             return libraryDirs;
         }
 
@@ -124,8 +125,8 @@ public class CliOptions {
         GatewayCliOptions(
                 boolean isPrintHelp,
                 String sessionId,
-                URL initFile,
-                URL sqlFile,
+                URI initFile,
+                URI sqlFile,
                 String historyFilePath,
                 @Nullable URL gatewayAddress,
                 Properties sessionConfig) {
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
index debd88df718..59fe727e195 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.client.cli;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.util.NetUtils;
 
@@ -32,11 +31,13 @@ import org.apache.commons.cli.ParseException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
+import javax.annotation.Nullable;
+
 import java.io.PrintWriter;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.MalformedURLException;
+import java.net.URI;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.List;
@@ -249,11 +250,11 @@ public class CliOptionsParser {
             return new CliOptions.EmbeddedCliOptions(
                     line.hasOption(CliOptionsParser.OPTION_HELP.getOpt()),
                     checkSessionId(line),
-                    checkUrl(line, CliOptionsParser.OPTION_INIT_FILE),
-                    checkUrl(line, CliOptionsParser.OPTION_FILE),
+                    parseURI(line, CliOptionsParser.OPTION_INIT_FILE),
+                    parseURI(line, CliOptionsParser.OPTION_FILE),
                     
line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
-                    checkUrls(line, CliOptionsParser.OPTION_JAR),
-                    checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
+                    parseURIs(line, CliOptionsParser.OPTION_JAR),
+                    parseURIs(line, CliOptionsParser.OPTION_LIBRARY),
                     getPythonConfiguration(line),
                     line.getOptionProperties(OPTION_SESSION_CONFIG.getOpt()));
         } catch (ParseException e) {
@@ -268,8 +269,8 @@ public class CliOptionsParser {
             return new CliOptions.GatewayCliOptions(
                     line.hasOption(CliOptionsParser.OPTION_HELP.getOpt()),
                     checkSessionId(line),
-                    checkUrl(line, CliOptionsParser.OPTION_INIT_FILE),
-                    checkUrl(line, CliOptionsParser.OPTION_FILE),
+                    parseURI(line, CliOptionsParser.OPTION_INIT_FILE),
+                    parseURI(line, CliOptionsParser.OPTION_FILE),
                     
line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
                     
line.hasOption(CliOptionsParser.OPTION_ENDPOINT_ADDRESS.getOpt())
                             ? parseGatewayAddress(
@@ -308,32 +309,30 @@ public class CliOptionsParser {
 
     // 
--------------------------------------------------------------------------------------------
 
-    private static URL checkUrl(CommandLine line, Option option) {
-        final List<URL> urls = checkUrls(line, option);
-        if (urls != null && !urls.isEmpty()) {
-            return urls.get(0);
+    private static @Nullable URI parseURI(CommandLine line, Option option) {
+        List<URI> uris = parseURIs(line, option);
+        if (uris == null || uris.isEmpty()) {
+            return null;
+        } else {
+            return uris.get(0);
         }
-        return null;
     }
 
-    private static List<URL> checkUrls(CommandLine line, Option option) {
+    private static @Nullable List<URI> parseURIs(CommandLine line, Option 
option) {
         if (line.hasOption(option.getOpt())) {
-            final String[] urls = line.getOptionValues(option.getOpt());
-            return Arrays.stream(urls)
+            final String[] uris = line.getOptionValues(option.getOpt());
+            return Arrays.stream(uris)
                     .distinct()
                     .map(
-                            (url) -> {
-                                checkFilePath(url);
+                            uri -> {
                                 try {
-                                    return Path.fromLocalFile(new 
File(url).getAbsoluteFile())
-                                            .toUri()
-                                            .toURL();
+                                    return URI.create(uri);
                                 } catch (Exception e) {
                                     throw new SqlClientException(
-                                            "Invalid path for option '"
+                                            "Invalid uri for option '"
                                                     + option.getLongOpt()
                                                     + "': "
-                                                    + url,
+                                                    + uri,
                                             e);
                                 }
                             })
@@ -342,14 +341,6 @@ public class CliOptionsParser {
         return null;
     }
 
-    public static void checkFilePath(String filePath) {
-        Path path = new Path(filePath);
-        String scheme = path.toUri().getScheme();
-        if (scheme != null && !scheme.equals("file")) {
-            throw new SqlClientException("SQL Client only supports to load 
files in local.");
-        }
-    }
-
     private static String checkSessionId(CommandLine line) {
         final String sessionId = 
line.getOptionValue(CliOptionsParser.OPTION_SESSION.getOpt());
         if (sessionId == null) {
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
index 430d40145de..1287ac4d7ca 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -213,6 +213,8 @@ public final class CliStrings {
 
     public static final String MESSAGE_EXECUTE_STATEMENT = "Execute statement 
succeeded.";
 
+    public static final String MESSAGE_DEPLOY_SCRIPT = "Deploy script in 
application mode: ";
+
     // 
--------------------------------------------------------------------------------------------
 
     public static final String RESULT_TITLE = "SQL Query Result";
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
index 0f068762a06..4b9e9934ea0 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
@@ -19,8 +19,8 @@
 package org.apache.flink.table.client.cli;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.types.DataType;
 
 import org.jline.utils.AttributedString;
@@ -29,7 +29,6 @@ import org.jline.utils.AttributedStyle;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -121,11 +120,8 @@ public final class CliUtils {
         }
     }
 
-    /** Get time zone from the given session config. */
-    public static ZoneId getSessionTimeZone(ReadableConfig sessionConfig) {
-        final String zone = 
sessionConfig.get(TableConfigOptions.LOCAL_TIME_ZONE);
-        return TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)
-                ? ZoneId.systemDefault()
-                : ZoneId.of(zone);
+    public static boolean isApplicationMode(ReadableConfig config) {
+        final String executionTarget = 
config.getOptional(DeploymentOptions.TARGET).orElse("");
+        return executionTarget.trim().endsWith("application");
     }
 }
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DefaultContextUtils.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DefaultContextUtils.java
index 5f716d99ecb..5ad4d0bdceb 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DefaultContextUtils.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DefaultContextUtils.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.net.URI;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -40,13 +41,13 @@ public class DefaultContextUtils {
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultContextUtils.class);
 
     public static DefaultContext 
buildDefaultContext(CliOptions.EmbeddedCliOptions options) {
-        final List<URL> jars;
+        final List<URI> jars;
         if (options.getJars() != null) {
             jars = options.getJars();
         } else {
             jars = Collections.emptyList();
         }
-        final List<URL> libDirs;
+        final List<URI> libDirs;
         if (options.getLibraryDirs() != null) {
             libDirs = options.getLibraryDirs();
         } else {
@@ -66,18 +67,19 @@ public class DefaultContextUtils {
 
     // 
--------------------------------------------------------------------------------------------
 
-    private static List<URL> discoverDependencies(List<URL> jars, List<URL> 
libraries) {
-        final List<URL> dependencies = new ArrayList<>();
+    private static List<URI> discoverDependencies(List<URI> jars, List<URI> 
libraries) {
+        final List<URI> dependencies = new ArrayList<>();
         try {
             // find jar files
-            for (URL url : jars) {
-                JarUtils.checkJarFile(url);
-                dependencies.add(url);
+            for (URI uri : jars) {
+                // Delay the file check until the ResourceManager is created:
+                // the ResourceManager can download files from external systems.
+                dependencies.add(uri);
             }
 
             // find jar files in library directories
-            for (URL libUrl : libraries) {
-                final File dir = new File(libUrl.toURI());
+            for (URI libURI : libraries) {
+                final File dir = new File(libURI);
                 if (!dir.isDirectory()) {
                     throw new SqlClientException("Directory expected: " + dir);
                 } else if (!dir.canRead()) {
@@ -92,7 +94,7 @@ public class DefaultContextUtils {
                     if (f.isFile() && 
f.getAbsolutePath().toLowerCase().endsWith(".jar")) {
                         final URL url = f.toURI().toURL();
                         JarUtils.checkJarFile(url);
-                        dependencies.add(url);
+                        dependencies.add(f.toURI());
                     }
                 }
             }
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
index 3be128e0708..de6a479f084 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -22,8 +22,11 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.gateway.rest.util.RowFormat;
 import org.apache.flink.table.gateway.service.context.DefaultContext;
 
+import javax.annotation.Nullable;
+
 import java.io.Closeable;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.net.URL;
 import java.util.List;
 import java.util.Map;
@@ -87,6 +90,15 @@ public interface Executor extends Closeable {
      */
     List<String> completeStatement(String statement, int position);
 
+    /**
+     * Deploy script in application mode.
+     *
+     * @param script content to run in application mode
+     * @param uri uri to the script
+     * @return the cluster id
+     */
+    String deployScript(@Nullable String script, @Nullable URI uri);
+
     /** Close the {@link Executor} and process all exceptions. */
     void close();
 }
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
index 88dd22f432b..21ba32454d4 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
@@ -43,6 +43,7 @@ import org.apache.flink.table.gateway.SqlGateway;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
+import 
org.apache.flink.table.gateway.rest.header.application.DeployScriptHeaders;
 import 
org.apache.flink.table.gateway.rest.header.operation.CancelOperationHeaders;
 import 
org.apache.flink.table.gateway.rest.header.operation.CloseOperationHeaders;
 import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders;
@@ -54,6 +55,7 @@ import 
org.apache.flink.table.gateway.rest.header.statement.CompleteStatementHea
 import 
org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
 import 
org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
 import org.apache.flink.table.gateway.rest.header.util.GetApiVersionHeaders;
+import 
org.apache.flink.table.gateway.rest.message.application.DeployScriptRequestBody;
 import 
org.apache.flink.table.gateway.rest.message.operation.OperationMessageParameters;
 import 
org.apache.flink.table.gateway.rest.message.operation.OperationStatusResponseBody;
 import 
org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody;
@@ -84,6 +86,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.net.URL;
 import java.util.Collection;
 import java.util.Collections;
@@ -331,6 +334,19 @@ public class ExecutorImpl implements Executor {
                 .getCandidates();
     }
 
+    @Override
+    public String deployScript(@Nullable String script, @Nullable URI uri) {
+        return getResponse(
+                        sendRequest(
+                                DeployScriptHeaders.getInstance(),
+                                new SessionMessageParameters(sessionHandle),
+                                new DeployScriptRequestBody(
+                                        script,
+                                        uri == null ? null : uri.toString(),
+                                        Collections.emptyMap())))
+                .getClusterID();
+    }
+
     @Override
     public void close() {
         if (!registry.isClosed()) {
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SingleSessionManager.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SingleSessionManager.java
index 9c7e7dee0bb..99dd4e72ed2 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SingleSessionManager.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SingleSessionManager.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.table.client.gateway;
 
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.table.api.internal.TableResultInternal;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.resource.ClientResourceManager;
@@ -36,13 +38,24 @@ import 
org.apache.flink.table.gateway.service.operation.OperationManager;
 import org.apache.flink.table.gateway.service.result.ResultFetcher;
 import org.apache.flink.table.gateway.service.session.Session;
 import org.apache.flink.table.gateway.service.session.SessionManager;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.util.MutableURLClassLoader;
 import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
+import java.net.URI;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.client.cli.ArtifactFetchOptions.ARTIFACT_LIST;
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.table.client.cli.CliUtils.isApplicationMode;
 
 /**
  * A {@link SessionManager} only has one session at most. It uses the less 
resources and also
@@ -53,6 +66,9 @@ import java.util.concurrent.Executors;
  */
 public class SingleSessionManager implements SessionManager {
 
+    private static final ConfigOption<List<String>> YARN_SHIP_FILES =
+            key("yarn.ship-files").stringType().asList().noDefaultValue();
+
     private final DefaultContext defaultContext;
     private final ExecutorService operationExecutorService;
 
@@ -96,6 +112,7 @@ public class SingleSessionManager implements SessionManager {
                                 sessionHandle,
                                 environment,
                                 operationExecutorService));
+
         session.open();
         return session;
     }
@@ -136,23 +153,56 @@ public class SingleSessionManager implements 
SessionManager {
                 ExecutorService operationExecutorService) {
             Configuration configuration =
                     initializeConfiguration(defaultContext, environment, 
sessionId);
+            List<URI> dependencies;
+            // rewrite the dependencies
+            if (isApplicationMode(configuration)) {
+                dependencies = Collections.emptyList();
+                if (!defaultContext.getDependencies().isEmpty()) {
+                    String target = 
configuration.getOptional(DeploymentOptions.TARGET).orElse("");
+                    if (target.equals("yarn-application")) {
+                        configuration.set(
+                                YARN_SHIP_FILES,
+                                defaultContext.getDependencies().stream()
+                                        .map(URI::toString)
+                                        .collect(Collectors.toList()));
+                    } else if (target.equals("kubernetes-application")) {
+                        configuration.set(
+                                ARTIFACT_LIST,
+                                defaultContext.getDependencies().stream()
+                                        .map(URI::toString)
+                                        .collect(Collectors.toList()));
+                    } else {
+                        throw new SqlGatewayException("Unknown deployment 
target: " + target);
+                    }
+                }
+            } else {
+                dependencies = defaultContext.getDependencies();
+            }
             final MutableURLClassLoader userClassLoader =
                     new ClientWrapperClassLoader(
                             ClientClassloaderUtil.buildUserClassLoader(
-                                    defaultContext.getDependencies(),
+                                    Collections.emptyList(),
                                     SessionContext.class.getClassLoader(),
                                     new Configuration(configuration)),
                             configuration);
             ClientResourceManager resourceManager =
                     new ClientResourceManager(configuration, userClassLoader);
-            return new EmbeddedSessionContext(
-                    defaultContext,
-                    sessionId,
-                    environment.getSessionEndpointVersion(),
-                    configuration,
-                    userClassLoader,
-                    initializeSessionState(environment, configuration, 
resourceManager),
-                    new OperationManager(operationExecutorService));
+            try {
+                resourceManager.registerJarResources(
+                        dependencies.stream()
+                                .map(uri -> new ResourceUri(ResourceType.JAR, 
uri.toString()))
+                                .collect(Collectors.toList()));
+                return new EmbeddedSessionContext(
+                        defaultContext,
+                        sessionId,
+                        environment.getSessionEndpointVersion(),
+                        configuration,
+                        userClassLoader,
+                        initializeSessionState(environment, configuration, 
resourceManager),
+                        new OperationManager(operationExecutorService));
+            } catch (IOException e) {
+                throw new SqlGatewayException("Failed to open the session.", 
e);
+            }
         }
 
         @Override
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java
index 428c76ea10a..d8f270a8dbe 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.client;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.gateway.rest.DeployScriptITCase;
 import 
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
 import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
 import org.apache.flink.util.FileUtils;
@@ -27,10 +28,12 @@ import org.apache.flink.util.Preconditions;
 import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.net.URL;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
@@ -39,7 +42,6 @@ import java.util.List;
 import static org.apache.flink.configuration.DeploymentOptions.TARGET;
 import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link SqlClient}. */
 class SqlClientTest extends SqlClientTestBase {
@@ -250,14 +252,6 @@ class SqlClientTest extends SqlClientTestBase {
                                 + "> END;");
     }
 
-    @Test
-    void testExecuteSqlWithHDFSFile() {
-        String[] args = new String[] {"-f", "hdfs://path/to/file/test.sql"};
-        assertThatThrownBy(() -> runSqlClient(args))
-                .isInstanceOf(SqlClientException.class)
-                .hasMessage("SQL Client only supports to load files in 
local.");
-    }
-
     @Test
     public void testPrintEmbeddedModeHelp() throws Exception {
         runTestCliHelp(new String[] {"embedded", "--help"}, 
"cli/embedded-mode-help.out");
@@ -273,6 +267,17 @@ class SqlClientTest extends SqlClientTestBase {
         runTestCliHelp(new String[] {"--help"}, "cli/all-mode-help.out");
     }
 
+    @Test
+    public void testDeployScript(@TempDir Path home) throws Exception {
+        DeployScriptITCase.TestApplicationClusterClientFactory.id = 
"test-application";
+        Path script = home.resolve("script.sql");
+        assertThat(script.toFile().createNewFile()).isTrue();
+        String[] args = {"-f", script.toString(), 
"-Dexecution.target=test-application"};
+        assertThat(runSqlClient(args))
+                .contains("[INFO] Deploy script in application mode:")
+                .contains("Cluster ID: test");
+    }
+
     private void runTestCliHelp(String[] args, String expected) throws 
Exception {
         String actual =
                 new String(
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index f2920c21d18..938d7d24d29 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.client.cli;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -41,6 +42,7 @@ import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.util.CloseableIterator;
 
+import org.apache.commons.io.FileUtils;
 import org.jline.reader.Candidate;
 import org.jline.reader.LineReader;
 import org.jline.reader.LineReaderBuilder;
@@ -49,6 +51,9 @@ import org.jline.reader.Parser;
 import org.jline.terminal.Terminal;
 import org.jline.terminal.impl.DumbTerminal;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nullable;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -56,6 +61,8 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -80,6 +87,8 @@ class CliClientTest {
     private static final String SQL_WITHOUT_COMPLETER = "SELECT pos FROM 
source_table;";
     private static final String SQL_WITH_COMPLETER = "SELECT POSITION  FROM 
source_table;";
 
+    private static @TempDir Path home;
+
     @Test
     void testUpdateSubmission() throws Exception {
         verifyUpdateSubmission(INSERT_INTO_STATEMENT, false, false);
@@ -257,14 +266,18 @@ class CliClientTest {
         Path historyFilePath = historyTempFile();
 
         OutputStream outputStream = new ByteArrayOutputStream(256);
-
+        File script =
+                home.resolve(String.format("script_%s.sql", 
System.currentTimeMillis())).toFile();
+        assertThat(script.createNewFile()).isTrue();
         try (CliClient client =
                 new CliClient(
                         () -> TerminalUtils.createDumbTerminal(outputStream),
                         mockExecutor,
                         historyFilePath,
                         null)) {
-            Thread thread = new Thread(() -> 
client.executeInNonInteractiveMode(content));
+            FileUtils.writeStringToFile(script, content, 
StandardCharsets.UTF_8);
+            Thread thread = new Thread(() -> 
client.executeInNonInteractiveMode(script.toURI()));
+
             thread.start();
 
             while (!mockExecutor.isAwait) {
@@ -316,6 +329,29 @@ class CliClientTest {
         }
     }
 
+    @Test
+    void testDeployScript() throws Exception {
+        final MockExecutor mockExecutor = new MockExecutor(new 
SqlParserHelper(), true);
+        Path historyFilePath = historyTempFile();
+
+        File script =
+                home.resolve(String.format("script_%s.sql", 
System.currentTimeMillis())).toFile();
+        mockExecutor.configuration.set(DeploymentOptions.TARGET, 
"kubernetes-application");
+        assertThat(script.createNewFile()).isTrue();
+        try (OutputStream outputStream = new ByteArrayOutputStream(256);
+                CliClient client =
+                        new CliClient(
+                                () -> 
TerminalUtils.createDumbTerminal(outputStream),
+                                mockExecutor,
+                                historyFilePath,
+                                null)) {
+            client.executeInNonInteractiveMode(script.toURI());
+            assertThat(outputStream.toString())
+                    .contains("[INFO] Deploy script in application mode: ")
+                    .contains("Cluster ID: test-application-cluster");
+        }
+    }
+
     // 
--------------------------------------------------------------------------------------------
 
     private void verifyUpdateSubmission(
@@ -368,13 +404,18 @@ class CliClientTest {
 
     private String executeSqlFromContent(MockExecutor executor, String 
content) throws IOException {
         OutputStream outputStream = new ByteArrayOutputStream(256);
+        File script = home.resolve("script.sql").toFile();
+        assertThat(script.createNewFile()).isTrue();
         try (CliClient client =
                 new CliClient(
                         () -> TerminalUtils.createDumbTerminal(outputStream),
                         executor,
                         historyTempFile(),
                         null)) {
-            client.executeInNonInteractiveMode(content);
+            FileUtils.writeStringToFile(script, content, 
StandardCharsets.UTF_8);
+            client.executeInNonInteractiveMode(script.toURI());
+        } finally {
+            script.delete();
         }
         return outputStream.toString();
     }
@@ -465,6 +506,11 @@ class CliClientTest {
             return 
Arrays.asList(helper.getSqlParser().getCompletionHints(statement, position));
         }
 
+        @Override
+        public String deployScript(@Nullable String script, @Nullable URI uri) 
{
+            return "test-application-cluster";
+        }
+
         @Override
         public void close() {
             // do nothing
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
index a2bd7836bad..ea304bc97bd 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
@@ -166,7 +166,7 @@ class ExecutorImplITCase {
     private static RestClusterClient<?> clusterClient;
 
     // a generated UDF jar used for testing classloading of dependencies
-    private static URL udfDependency;
+    private static URI udfDependency;
 
     private final ThreadFactory threadFactory =
             new ExecutorThreadFactory("Executor Test Pool", 
IgnoreExceptionHandler.INSTANCE);
@@ -181,7 +181,7 @@ class ExecutorImplITCase {
                         "test-classloader-udf.jar",
                         GENERATED_LOWER_UDF_CLASS,
                         String.format(GENERATED_LOWER_UDF_CODE, 
GENERATED_LOWER_UDF_CLASS));
-        udfDependency = udfJar.toURI().toURL();
+        udfDependency = udfJar.toURI();
     }
 
     private static Configuration getConfig() {
@@ -662,7 +662,7 @@ class ExecutorImplITCase {
     }
 
     private Executor createRestServiceExecutor(
-            List<URL> dependencies, Configuration configuration) {
+            List<URI> dependencies, Configuration configuration) {
         return createExecutor(
                 dependencies,
                 configuration,
@@ -681,7 +681,7 @@ class ExecutorImplITCase {
     }
 
     private Executor createExecutor(
-            List<URL> dependencies, Configuration configuration, 
InetSocketAddress address) {
+            List<URI> dependencies, Configuration configuration, 
InetSocketAddress address) {
         configuration.addAll(clusterClient.getFlinkConfiguration());
         DefaultContext defaultContext = new DefaultContext(configuration, 
dependencies);
         // frequently trigger heartbeat
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
index 78eccc7337d..1b91dce16e5 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
@@ -37,7 +37,8 @@ import org.apache.commons.cli.Options;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URL;
+import java.net.URI;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -48,13 +49,13 @@ public class DefaultContext {
     private static final Logger LOG = LoggerFactory.getLogger(DefaultContext.class);
 
     private final Configuration flinkConfig;
-    private final List<URL> dependencies;
+    private final List<URI> dependencies;
 
-    public DefaultContext(Map<String, String> flinkConfig, List<URL> dependencies) {
+    public DefaultContext(Map<String, String> flinkConfig, List<URI> dependencies) {
         this(Configuration.fromMap(flinkConfig), dependencies);
     }
 
-    public DefaultContext(Configuration flinkConfig, List<URL> dependencies) {
+    public DefaultContext(Configuration flinkConfig, List<URI> dependencies) {
         this.flinkConfig = flinkConfig;
         this.dependencies = dependencies;
     }
@@ -63,7 +64,7 @@ public class DefaultContext {
         return flinkConfig;
     }
 
-    public List<URL> getDependencies() {
+    public List<URI> getDependencies() {
         return dependencies;
     }
 
@@ -80,8 +81,7 @@ public class DefaultContext {
     private static Configuration createExecutionConfig(
             CommandLine commandLine,
             Options commandLineOptions,
-            List<CustomCommandLine> availableCommandLines,
-            List<URL> dependencies)
+            List<CustomCommandLine> availableCommandLines)
             throws FlinkException {
         LOG.debug("Available commandline options: {}", commandLineOptions);
         List<String> options =
@@ -105,7 +105,8 @@ public class DefaultContext {
         try {
             final ProgramOptions programOptions = ProgramOptions.create(commandLine);
             final ExecutionConfigAccessor executionConfigAccessor =
-                    ExecutionConfigAccessor.fromProgramOptions(programOptions, dependencies);
+                    ExecutionConfigAccessor.fromProgramOptions(
+                            programOptions, Collections.emptyList());
             executionConfigAccessor.applyToConfiguration(executionConfig);
         } catch (CliArgsException e) {
             throw new SqlGatewayException("Invalid deployment run options.", e);
@@ -136,7 +137,7 @@ public class DefaultContext {
      * @param discoverExecutionConfig flag whether to load the execution configuration
      */
     public static DefaultContext load(
-            Configuration dynamicConfig, List<URL> dependencies, boolean discoverExecutionConfig) {
+            Configuration dynamicConfig, List<URI> dependencies, boolean discoverExecutionConfig) {
         // 1. find the configuration directory
         String flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();
 
@@ -161,10 +162,7 @@ public class DefaultContext {
                         CliFrontendParser.parse(commandLineOptions, new String[] {}, true);
                 configuration.addAll(
                         createExecutionConfig(
-                                deploymentCommandLine,
-                                commandLineOptions,
-                                commandLines,
-                                dependencies));
+                                deploymentCommandLine, commandLineOptions, commandLines));
             } catch (Exception e) {
                 throw new SqlGatewayException(
                         "Could not load available CLI with Environment Deployment entry.", e);
diff --git a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkStatementTest.java b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkStatementTest.java
index 8ff4af291a8..5d198b9a6e6 100644
--- a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkStatementTest.java
+++ b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkStatementTest.java
@@ -30,6 +30,9 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
 
+import javax.annotation.Nullable;
+
+import java.net.URI;
 import java.nio.file.Path;
 import java.sql.ResultSet;
 import java.sql.SQLFeatureNotSupportedException;
@@ -251,6 +254,11 @@ public class FlinkStatementTest extends FlinkJdbcDriverTestBase {
             throw new UnsupportedOperationException();
         }
 
+        @Override
+        public String deployScript(@Nullable String script, @Nullable URI uri) {
+            throw new UnsupportedOperationException();
+        }
+
         @Override
         public void close() {}
     }
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java
index 0d0700d9994..19dc6f3aae2 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.util;
 
+import java.net.URI;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
@@ -78,11 +79,23 @@ public class SQLJobSubmission {
             return this;
         }
 
+        public SQLJobSubmissionBuilder addJar(URI jarFile) {
+            this.jars.add(jarFile.toString());
+            return this;
+        }
+
         public SQLJobSubmissionBuilder addJar(Path jarFile) {
             this.jars.add(jarFile.toAbsolutePath().toString());
             return this;
         }
 
+        public SQLJobSubmissionBuilder addJars(URI... jarFiles) {
+            for (URI jarFile : jarFiles) {
+                addJar(jarFile);
+            }
+            return this;
+        }
+
         public SQLJobSubmissionBuilder addJars(Path... jarFiles) {
             for (Path jarFile : jarFiles) {
                 addJar(jarFile);
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/SqlYARNApplicationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/SqlYARNApplicationITCase.java
new file mode 100644
index 00000000000..282e297c35f
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/SqlYARNApplicationITCase.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RpcOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.yarn.configuration.YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests to deploy script into mini yarn cluster. */
+public class SqlYARNApplicationITCase extends YarnTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqlYARNApplicationITCase.class);
+
+    private static final Duration yarnAppTerminateTimeout = Duration.ofSeconds(30);
+    private static final int sleepIntervalInMS = 100;
+    private static @TempDir Path workDir;
+    private static File script;
+
+    @BeforeAll
+    static void setup() throws Exception {
+        YARN_CONFIGURATION.set(
+                YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-sql-yarn-test-application");
+        startYARNWithConfig(YARN_CONFIGURATION, true);
+        script = workDir.resolve("script.sql").toFile();
+        assertThat(script.createNewFile()).isTrue();
+        FileUtils.writeFileUtf8(
+                script,
+                "CREATE TEMPORARY TABLE sink(\n"
+                        + "  a INT\n"
+                        + ") WITH (\n"
+                        + "  'connector' = 'blackhole'\n"
+                        + ");\n"
+                        + "INSERT INTO sink VALUES (1), (2), (3);");
+    }
+
+    @Test
+    void testDeployScriptViaSqlClient() throws Exception {
+        runTest(this::runSqlClient);
+    }
+
+    private void runSqlClient() throws Exception {
+        Path path = flinkLibFolder.getParentFile().toPath().resolve("bin").resolve("sql-client.sh");
+        if (!path.toFile().exists()) {
+            throw new RuntimeException();
+        }
+
+        List<String> parameters = new ArrayList<>();
+        // command line parameters: sql-client.sh -Dkey=value -f <path-to-script>
+        parameters.add(path.toString());
+        parameters.add(
+                getSqlClientParameter(JobManagerOptions.TOTAL_PROCESS_MEMORY.key(), "768MB"));
+        parameters.add(getSqlClientParameter(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), "1g"));
+        parameters.add(getSqlClientParameter(RpcOptions.ASK_TIMEOUT_DURATION.key(), "30s"));
+        parameters.add(
+                getSqlClientParameter(
+                        DeploymentOptions.TARGET.key(),
+                        YarnDeploymentTarget.APPLICATION.getName()));
+        parameters.add(
+                getSqlClientParameter(
+                        CLASSPATH_INCLUDE_USER_JAR.key(),
+                        YarnConfigOptions.UserJarInclusion.LAST.name()));
+        parameters.add("-f");
+        parameters.add(script.getAbsolutePath());
+
+        ProcessBuilder builder = new ProcessBuilder(parameters);
+        // prepare environment
+        builder.environment().put("HADOOP_CLASSPATH", getYarnClasspath());
+        builder.environment().putAll(env);
+        Process process = builder.start();
+
+        // start to deploy script
+        StringBuilder output = new StringBuilder();
+        consumeOutput(process.getErrorStream(), line -> output.append(line).append("\n"));
+        consumeOutput(process.getInputStream(), line -> output.append(line).append("\n"));
+
+        process.waitFor(120, TimeUnit.SECONDS);
+
+        // validate results
+        assertThat(output).contains("Deploy script in application mode:");
+        assertThat(output).contains("Cluster ID:");
+
+        Pattern pattern = Pattern.compile("Cluster ID: (application_\\w+_\\w+)");
+        Matcher matcher = pattern.matcher(output.toString());
+        assertThat(matcher.find()).isTrue();
+        ApplicationId applicationId = ApplicationId.fromString(matcher.group(1));
+
+        try (final YarnClusterDescriptor yarnClusterDescriptor =
+                createYarnClusterDescriptor(
+                        Configuration.fromMap(
+                                Collections.singletonMap(
+                                        DeploymentOptions.TARGET.key(),
+                                        YarnDeploymentTarget.APPLICATION.getName())))) {
+            waitApplicationFinishedElseKillIt(
+                    applicationId,
+                    yarnAppTerminateTimeout,
+                    yarnClusterDescriptor,
+                    sleepIntervalInMS);
+        }
+    }
+
+    private String getSqlClientParameter(String key, String value) {
+        return String.format("-D%s=%s", key, value);
+    }
+
+    private static void consumeOutput(
+            final InputStream stream, final Consumer<String> streamConsumer) {
+        new Thread(
+                        () -> {
+                            try (BufferedReader bufferedReader =
+                                    new BufferedReader(
+                                            new InputStreamReader(
+                                                    stream, StandardCharsets.UTF_8))) {
+                                String line;
+                                while ((line = bufferedReader.readLine()) != null) {
+                                    streamConsumer.accept(line);
+                                }
+                            } catch (IOException e) {
+                                LOG.error("Failure while processing process stdout/stderr.", e);
+                            }
+                        })
+                .start();
+    }
+}
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index e8fccb54997..45b5023abcd 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -207,6 +207,7 @@ public abstract class YarnTestBase {
 
     protected static File yarnSiteXML = null;
     protected static File hdfsSiteXML = null;
+    protected static Map<String, String> env;
 
     private YarnClient yarnClient = null;
 
@@ -244,7 +245,7 @@ public abstract class YarnTestBase {
      *
      * @return a classpath suitable for running all YARN-launched JVMs
      */
-    private static String getYarnClasspath() {
+    protected static String getYarnClasspath() {
         final String start = "../flink-yarn-tests";
         try {
             File classPathFile =
@@ -819,7 +820,7 @@ public abstract class YarnTestBase {
                 yarnCluster.start();
             }
 
-            Map<String, String> map = new HashMap<String, String>(System.getenv());
+            env = new HashMap<>(System.getenv());
 
             File flinkConfDirPath =
                     TestUtils.findFile(
@@ -852,7 +853,7 @@ public abstract class YarnTestBase {
 
             assertThat(configDir).isNotNull();
 
-            map.put(ConfigConstants.ENV_FLINK_CONF_DIR, configDir);
+            env.put(ConfigConstants.ENV_FLINK_CONF_DIR, configDir);
 
             File targetTestClassesFolder = new File("target/test-classes");
             writeYarnSiteConfigXML(conf, targetTestClassesFolder);
@@ -862,12 +863,12 @@ public abstract class YarnTestBase {
                 setMiniDFSCluster(targetTestClassesFolder);
             }
 
-            map.put(
+            env.put(
                     "IN_TESTS",
                     "yes we are in tests"); // see YarnClusterDescriptor() for more infos
-            map.put("YARN_CONF_DIR", targetTestClassesFolder.getAbsolutePath());
-            map.put("MAX_LOG_FILE_NUMBER", "10");
-            CommonTestUtils.setEnv(map);
+            env.put("YARN_CONF_DIR", targetTestClassesFolder.getAbsolutePath());
+            env.put("MAX_LOG_FILE_NUMBER", "10");
+            CommonTestUtils.setEnv(env);
 
             assertThat(yarnCluster.getServiceState()).isEqualTo(Service.STATE.STARTED);
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index ee6207087bd..c4c7e52ab7c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -28,12 +28,14 @@ import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
 import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import javax.annotation.Nullable;
 
+import java.io.File;
 import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -78,6 +80,12 @@ public class YarnClusterClientFactory
         final YarnConfiguration yarnConfiguration =
                 Utils.getYarnAndHadoopConfiguration(configuration);
 
+        if (System.getenv().get("IN_TESTS") != null) {
+            File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
+            Path yarnSitePath = new Path(f.getAbsolutePath());
+            yarnConfiguration.addResource(yarnSitePath);
+        }
+
         yarnClient.init(yarnConfiguration);
         yarnClient.start();
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 8d2e88e253a..42a6e3e8337 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -988,14 +988,23 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
         }
 
         // only for application mode
-        // Python jar file only needs to be shipped and should not be added to classpath.
-        if (YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)
-                && PackagedProgramUtils.isPython(configuration.get(APPLICATION_MAIN_CLASS))) {
-            fileUploader.registerMultipleLocalResources(
-                    Collections.singletonList(
-                            new Path(PackagedProgramUtils.getPythonJar().toURI())),
-                    ConfigConstants.DEFAULT_FLINK_OPT_DIR,
-                    LocalResourceType.FILE);
+        if (YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
+            // Python jar/Sql Gateway jar only need to be shipped and should not be added to
+            // classpath.
+            if (PackagedProgramUtils.isPython(configuration.get(APPLICATION_MAIN_CLASS))) {
+                fileUploader.registerMultipleLocalResources(
+                        Collections.singletonList(
+                                new Path(PackagedProgramUtils.getPythonJar().toURI())),
+                        ConfigConstants.DEFAULT_FLINK_OPT_DIR,
+                        LocalResourceType.FILE);
+            } else if (PackagedProgramUtils.isSqlApplication(
+                    configuration.get(APPLICATION_MAIN_CLASS))) {
+                fileUploader.registerMultipleLocalResources(
+                        Collections.singletonList(
+                                new Path(PackagedProgramUtils.getSqlGatewayJar().toURI())),
+                        ConfigConstants.DEFAULT_FLINK_OPT_DIR,
+                        LocalResourceType.FILE);
+            }
         }
 
         // Upload and register user jars

Reply via email to