This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f594d71  [FLINK-12828][sql-client] Support -f option with a sql script as input
f594d71 is described below

commit f594d71ac63accb21e9cf854c3525fe2ca913488
Author: Shengkai <1059623...@qq.com>
AuthorDate: Sat Mar 27 14:39:48 2021 +0800

    [FLINK-12828][sql-client] Support -f option with a sql script as input
    
    This closes #15366
---
 .../org/apache/flink/table/client/SqlClient.java   |  49 +++-
 .../apache/flink/table/client/cli/CliClient.java   | 248 ++++++++++-----------
 .../apache/flink/table/client/cli/CliOptions.java  |   9 +
 .../flink/table/client/cli/CliOptionsParser.java   |  29 ++-
 .../table/client/cli/CliStatementSplitter.java     |  67 ++++++
 .../apache/flink/table/client/cli/CliStrings.java  |   4 +-
 .../flink/table/client/cli/SqlMultiLineParser.java |   3 +-
 .../apache/flink/table/client/SqlClientTest.java   |  35 +++
 .../flink/table/client/cli/CliClientITCase.java    |   3 +
 .../flink/table/client/cli/CliClientTest.java      | 213 ++++++++++++++----
 .../table/client/cli/CliStatementSplitterTest.java |  72 ++++++
 .../table/client/cli/TerminalStreamsResource.java  |   5 +
 .../src/test/resources/sql-client-help-command.out |  24 ++
 .../src/test/resources/sql-client-help.out         |  26 ++-
 14 files changed, 582 insertions(+), 205 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
index 8e2bcbc..79b2a71 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
@@ -22,14 +22,19 @@ import org.apache.flink.table.client.cli.CliClient;
 import org.apache.flink.table.client.cli.CliOptions;
 import org.apache.flink.table.client.cli.CliOptionsParser;
 import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.context.DefaultContext;
 import org.apache.flink.table.client.gateway.local.LocalContextUtils;
 import org.apache.flink.table.client.gateway.local.LocalExecutor;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.SystemUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
@@ -106,18 +111,25 @@ public class SqlClient {
                             SystemUtils.IS_OS_WINDOWS ? "flink-sql-history" : 
".flink-sql-history");
         }
 
+        boolean hasSqlFile = options.getSqlFile() != null;
+        boolean hasUpdateStatement = options.getUpdateStatement() != null;
+        if (hasSqlFile && hasUpdateStatement) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Please use either option %s or %s. The option %s 
is deprecated and it's suggested to use %s instead.",
+                            CliOptionsParser.OPTION_FILE,
+                            CliOptionsParser.OPTION_UPDATE,
+                            CliOptionsParser.OPTION_UPDATE.getOpt(),
+                            CliOptionsParser.OPTION_FILE.getOpt()));
+        }
+
+        boolean isInteractiveMode = !hasSqlFile && !hasUpdateStatement;
+
         try (CliClient cli = new CliClient(sessionId, executor, 
historyFilePath)) {
-            // interactive CLI mode
-            if (options.getUpdateStatement() == null) {
+            if (isInteractiveMode) {
                 cli.open();
-            }
-            // execute single update statement
-            else {
-                final boolean success = 
cli.submitUpdate(options.getUpdateStatement());
-                if (!success) {
-                    throw new SqlClientException(
-                            "Could not submit given SQL update statement to 
cluster.");
-                }
+            } else {
+                cli.executeSqlFile(readExecutionContent());
             }
         }
     }
@@ -195,4 +207,21 @@ public class SqlClient {
             System.out.println("done.");
         }
     }
+
+    /**
+     * Resolves the SQL text to run when the client is not in interactive mode.
+     *
+     * <p>A configured SQL file takes precedence and is read via {@link #readFromURL(URL)};
+     * otherwise the trimmed update statement is used. The caller only invokes this when at least
+     * one of the two options is set.
+     *
+     * @return the SQL content to execute
+     */
+    private String readExecutionContent() {
+        final URL sqlFile = options.getSqlFile();
+        return sqlFile != null ? readFromURL(sqlFile) : options.getUpdateStatement().trim();
+    }
+
+    /**
+     * Reads the whole resource behind the given URL as a UTF-8 string.
+     *
+     * @param file location of the content to read
+     * @return the content of the resource
+     * @throws SqlExecutionException if reading fails; the {@link IOException} is kept as cause
+     */
+    private String readFromURL(URL file) {
+        final String content;
+        try {
+            content = IOUtils.toString(file, StandardCharsets.UTF_8);
+        } catch (IOException ioFailure) {
+            final String message =
+                    String.format("Fail to read content from the %s.", file.getPath());
+            throw new SqlExecutionException(message, ioFailure);
+        }
+        return content;
+    }
 }
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index d8a02c2..9b007d1 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.client.SqlClient;
 import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.config.ResultMode;
 import org.apache.flink.table.client.config.SqlClientOptions;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.ResultDescriptor;
@@ -70,6 +71,8 @@ import static 
org.apache.flink.table.client.cli.CliStrings.MESSAGE_RESET_KEY;
 import static org.apache.flink.table.client.cli.CliStrings.MESSAGE_SET_KEY;
 import static 
org.apache.flink.table.client.cli.CliStrings.MESSAGE_STATEMENT_SUBMITTED;
 import static 
org.apache.flink.table.client.cli.CliStrings.MESSAGE_WAIT_EXECUTE;
+import static org.apache.flink.table.client.config.ResultMode.TABLEAU;
+import static 
org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE;
 import static 
org.apache.flink.table.client.config.YamlConfigUtils.getOptionNameWithDeprecatedKey;
 import static 
org.apache.flink.table.client.config.YamlConfigUtils.getPropertiesInPretty;
 import static 
org.apache.flink.table.client.config.YamlConfigUtils.isDeprecatedKey;
@@ -220,7 +223,7 @@ public class CliClient implements AutoCloseable {
             terminal.writer().append("\n");
             terminal.flush();
 
-            final String line;
+            String line;
             try {
                 line = lineReader.readLine(prompt, null, inputTransformer, 
null);
             } catch (UserInterruptException e) {
@@ -235,8 +238,8 @@ public class CliClient implements AutoCloseable {
             if (line == null) {
                 continue;
             }
-            final Optional<Operation> operation = parseCommand(line);
-            operation.ifPresent(this::callOperation);
+
+            executeStatement(line, ExecutionMode.INTERACTIVE);
         }
     }
 
@@ -250,33 +253,62 @@ public class CliClient implements AutoCloseable {
     }
 
     /**
-     * Submits a SQL update statement and prints status information and/or 
errors on the terminal.
+     * Submits content from Sql file and prints status information and/or 
errors on the terminal.
      *
-     * @param statement SQL update statement
+     * @param content SQL file content
      * @return flag to indicate if the submission was successful or not
      */
-    public boolean submitUpdate(String statement) {
+    public boolean executeSqlFile(String content) {
         
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_WILL_EXECUTE).toAnsi());
-        terminal.writer().println(new AttributedString(statement).toString());
-        terminal.flush();
 
-        final Optional<Operation> operation = parseCommand(statement);
-        // only support INSERT INTO/OVERWRITE
-        return operation
-                .map(
-                        op -> {
-                            if (op instanceof CatalogSinkModifyOperation) {
-                                return callInsert((CatalogSinkModifyOperation) 
op);
-                            } else {
-                                printError(CliStrings.MESSAGE_UNSUPPORTED_SQL);
-                                return false;
-                            }
-                        })
-                .orElse(false);
+        for (String statement : CliStatementSplitter.splitContent(content)) {
+            terminal.writer().println(new 
AttributedString(statement).toString());
+            terminal.flush();
+
+            if (!executeStatement(statement, ExecutionMode.NON_INTERACTIVE)) {
+                // cancel execution when meet error or ctrl + C;
+                return false;
+            }
+        }
+        return true;
     }
 
     // 
--------------------------------------------------------------------------------------------
 
+    /** Tells how a statement reached the client. */
+    enum ExecutionMode {
+        /** The statement was read by the prompt loop. */
+        INTERACTIVE,
+
+        /** The statement was taken from content passed to {@code executeSqlFile}. */
+        NON_INTERACTIVE
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Parses one statement and, if parsing yields an operation, runs it.
+     *
+     * @param statement raw statement text
+     * @param executionMode mode forwarded to the operation call
+     * @return {@code false} if a {@link SqlExecutionException} was caught and passed to {@code
+     *     printExecutionException}, {@code true} otherwise (including when parsing produced no
+     *     operation)
+     */
+    private boolean executeStatement(String statement, ExecutionMode executionMode) {
+        try {
+            final Optional<Operation> parsed = parseCommand(statement);
+            if (parsed.isPresent()) {
+                callOperation(parsed.get(), executionMode);
+            }
+            return true;
+        } catch (SqlExecutionException failure) {
+            // only SqlExecutionException is handled here; anything else propagates to the caller
+            printExecutionException(failure);
+            return false;
+        }
+    }
+
+    /**
+     * Rejects queries in non-interactive mode unless the session's result mode is tableau.
+     *
+     * @param operation operation about to be executed
+     * @param executionMode mode the statement is executed in
+     * @throws IllegalArgumentException if {@code operation} is a {@link QueryOperation}, the
+     *     mode is {@link ExecutionMode#NON_INTERACTIVE} and the configured result mode is not
+     *     {@code TABLEAU}
+     */
+    private void validate(Operation operation, ExecutionMode executionMode) {
+        // the session config is read up front, before any of the checks below
+        final ResultMode resultMode =
+                executor.getSessionConfig(sessionId).get(EXECUTION_RESULT_MODE);
+
+        if (!(operation instanceof QueryOperation)) {
+            return;
+        }
+        if (!executionMode.equals(ExecutionMode.NON_INTERACTIVE)) {
+            return;
+        }
+        if (resultMode.equals(TABLEAU)) {
+            return;
+        }
+
+        final String resultModeKey = EXECUTION_RESULT_MODE.key();
+        throw new IllegalArgumentException(
+                String.format(
+                        "In non-interactive mode, it only supports to use %s as value of %s when execute query. Please add 'SET %s=%s;' in the sql file.",
+                        TABLEAU,
+                        resultModeKey,
+                        resultModeKey,
+                        TABLEAU));
+    }
+
     private Optional<Operation> parseCommand(String stmt) {
         // normalize
         stmt = stmt.trim();
@@ -284,16 +316,19 @@ public class CliClient implements AutoCloseable {
         if (stmt.endsWith(";")) {
             stmt = stmt.substring(0, stmt.length() - 1).trim();
         }
-        try {
-            Operation operation = executor.parseStatement(sessionId, stmt);
-            return Optional.of(operation);
-        } catch (SqlExecutionException e) {
-            printExecutionException(e);
+
+        // meet bad case, e.g ";\n"
+        if (stmt.trim().isEmpty()) {
+            return Optional.empty();
         }
-        return Optional.empty();
+
+        Operation operation = executor.parseStatement(sessionId, stmt);
+        return Optional.of(operation);
     }
 
-    private void callOperation(Operation operation) {
+    private void callOperation(Operation operation, ExecutionMode mode) {
+        validate(operation, mode);
+
         if (operation instanceof QuitOperation) {
             // QUIT/EXIT
             callQuit();
@@ -331,46 +366,38 @@ public class CliClient implements AutoCloseable {
     }
 
     private void callReset(ResetOperation resetOperation) {
-        try {
-            // reset all session properties
-            if (!resetOperation.getKey().isPresent()) {
-                executor.resetSessionProperties(sessionId);
-                printInfo(CliStrings.MESSAGE_RESET);
-            }
-            // reset a session property
-            else {
-                String key = resetOperation.getKey().get();
-                executor.resetSessionProperty(sessionId, key);
-                printSetResetConfigKeyMessage(key, MESSAGE_RESET_KEY);
-            }
-        } catch (SqlExecutionException e) {
-            printExecutionException(e);
+        // reset all session properties
+        if (!resetOperation.getKey().isPresent()) {
+            executor.resetSessionProperties(sessionId);
+            printInfo(CliStrings.MESSAGE_RESET);
+        }
+        // reset a session property
+        else {
+            String key = resetOperation.getKey().get();
+            executor.resetSessionProperty(sessionId, key);
+            printSetResetConfigKeyMessage(key, MESSAGE_RESET_KEY);
         }
     }
 
     private void callSet(SetOperation setOperation) {
-        try {
-            // set a property
-            if (setOperation.getKey().isPresent() && 
setOperation.getValue().isPresent()) {
-                String key = setOperation.getKey().get().trim();
-                String value = setOperation.getValue().get().trim();
-                executor.setSessionProperty(sessionId, key, value);
-                printSetResetConfigKeyMessage(key, MESSAGE_SET_KEY);
-            }
-            // show all properties
-            else {
-                final Map<String, String> properties = 
executor.getSessionConfigMap(sessionId);
-                if (properties.isEmpty()) {
-                    terminal.writer()
-                            
.println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi());
-                } else {
-                    List<String> prettyEntries = 
getPropertiesInPretty(properties);
-                    prettyEntries.forEach(entry -> 
terminal.writer().println(entry));
-                }
-                terminal.flush();
+        // set a property
+        if (setOperation.getKey().isPresent() && 
setOperation.getValue().isPresent()) {
+            String key = setOperation.getKey().get().trim();
+            String value = setOperation.getValue().get().trim();
+            executor.setSessionProperty(sessionId, key, value);
+            printSetResetConfigKeyMessage(key, MESSAGE_SET_KEY);
+        }
+        // show all properties
+        else {
+            final Map<String, String> properties = 
executor.getSessionConfigMap(sessionId);
+            if (properties.isEmpty()) {
+                terminal.writer()
+                        
.println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi());
+            } else {
+                List<String> prettyEntries = getPropertiesInPretty(properties);
+                prettyEntries.forEach(entry -> 
terminal.writer().println(entry));
             }
-        } catch (SqlExecutionException e) {
-            printExecutionException(e);
+            terminal.flush();
         }
     }
 
@@ -380,20 +407,12 @@ public class CliClient implements AutoCloseable {
     }
 
     private void callSelect(QueryOperation operation) {
-        final ResultDescriptor resultDesc;
-        try {
-            resultDesc = executor.executeQuery(sessionId, operation);
-        } catch (SqlExecutionException e) {
-            printExecutionException(e);
-            return;
-        }
+        final ResultDescriptor resultDesc = executor.executeQuery(sessionId, 
operation);
 
         if (resultDesc.isTableauMode()) {
             try (CliTableauResultView tableauResultView =
                     new CliTableauResultView(terminal, executor, sessionId, 
resultDesc)) {
                 tableauResultView.displayResults();
-            } catch (SqlExecutionException e) {
-                printExecutionException(e);
             }
         } else {
             final CliResultView<?> view;
@@ -404,69 +423,51 @@ public class CliClient implements AutoCloseable {
             }
 
             // enter view
-            try {
-                view.open();
+            view.open();
 
-                // view left
-                printInfo(CliStrings.MESSAGE_RESULT_QUIT);
-            } catch (SqlExecutionException e) {
-                printExecutionException(e);
-            }
+            // view left
+            printInfo(CliStrings.MESSAGE_RESULT_QUIT);
         }
     }
 
-    private boolean callInsert(CatalogSinkModifyOperation operation) {
+    private void callInsert(CatalogSinkModifyOperation operation) {
         printInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT);
 
-        TableResult tableResult = null;
         boolean sync = 
executor.getSessionConfig(sessionId).get(TABLE_DML_SYNC);
         if (sync) {
             printInfo(MESSAGE_WAIT_EXECUTE);
         }
-        try {
-            tableResult = executor.executeOperation(sessionId, operation);
-            checkState(tableResult.getJobClient().isPresent());
-
-            if (sync) {
-                terminal.writer()
-                        
.println(CliStrings.messageInfo(MESSAGE_FINISH_STATEMENT).toAnsi());
-            } else {
-                terminal.writer()
-                        
.println(CliStrings.messageInfo(MESSAGE_STATEMENT_SUBMITTED).toAnsi());
-                terminal.writer()
-                        .println(
-                                String.format(
-                                        "Job ID: %s\n",
-                                        
tableResult.getJobClient().get().getJobID().toString()));
-            }
-            terminal.flush();
-        } catch (Exception e) {
-            printExecutionException(e);
-            return false;
+        TableResult tableResult = executor.executeOperation(sessionId, 
operation);
+        checkState(tableResult.getJobClient().isPresent());
+        if (sync) {
+            
terminal.writer().println(CliStrings.messageInfo(MESSAGE_FINISH_STATEMENT).toAnsi());
+        } else {
+            
terminal.writer().println(CliStrings.messageInfo(MESSAGE_STATEMENT_SUBMITTED).toAnsi());
+            terminal.writer()
+                    .println(
+                            String.format(
+                                    "Job ID: %s\n",
+                                    
tableResult.getJobClient().get().getJobID().toString()));
         }
-        return true;
+        terminal.flush();
     }
 
     private void executeOperation(Operation operation) {
-        try {
-            TableResult result = executor.executeOperation(sessionId, 
operation);
-            if (TABLE_RESULT_OK == result) {
-                // print more meaningful message than tableau OK result
-                printInfo(MESSAGE_EXECUTE_STATEMENT);
-            } else {
-                // print tableau if result has content
-                PrintUtils.printAsTableauForm(
-                        result.getResolvedSchema(),
-                        result.collect(),
-                        terminal.writer(),
-                        Integer.MAX_VALUE,
-                        "",
-                        false,
-                        false);
-                terminal.flush();
-            }
-        } catch (SqlExecutionException e) {
-            printExecutionException(e);
+        TableResult result = executor.executeOperation(sessionId, operation);
+        if (TABLE_RESULT_OK == result) {
+            // print more meaningful message than tableau OK result
+            printInfo(MESSAGE_EXECUTE_STATEMENT);
+        } else {
+            // print tableau if result has content
+            PrintUtils.printAsTableauForm(
+                    result.getResolvedSchema(),
+                    result.collect(),
+                    terminal.writer(),
+                    Integer.MAX_VALUE,
+                    "",
+                    false,
+                    false);
+            terminal.flush();
         }
     }
 
@@ -480,11 +481,6 @@ public class CliClient implements AutoCloseable {
         terminal.flush();
     }
 
-    private void printError(String message) {
-        terminal.writer().println(CliStrings.messageError(message).toAnsi());
-        terminal.flush();
-    }
-
     private void printInfo(String message) {
         terminal.writer().println(CliStrings.messageInfo(message).toAnsi());
         terminal.flush();
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
index 3379925..71e9f4f 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.client.cli;
 
 import org.apache.flink.configuration.Configuration;
 
+import javax.annotation.Nullable;
+
 import java.net.URL;
 import java.util.List;
 
@@ -33,6 +35,7 @@ public class CliOptions {
     private final String sessionId;
     private final URL environment;
     private final URL defaults;
+    private final URL sqlFile;
     private final List<URL> jars;
     private final List<URL> libraryDirs;
     private final String updateStatement;
@@ -44,6 +47,7 @@ public class CliOptions {
             String sessionId,
             URL environment,
             URL defaults,
+            URL sqlFile,
             List<URL> jars,
             List<URL> libraryDirs,
             String updateStatement,
@@ -52,6 +56,7 @@ public class CliOptions {
         this.isPrintHelp = isPrintHelp;
         this.sessionId = sessionId;
         this.environment = environment;
+        this.sqlFile = sqlFile;
         this.defaults = defaults;
         this.jars = jars;
         this.libraryDirs = libraryDirs;
@@ -76,6 +81,10 @@ public class CliOptions {
         return defaults;
     }
 
+    /**
+     * Returns the URL that was handed to the constructor as {@code sqlFile}.
+     *
+     * @return the SQL file URL, or {@code null} if none was given
+     */
+    public @Nullable URL getSqlFile() {
+        return sqlFile;
+    }
+
     public List<URL> getJars() {
         return jars;
     }
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
index 880cbd5..4cbcd2e 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
@@ -72,6 +72,17 @@ public class CliOptionsParser {
                                     + "It might overwrite default environment 
properties.")
                     .build();
 
+    public static final Option OPTION_FILE =
+            Option.builder("f")
+                    .required(false)
+                    .longOpt("file")
+                    .numberOfArgs(1)
+                    .argName("script file")
+                    .desc(
+                            "Script file that should be executed. In this 
mode, "
+                                    + "the client will not open an interactive 
terminal.")
+                    .build();
+
     public static final Option OPTION_DEFAULTS =
             Option.builder("d")
                     .required(false)
@@ -107,6 +118,7 @@ public class CliOptionsParser {
                                     + "functions, table sources, or sinks. Can 
be used multiple times.")
                     .build();
 
+    @Deprecated
     public static final Option OPTION_UPDATE =
             Option.builder("u")
                     .required(false)
@@ -114,11 +126,14 @@ public class CliOptionsParser {
                     .numberOfArgs(1)
                     .argName("SQL update statement")
                     .desc(
-                            "Experimental (for testing only!): Instructs the 
SQL Client to immediately execute "
-                                    + "the given update statement after 
starting up. The process is shut down after the "
-                                    + "statement has been submitted to the 
cluster and returns an appropriate return code. "
-                                    + "Currently, this feature is only 
supported for INSERT INTO statements that declare "
-                                    + "the target sink table.")
+                            String.format(
+                                    "Deprecated Experimental (for testing 
only!) feature: Instructs the SQL Client to immediately execute "
+                                            + "the given update statement 
after starting up. The process is shut down after the "
+                                            + "statement has been submitted to 
the cluster and returns an appropriate return code. "
+                                            + "Currently, this feature is only 
supported for INSERT INTO statements that declare "
+                                            + "the target sink table."
+                                            + "Please use option -%s to submit 
update statement.",
+                                    OPTION_FILE.getOpt()))
                     .build();
 
     public static final Option OPTION_HISTORY =
@@ -147,6 +162,7 @@ public class CliOptionsParser {
         buildGeneralOptions(options);
         options.addOption(OPTION_SESSION);
         options.addOption(OPTION_ENVIRONMENT);
+        options.addOption(OPTION_FILE);
         options.addOption(OPTION_DEFAULTS);
         options.addOption(OPTION_JAR);
         options.addOption(OPTION_LIBRARY);
@@ -263,6 +279,7 @@ public class CliOptionsParser {
                     checkSessionId(line),
                     checkUrl(line, CliOptionsParser.OPTION_ENVIRONMENT),
                     checkUrl(line, CliOptionsParser.OPTION_DEFAULTS),
+                    checkUrl(line, CliOptionsParser.OPTION_FILE),
                     checkUrls(line, CliOptionsParser.OPTION_JAR),
                     checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
                     
line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()),
@@ -282,6 +299,7 @@ public class CliOptionsParser {
                     checkSessionId(line),
                     checkUrl(line, CliOptionsParser.OPTION_ENVIRONMENT),
                     null,
+                    null,
                     checkUrls(line, CliOptionsParser.OPTION_JAR),
                     checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
                     
line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()),
@@ -301,6 +319,7 @@ public class CliOptionsParser {
                     null,
                     null,
                     checkUrl(line, CliOptionsParser.OPTION_DEFAULTS),
+                    null,
                     checkUrls(line, CliOptionsParser.OPTION_JAR),
                     checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
                     null,
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStatementSplitter.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStatementSplitter.java
new file mode 100644
index 0000000..5aea2a8
--- /dev/null
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStatementSplitter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Splits SQL text into statements for the SQL client.
+ *
+ * <p>It determines whether a submitted line completes a statement and also offers to split
+ * submitted content into multiple statements.
+ *
+ * <p>This is a simple splitter that works in a context-unrelated way: everything from {@code --}
+ * to the end of a line is ignored and a statement ends at a line whose remaining text ends with
+ * {@code ;}. Quoting is not understood, e.g. it fails to parse the line {@code "';\n'"}.
+ */
+public class CliStatementSplitter {
+
+    /** Matches a {@code --} comment up to the end of the line; compiled once and reused. */
+    private static final Pattern MASK = Pattern.compile("--.*$");
+
+    /**
+     * Checks whether the last line of the given text terminates a statement.
+     *
+     * @param statement text entered so far, possibly spanning several lines
+     * @return {@code true} if the last line, ignoring a trailing {@code --} comment and
+     *     surrounding whitespace, ends with {@code ;}
+     */
+    public static boolean isStatementComplete(String statement) {
+        String[] lines = statement.split("\n");
+        // split drops trailing empty strings, so input made only of line breaks yields no lines
+        return lines.length > 0 && isEndOfStatement(lines[lines.length - 1]);
+    }
+
+    /**
+     * Splits the content into statements, keeping the original lines of each statement.
+     *
+     * @param content text with statements terminated by lines ending in {@code ;}
+     * @return the statements in order of appearance; remaining lines without a terminating
+     *     {@code ;} are returned together as the last element
+     */
+    public static List<String> splitContent(String content) {
+        List<String> statements = new ArrayList<>();
+        List<String> buffer = new ArrayList<>();
+
+        for (String line : content.split("\n")) {
+            buffer.add(line);
+            if (isEndOfStatement(line)) {
+                statements.add(String.join("\n", buffer));
+                buffer.clear();
+            }
+        }
+        if (!buffer.isEmpty()) {
+            statements.add(String.join("\n", buffer));
+        }
+        return statements;
+    }
+
+    /** Returns whether the line ends with {@code ;} once a {@code --} comment is stripped. */
+    private static boolean isEndOfStatement(String line) {
+        return MASK.matcher(line).replaceAll("").trim().endsWith(";");
+    }
+}
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
index d066650..5c7dc04 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -199,9 +199,7 @@ public final class CliStrings {
     public static final String MESSAGE_STATEMENT_SUBMITTED =
             "Table update statement has been successfully submitted to the 
cluster:";
 
-    public static final String MESSAGE_WILL_EXECUTE = "Executing the following 
statement:";
-
-    public static final String MESSAGE_UNSUPPORTED_SQL = "Unsupported SQL 
statement.";
+    public static final String MESSAGE_WILL_EXECUTE = "Executing the SQL from 
the file:";
 
     public static final String MESSAGE_WAIT_EXECUTE =
             "Execute statement in sync mode. Please wait for the execution 
finish...";
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlMultiLineParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlMultiLineParser.java
index ceb11ec..f85eb71 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlMultiLineParser.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlMultiLineParser.java
@@ -31,7 +31,6 @@ import java.util.List;
  */
 public class SqlMultiLineParser extends DefaultParser {
 
-    private static final String EOF_CHARACTER = ";";
     private static final String NEW_LINE_PROMPT = ""; // results in simple '>' 
output
 
     public SqlMultiLineParser() {
@@ -41,7 +40,7 @@ public class SqlMultiLineParser extends DefaultParser {
 
     @Override
     public ParsedLine parse(String line, int cursor, ParseContext context) {
-        if (!line.trim().endsWith(EOF_CHARACTER) && context != 
ParseContext.COMPLETE) {
+        if (!CliStatementSplitter.isStatementComplete(line) && context != 
ParseContext.COMPLETE) {
             throw new EOFError(-1, -1, "New line without EOF character.", 
NEW_LINE_PROMPT);
         }
         final ArgumentList parsedLine = (ArgumentList) super.parse(line, 
cursor, context);
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java
index 7e1d754..8062a65 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java
@@ -37,7 +37,12 @@ import java.io.InputStream;
 import java.io.PrintStream;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -66,6 +71,8 @@ public class SqlClientTest {
 
     private String historyPath;
 
+    private String sqlFilePath;
+
     @Before
     public void before() throws IOException {
         originalEnv = System.getenv();
@@ -197,4 +204,32 @@ public class SqlClientTest {
             assertThat(output, containsString(error));
         }
     }
+
+    @Test
+    public void testExecuteSqlFile() throws IOException {
+        // create sql file
+        File sqlFileFolder = tempFolder.newFolder("sql-file");
+        File sqlFile = new File(sqlFileFolder, "test-sql.sql");
+        if (!sqlFile.createNewFile()) {
+            throw new IOException("Can't create testing test-sql.sql file.");
+        }
+        sqlFilePath = sqlFile.getPath();
+
+        List<String> statements = Collections.singletonList("HELP;");
+        Files.write(
+                Paths.get(sqlFilePath),
+                statements,
+                StandardCharsets.UTF_8,
+                StandardOpenOption.APPEND);
+
+        String[] args = new String[] {"-f", sqlFilePath};
+        SqlClient.main(args);
+        final URL url = 
getClass().getClassLoader().getResource("sql-client-help-command.out");
+        final String help = FileUtils.readFileUtf8(new File(url.getFile()));
+        final String output = getStdoutString();
+
+        for (String command : help.split("\n")) {
+            assertThat(output, containsString(command));
+        }
+    }
 }
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
index a203202..427b3ee 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
@@ -38,6 +38,7 @@ import org.jline.terminal.Terminal;
 import org.jline.terminal.impl.DumbTerminal;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
@@ -79,6 +80,8 @@ public class CliClientITCase extends AbstractTestBase {
 
     @ClassRule public static TemporaryFolder tempFolder = new 
TemporaryFolder();
 
+    @Rule public TerminalStreamsResource useSystemStream = 
TerminalStreamsResource.INSTANCE;
+
     @Parameterized.Parameter public String sqlPath;
 
     @Parameterized.Parameters(name = "{0}")
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index 2645e47..8bedeb9 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -51,7 +51,9 @@ import org.jline.reader.ParsedLine;
 import org.jline.reader.Parser;
 import org.jline.terminal.Terminal;
 import org.jline.terminal.impl.DumbTerminal;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import javax.annotation.Nullable;
 
@@ -60,6 +62,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -68,9 +72,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.FutureTask;
 
 import static 
org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
+import static 
org.apache.flink.table.client.cli.CliStrings.MESSAGE_SQL_EXECUTION_ERROR;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -82,8 +86,10 @@ public class CliClientTest extends TestLogger {
             "INSERT INTO MyTable SELECT * FROM MyOtherTable";
     private static final String INSERT_OVERWRITE_STATEMENT =
             "INSERT OVERWRITE MyTable SELECT * FROM MyOtherTable";
-    private static final String SELECT_STATEMENT = "SELECT * FROM 
MyOtherTable";
-    private static final Row SHOW_ROW = new Row(1);
+
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    @Rule public TerminalStreamsResource useSystemStream = 
TerminalStreamsResource.INSTANCE;
 
     @Test
     public void testUpdateSubmission() throws Exception {
@@ -96,9 +102,18 @@ public class CliClientTest extends TestLogger {
         // fail at executor
         verifyUpdateSubmission(INSERT_INTO_STATEMENT, true, true);
         verifyUpdateSubmission(INSERT_OVERWRITE_STATEMENT, true, true);
+    }
 
-        // fail early in client
-        verifyUpdateSubmission(SELECT_STATEMENT, false, true);
+    @Test
+    public void testExecuteSqlFile() throws Exception {
+        MockExecutor executor = new MockExecutor();
+        executeSqlFromContent(
+                executor,
+                String.join(
+                        ";\n",
+                        Arrays.asList(
+                                INSERT_INTO_STATEMENT, "", 
INSERT_OVERWRITE_STATEMENT, "\n")));
+        assertEquals(INSERT_OVERWRITE_STATEMENT, executor.receivedStatement);
     }
 
     @Test
@@ -134,22 +149,116 @@ public class CliClientTest extends TestLogger {
     }
 
     @Test
-    public void verifyCancelSubmitInSyncMode() throws Exception {
-        final MockExecutor mockExecutor = new MockExecutor(true);
+    public void testGetEOFinNonInteractiveMode() throws Exception {
+        final List<String> statements =
+                Arrays.asList("DESC MyOtherTable;", "SHOW TABLES"); // meet EOF
+        String content = String.join("\n", statements);
+
+        final MockExecutor mockExecutor = new MockExecutor();
+
+        executeSqlFromContent(mockExecutor, content);
+        // execute the last commands
+        assertTrue(statements.get(1).contains(mockExecutor.receivedStatement));
+    }
+
+    @Test
+    public void testUnknownStatementInNonInteractiveMode() throws Exception {
+        final List<String> statements =
+                Arrays.asList(
+                        "ERT INTO MyOtherTable VALUES (1, 101), (2, 102);",
+                        "DESC MyOtherTable;",
+                        "SHOW TABLES;");
+        String content = String.join("\n", statements);
+
+        final MockExecutor mockExecutor = new MockExecutor();
+
+        executeSqlFromContent(mockExecutor, content);
+        // don't execute other commands
+        assertTrue(statements.get(0).contains(mockExecutor.receivedStatement));
+    }
+
+    @Test
+    public void testFailedExecutionInNonInteractiveMode() throws Exception {
+        final List<String> statements =
+                Arrays.asList(
+                        "INSERT INTO MyOtherTable VALUES (1, 101), (2, 102);",
+                        "DESC MyOtherTable;",
+                        "SHOW TABLES;");
+        String content = String.join("\n", statements);
+
+        final MockExecutor mockExecutor = new MockExecutor();
+        mockExecutor.failExecution = true;
+
+        executeSqlFromContent(mockExecutor, content);
+        // don't execute other commands
+        assertTrue(statements.get(0).contains(mockExecutor.receivedStatement));
+    }
+
+    @Test
+    public void testIllegalResultModeInNonInteractiveMode() throws Exception {
+        // When client executes sql file, it requires 
sql-client.execution.result-mode = tableau;
+        // Therefore, it will get an execution error and stop executing the SQL 
that follows the illegal
+        // statement.
+        final List<String> statements =
+                Arrays.asList(
+                        "SELECT * FROM MyOtherTable;",
+                        "HELP;",
+                        "INSERT INTO MyOtherTable VALUES (1, 101), (2, 102);",
+                        "DESC MyOtherTable;",
+                        "SHOW TABLES;");
+
+        String content = String.join("\n", statements);
+
+        final MockExecutor mockExecutor = new MockExecutor();
+
+        thrown.expect(IllegalArgumentException.class);
+        thrown.expectMessage(
+                "In non-interactive mode, it only supports to use TABLEAU as 
value of "
+                        + "sql-client.execution.result-mode when execute 
query. Please add "
+                        + "'SET sql-client.execution.result-mode=TABLEAU;' in 
the sql file.");
+        executeSqlFromContent(mockExecutor, content);
+    }
+
+    @Test
+    public void testCancelExecutionInNonInteractiveMode() throws Exception {
+        // append "\n" to QUIT to trigger committing the line
+        final List<String> statements =
+                Arrays.asList(
+                        "HELP;",
+                        "CREATE TABLE tbl( -- comment\n"
+                                + "-- comment with ;\n"
+                                + "id INT,\n"
+                                + "name STRING\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'values'\n"
+                                + ");\n",
+                        "INSERT INTO \n"
+                                + "--COMMENT ; \n"
+                                + "MyOtherTable VALUES (1, 101), (2, 102);",
+                        "DESC MyOtherTable;",
+                        "SHOW TABLES;",
+                        "QUIT;\n");
+
+        // use table.dml-sync to keep running
+        // therefore in non-interactive mode, the last executed command is 
INSERT INTO
+        final int hookIndex = 2;
+
+        String content = String.join("\n", statements);
+
+        final MockExecutor mockExecutor = new MockExecutor();
+        mockExecutor.isSync = true;
+
         String sessionId = mockExecutor.openSession("test-session");
 
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream(256);
-
-        try (CliClient client =
-                new CliClient(
-                        TerminalUtils.createDummyTerminal(outputStream),
-                        sessionId,
-                        mockExecutor,
-                        historyTempFile(),
-                        null)) {
-            FutureTask<Boolean> task =
-                    new FutureTask<>(() -> 
client.submitUpdate(INSERT_INTO_STATEMENT));
-            Thread thread = new Thread(task);
+        Path historyFilePath = historyTempFile();
+
+        OutputStream outputStream = new ByteArrayOutputStream(256);
+        System.setOut(new PrintStream(outputStream));
+
+        try (Terminal terminal = 
TerminalUtils.createDummyTerminal(outputStream);
+                CliClient client =
+                        new CliClient(terminal, sessionId, mockExecutor, 
historyFilePath, null)) {
+            Thread thread = new Thread(() -> client.executeSqlFile(content));
             thread.start();
 
             while (!mockExecutor.isAwait) {
@@ -157,12 +266,18 @@ public class CliClientTest extends TestLogger {
             }
 
             thread.interrupt();
-            assertFalse(task.get());
+
+            while (thread.isAlive()) {
+                Thread.sleep(10);
+            }
             assertTrue(
                     outputStream
                             .toString()
                             .contains("java.lang.InterruptedException: sleep 
interrupted"));
         }
+
+        // read the last executed statement
+        
assertTrue(statements.get(hookIndex).contains(mockExecutor.receivedStatement));
     }
 
     // 
--------------------------------------------------------------------------------------------
@@ -170,22 +285,15 @@ public class CliClientTest extends TestLogger {
     private void verifyUpdateSubmission(
             String statement, boolean failExecution, boolean testFailure) 
throws Exception {
         final MockExecutor mockExecutor = new MockExecutor();
-        String sessionId = mockExecutor.openSession("test-session");
         mockExecutor.failExecution = failExecution;
 
-        try (CliClient client =
-                new CliClient(
-                        TerminalUtils.createDummyTerminal(),
-                        sessionId,
-                        mockExecutor,
-                        historyTempFile(),
-                        null)) {
-            if (testFailure) {
-                assertFalse(client.submitUpdate(statement));
-            } else {
-                assertTrue(client.submitUpdate(statement));
-                assertEquals(statement, mockExecutor.receivedStatement);
-            }
+        String result = executeSqlFromContent(mockExecutor, statement);
+
+        if (testFailure) {
+            assertTrue(result.contains(MESSAGE_SQL_EXECUTION_ERROR));
+        } else {
+            assertFalse(result.contains(MESSAGE_SQL_EXECUTION_ERROR));
+            assertEquals(statement, mockExecutor.receivedStatement);
         }
     }
 
@@ -218,6 +326,18 @@ public class CliClientTest extends TestLogger {
         return File.createTempFile("history", "tmp").toPath();
     }
 
+    private String executeSqlFromContent(MockExecutor executor, String 
content) throws IOException {
+        String sessionId = executor.openSession("test-session");
+        OutputStream outputStream = new ByteArrayOutputStream(256);
+        System.setOut(new PrintStream(outputStream));
+        try (Terminal terminal = 
TerminalUtils.createDummyTerminal(outputStream);
+                CliClient client =
+                        new CliClient(terminal, sessionId, executor, 
historyTempFile(), null)) {
+            client.executeSqlFile(content);
+        }
+        return outputStream.toString();
+    }
+
     // 
--------------------------------------------------------------------------------------------
 
     private static class MockExecutor implements Executor {
@@ -228,29 +348,22 @@ public class CliClientTest extends TestLogger {
         public boolean isAwait = false;
         public String receivedStatement;
         public int receivedPosition;
-        private Configuration configuration = new Configuration();
         private final Map<String, SessionContext> sessionMap = new HashMap<>();
         private final SqlParserHelper helper = new SqlParserHelper();
 
         @Override
         public void start() throws SqlExecutionException {}
 
-        public MockExecutor() {
-            this(false);
-        }
-
-        public MockExecutor(boolean isSync) {
-            configuration.set(TABLE_DML_SYNC, isSync);
-            this.isSync = isSync;
-        }
-
         @Override
         public String openSession(@Nullable String sessionId) throws 
SqlExecutionException {
+            Configuration configuration = new Configuration();
+            configuration.set(TABLE_DML_SYNC, isSync);
+
             DefaultContext defaultContext =
                     new DefaultContext(
                             new Environment(),
                             Collections.emptyList(),
-                            new Configuration(),
+                            configuration,
                             Collections.singletonList(new DefaultCLI()));
             SessionContext context = SessionContext.create(defaultContext, 
sessionId);
             sessionMap.put(sessionId, context);
@@ -293,7 +406,7 @@ public class CliClientTest extends TestLogger {
             if (failExecution) {
                 throw new SqlExecutionException("Fail execution.");
             }
-            if (operation instanceof ModifyOperation || operation instanceof 
QueryOperation) {
+            if (operation instanceof ModifyOperation) {
                 if (isSync) {
                     isAwait = true;
                     try {
@@ -316,8 +429,12 @@ public class CliClientTest extends TestLogger {
         public Operation parseStatement(String sessionId, String statement)
                 throws SqlExecutionException {
             receivedStatement = statement;
-            List<Operation> ops = helper.getSqlParser().parse(statement);
-            return ops.get(0);
+
+            try {
+                return helper.getSqlParser().parse(statement).get(0);
+            } catch (Exception ex) {
+                throw new SqlExecutionException("Parse error: " + statement, 
ex);
+            }
         }
 
         @Override
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliStatementSplitterTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliStatementSplitterTest.java
new file mode 100644
index 0000000..92c07bf
--- /dev/null
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliStatementSplitterTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Test {@link CliStatementSplitter}. */
+public class CliStatementSplitterTest {
+
+    @Test
+    public void testIsNotEndOfStatement() {
+        // TODO: fix ';
+        List<String> lines = Arrays.asList(" --;", "", "select -- ok;", "\n");
+        for (String line : lines) {
+            assertFalse(
+                    String.format("%s is not end of statement but get true", 
line),
+                    CliStatementSplitter.isStatementComplete(line));
+        }
+    }
+
+    @Test
+    public void testIsEndOfStatement() {
+        List<String> lines = Arrays.asList(" ; --;", "select a from b;-- ok;", 
";\n");
+        for (String line : lines) {
+            assertTrue(
+                    String.format("%s is end of statement but get false", 
line),
+                    CliStatementSplitter.isStatementComplete(line));
+        }
+    }
+
+    @Test
+    public void testSplitContent() {
+        List<String> lines =
+                Arrays.asList(
+                        "CREATE TABLE MyTable (\n"
+                                + "  id INT,\n"
+                                + "  name STRING,\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'test-property' = 'test.value'\n);",
+                        "SET a = b;",
+                        "\n" + "SELECT func(id) from MyTable\n;");
+        List<String> actual = 
CliStatementSplitter.splitContent(String.join("\n", lines));
+
+        for (int i = 0; i < lines.size(); i++) {
+            assertEquals(lines.get(i), actual.get(i));
+        }
+    }
+}
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TerminalStreamsResource.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TerminalStreamsResource.java
index 85a5849..c2d2fcd 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TerminalStreamsResource.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TerminalStreamsResource.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.client.cli;
 
 import org.junit.rules.ExternalResource;
 
+import java.io.PrintStream;
+
 /**
  * Enables {@link org.apache.flink.table.client.SqlClient} to create a default 
terminal using {@link
  * System#in} and {@link System#out} as the input and output stream. This can 
allows tests to easily
@@ -28,6 +30,7 @@ import org.junit.rules.ExternalResource;
 public class TerminalStreamsResource extends ExternalResource {
 
     public static final TerminalStreamsResource INSTANCE = new 
TerminalStreamsResource();
+    public static final PrintStream STD_OUT = System.out;
 
     private TerminalStreamsResource() {
         // singleton
@@ -41,5 +44,7 @@ public class TerminalStreamsResource extends ExternalResource 
{
     @Override
     protected void after() {
         CliClient.useSystemInOutStream = false;
+        // Reset the System.out
+        System.setOut(STD_OUT);
     }
 }
diff --git 
a/flink-table/flink-sql-client/src/test/resources/sql-client-help-command.out 
b/flink-table/flink-sql-client/src/test/resources/sql-client-help-command.out
new file mode 100644
index 0000000..8d8bf88
--- /dev/null
+++ 
b/flink-table/flink-sql-client/src/test/resources/sql-client-help-command.out
@@ -0,0 +1,24 @@
+CLEAR          Clears the current terminal.
+CREATE TABLE           Create table under current catalog and database.
+DROP TABLE             Drop table with optional catalog and database. Syntax: 
'DROP TABLE [IF EXISTS] <name>;'
+CREATE VIEW            Creates a virtual table from a SQL query. Syntax: 
'CREATE VIEW <name> AS <query>;'
+DESCRIBE               Describes the schema of a table with the given name.
+DROP VIEW              Deletes a previously created virtual table. Syntax: 
'DROP VIEW <name>;'
+EXPLAIN                Describes the execution plan of a query or table with 
the given name.
+HELP           Prints the available commands.
+INSERT INTO            Inserts the results of a SQL SELECT query into a 
declared table sink.
+INSERT OVERWRITE               Inserts the results of a SQL SELECT query into 
a declared table sink and overwrite existing data.
+QUIT           Quits the SQL CLI client.
+RESET          Resets a session configuration property. Syntax: 'RESET 
<key>;'. Use 'RESET;' for reset all session properties.
+SELECT         Executes a SQL SELECT query on the Flink cluster.
+SET            Sets a session configuration property. Syntax: 'SET 
<key>=<value>;'. Use 'SET;' for listing all properties.
+SHOW FUNCTIONS         Shows all user-defined and built-in functions or only 
user-defined functions. Syntax: 'SHOW [USER] FUNCTIONS;'
+SHOW TABLES            Shows all registered tables.
+SOURCE         Reads a SQL SELECT query from a file and executes it on the 
Flink cluster.
+USE CATALOG            Sets the current catalog. The current database is set 
to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'
+USE            Sets the current default database. Experimental! Syntax: 'USE 
<name>;'
+LOAD MODULE            Load a module. Syntax: 'LOAD MODULE <name> [WITH 
('<key1>' = '<value1>' [, '<key2>' = '<value2>', ...])];'
+UNLOAD MODULE          Unload a module. Syntax: 'UNLOAD MODULE <name>;'
+USE MODULES            Enable loaded modules. Syntax: 'USE MODULES <name1> [, 
<name2>, ...];'
+
+Hint: Make sure that a statement ends with ';' for finalizing (multi-line) 
statements.
diff --git 
a/flink-table/flink-sql-client/src/test/resources/sql-client-help.out 
b/flink-table/flink-sql-client/src/test/resources/sql-client-help.out
index fe9173f..7b40fc9 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql-client-help.out
+++ b/flink-table/flink-sql-client/src/test/resources/sql-client-help.out
@@ -14,6 +14,9 @@ Mode "embedded" (default) submits Flink jobs from the local 
machine.
                                            imported into the session. It might
                                            overwrite default environment
                                            properties.
+     -f,--file <script file>               Script file that should be executed.
+                                           In this mode, the client will not
+                                           open an interactive terminal.
      -h,--help                             Show the help message with
                                            descriptions of all options.
      -hist,--history <History file path>   The file which you want to save the
@@ -101,16 +104,17 @@ Mode "embedded" (default) submits Flink jobs from the 
local machine.
                                            tmp/cached_dir).
      -s,--session <session identifier>     The identifier for a session.
                                            'default' is the default identifier.
-     -u,--update <SQL update statement>    Experimental (for testing only!):
-                                           Instructs the SQL Client to
-                                           immediately execute the given update
-                                           statement after starting up. The
-                                           process is shut down after the
-                                           statement has been submitted to the
-                                           cluster and returns an appropriate
-                                           return code. Currently, this feature
-                                           is only supported for INSERT INTO
-                                           statements that declare the target
-                                           sink table.
+     -u,--update <SQL update statement>    Deprecated Experimental (for testing
+                                           only!) feature: Instructs the SQL
+                                           Client to immediately execute the
+                                           given update statement after 
starting
+                                           up. The process is shut down after
+                                           the statement has been submitted to
+                                           the cluster and returns an
+                                           appropriate return code. Currently,
+                                           this feature is only supported for
+                                           INSERT INTO statements that declare
+                                           the target sink table.Please use
+                                           option -f to submit update 
statement.
 
 

Reply via email to