This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new f594d71 [FLINK-12828][sql-client] Support -f option with a sql script as input f594d71 is described below commit f594d71ac63accb21e9cf854c3525fe2ca913488 Author: Shengkai <1059623...@qq.com> AuthorDate: Sat Mar 27 14:39:48 2021 +0800 [FLINK-12828][sql-client] Support -f option with a sql script as input This closes #15366 --- .../org/apache/flink/table/client/SqlClient.java | 49 +++- .../apache/flink/table/client/cli/CliClient.java | 248 ++++++++++----------- .../apache/flink/table/client/cli/CliOptions.java | 9 + .../flink/table/client/cli/CliOptionsParser.java | 29 ++- .../table/client/cli/CliStatementSplitter.java | 67 ++++++ .../apache/flink/table/client/cli/CliStrings.java | 4 +- .../flink/table/client/cli/SqlMultiLineParser.java | 3 +- .../apache/flink/table/client/SqlClientTest.java | 35 +++ .../flink/table/client/cli/CliClientITCase.java | 3 + .../flink/table/client/cli/CliClientTest.java | 213 ++++++++++++++---- .../table/client/cli/CliStatementSplitterTest.java | 72 ++++++ .../table/client/cli/TerminalStreamsResource.java | 5 + .../src/test/resources/sql-client-help-command.out | 24 ++ .../src/test/resources/sql-client-help.out | 26 ++- 14 files changed, 582 insertions(+), 205 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java index 8e2bcbc..79b2a71 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java @@ -22,14 +22,19 @@ import org.apache.flink.table.client.cli.CliClient; import org.apache.flink.table.client.cli.CliOptions; import org.apache.flink.table.client.cli.CliOptionsParser; import org.apache.flink.table.client.gateway.Executor; +import 
org.apache.flink.table.client.gateway.SqlExecutionException; import org.apache.flink.table.client.gateway.context.DefaultContext; import org.apache.flink.table.client.gateway.local.LocalContextUtils; import org.apache.flink.table.client.gateway.local.LocalExecutor; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.SystemUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; @@ -106,18 +111,25 @@ public class SqlClient { SystemUtils.IS_OS_WINDOWS ? "flink-sql-history" : ".flink-sql-history"); } + boolean hasSqlFile = options.getSqlFile() != null; + boolean hasUpdateStatement = options.getUpdateStatement() != null; + if (hasSqlFile && hasUpdateStatement) { + throw new IllegalArgumentException( + String.format( + "Please use either option %s or %s. The option %s is deprecated and it's suggested to use %s instead.", + CliOptionsParser.OPTION_FILE, + CliOptionsParser.OPTION_UPDATE, + CliOptionsParser.OPTION_UPDATE.getOpt(), + CliOptionsParser.OPTION_FILE.getOpt())); + } + + boolean isInteractiveMode = !hasSqlFile && !hasUpdateStatement; + try (CliClient cli = new CliClient(sessionId, executor, historyFilePath)) { - // interactive CLI mode - if (options.getUpdateStatement() == null) { + if (isInteractiveMode) { cli.open(); - } - // execute single update statement - else { - final boolean success = cli.submitUpdate(options.getUpdateStatement()); - if (!success) { - throw new SqlClientException( - "Could not submit given SQL update statement to cluster."); - } + } else { + cli.executeSqlFile(readExecutionContent()); } } } @@ -195,4 +207,21 @@ public class SqlClient { System.out.println("done."); } } + + private String readExecutionContent() { + if (options.getSqlFile() != null) { + return readFromURL(options.getSqlFile()); + } else { + return 
options.getUpdateStatement().trim(); + } + } + + private String readFromURL(URL file) { + try { + return IOUtils.toString(file, StandardCharsets.UTF_8); + } catch (IOException e) { + throw new SqlExecutionException( + String.format("Fail to read content from the %s.", file.getPath()), e); + } + } } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java index d8a02c2..9b007d1 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.client.SqlClient; import org.apache.flink.table.client.SqlClientException; +import org.apache.flink.table.client.config.ResultMode; import org.apache.flink.table.client.config.SqlClientOptions; import org.apache.flink.table.client.gateway.Executor; import org.apache.flink.table.client.gateway.ResultDescriptor; @@ -70,6 +71,8 @@ import static org.apache.flink.table.client.cli.CliStrings.MESSAGE_RESET_KEY; import static org.apache.flink.table.client.cli.CliStrings.MESSAGE_SET_KEY; import static org.apache.flink.table.client.cli.CliStrings.MESSAGE_STATEMENT_SUBMITTED; import static org.apache.flink.table.client.cli.CliStrings.MESSAGE_WAIT_EXECUTE; +import static org.apache.flink.table.client.config.ResultMode.TABLEAU; +import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE; import static org.apache.flink.table.client.config.YamlConfigUtils.getOptionNameWithDeprecatedKey; import static org.apache.flink.table.client.config.YamlConfigUtils.getPropertiesInPretty; import static org.apache.flink.table.client.config.YamlConfigUtils.isDeprecatedKey; @@ -220,7 +223,7 @@ public class 
CliClient implements AutoCloseable { terminal.writer().append("\n"); terminal.flush(); - final String line; + String line; try { line = lineReader.readLine(prompt, null, inputTransformer, null); } catch (UserInterruptException e) { @@ -235,8 +238,8 @@ public class CliClient implements AutoCloseable { if (line == null) { continue; } - final Optional<Operation> operation = parseCommand(line); - operation.ifPresent(this::callOperation); + + executeStatement(line, ExecutionMode.INTERACTIVE); } } @@ -250,33 +253,62 @@ public class CliClient implements AutoCloseable { } /** - * Submits a SQL update statement and prints status information and/or errors on the terminal. + * Submits content from Sql file and prints status information and/or errors on the terminal. * - * @param statement SQL update statement + * @param content SQL file content * @return flag to indicate if the submission was successful or not */ - public boolean submitUpdate(String statement) { + public boolean executeSqlFile(String content) { terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_WILL_EXECUTE).toAnsi()); - terminal.writer().println(new AttributedString(statement).toString()); - terminal.flush(); - final Optional<Operation> operation = parseCommand(statement); - // only support INSERT INTO/OVERWRITE - return operation - .map( - op -> { - if (op instanceof CatalogSinkModifyOperation) { - return callInsert((CatalogSinkModifyOperation) op); - } else { - printError(CliStrings.MESSAGE_UNSUPPORTED_SQL); - return false; - } - }) - .orElse(false); + for (String statement : CliStatementSplitter.splitContent(content)) { + terminal.writer().println(new AttributedString(statement).toString()); + terminal.flush(); + + if (!executeStatement(statement, ExecutionMode.NON_INTERACTIVE)) { + // cancel execution when meet error or ctrl + C; + return false; + } + } + return true; } // -------------------------------------------------------------------------------------------- + enum ExecutionMode { 
+ INTERACTIVE, + + NON_INTERACTIVE; + } + + // -------------------------------------------------------------------------------------------- + + private boolean executeStatement(String statement, ExecutionMode executionMode) { + try { + final Optional<Operation> operation = parseCommand(statement); + operation.ifPresent(op -> callOperation(op, executionMode)); + } catch (SqlExecutionException e) { + printExecutionException(e); + return false; + } + return true; + } + + private void validate(Operation operation, ExecutionMode executionMode) { + ResultMode mode = executor.getSessionConfig(sessionId).get(EXECUTION_RESULT_MODE); + if (operation instanceof QueryOperation + && executionMode.equals(ExecutionMode.NON_INTERACTIVE) + && !mode.equals(TABLEAU)) { + throw new IllegalArgumentException( + String.format( + "In non-interactive mode, it only supports to use %s as value of %s when execute query. Please add 'SET %s=%s;' in the sql file.", + TABLEAU, + EXECUTION_RESULT_MODE.key(), + EXECUTION_RESULT_MODE.key(), + TABLEAU)); + } + } + private Optional<Operation> parseCommand(String stmt) { // normalize stmt = stmt.trim(); @@ -284,16 +316,19 @@ public class CliClient implements AutoCloseable { if (stmt.endsWith(";")) { stmt = stmt.substring(0, stmt.length() - 1).trim(); } - try { - Operation operation = executor.parseStatement(sessionId, stmt); - return Optional.of(operation); - } catch (SqlExecutionException e) { - printExecutionException(e); + + // meet bad case, e.g ";\n" + if (stmt.trim().isEmpty()) { + return Optional.empty(); } - return Optional.empty(); + + Operation operation = executor.parseStatement(sessionId, stmt); + return Optional.of(operation); } - private void callOperation(Operation operation) { + private void callOperation(Operation operation, ExecutionMode mode) { + validate(operation, mode); + if (operation instanceof QuitOperation) { // QUIT/EXIT callQuit(); @@ -331,46 +366,38 @@ public class CliClient implements AutoCloseable { } private void 
callReset(ResetOperation resetOperation) { - try { - // reset all session properties - if (!resetOperation.getKey().isPresent()) { - executor.resetSessionProperties(sessionId); - printInfo(CliStrings.MESSAGE_RESET); - } - // reset a session property - else { - String key = resetOperation.getKey().get(); - executor.resetSessionProperty(sessionId, key); - printSetResetConfigKeyMessage(key, MESSAGE_RESET_KEY); - } - } catch (SqlExecutionException e) { - printExecutionException(e); + // reset all session properties + if (!resetOperation.getKey().isPresent()) { + executor.resetSessionProperties(sessionId); + printInfo(CliStrings.MESSAGE_RESET); + } + // reset a session property + else { + String key = resetOperation.getKey().get(); + executor.resetSessionProperty(sessionId, key); + printSetResetConfigKeyMessage(key, MESSAGE_RESET_KEY); } } private void callSet(SetOperation setOperation) { - try { - // set a property - if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) { - String key = setOperation.getKey().get().trim(); - String value = setOperation.getValue().get().trim(); - executor.setSessionProperty(sessionId, key, value); - printSetResetConfigKeyMessage(key, MESSAGE_SET_KEY); - } - // show all properties - else { - final Map<String, String> properties = executor.getSessionConfigMap(sessionId); - if (properties.isEmpty()) { - terminal.writer() - .println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi()); - } else { - List<String> prettyEntries = getPropertiesInPretty(properties); - prettyEntries.forEach(entry -> terminal.writer().println(entry)); - } - terminal.flush(); + // set a property + if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) { + String key = setOperation.getKey().get().trim(); + String value = setOperation.getValue().get().trim(); + executor.setSessionProperty(sessionId, key, value); + printSetResetConfigKeyMessage(key, MESSAGE_SET_KEY); + } + // show all properties + else { + final 
Map<String, String> properties = executor.getSessionConfigMap(sessionId); + if (properties.isEmpty()) { + terminal.writer() + .println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi()); + } else { + List<String> prettyEntries = getPropertiesInPretty(properties); + prettyEntries.forEach(entry -> terminal.writer().println(entry)); } - } catch (SqlExecutionException e) { - printExecutionException(e); + terminal.flush(); } } @@ -380,20 +407,12 @@ public class CliClient implements AutoCloseable { } private void callSelect(QueryOperation operation) { - final ResultDescriptor resultDesc; - try { - resultDesc = executor.executeQuery(sessionId, operation); - } catch (SqlExecutionException e) { - printExecutionException(e); - return; - } + final ResultDescriptor resultDesc = executor.executeQuery(sessionId, operation); if (resultDesc.isTableauMode()) { try (CliTableauResultView tableauResultView = new CliTableauResultView(terminal, executor, sessionId, resultDesc)) { tableauResultView.displayResults(); - } catch (SqlExecutionException e) { - printExecutionException(e); } } else { final CliResultView<?> view; @@ -404,69 +423,51 @@ public class CliClient implements AutoCloseable { } // enter view - try { - view.open(); + view.open(); - // view left - printInfo(CliStrings.MESSAGE_RESULT_QUIT); - } catch (SqlExecutionException e) { - printExecutionException(e); - } + // view left + printInfo(CliStrings.MESSAGE_RESULT_QUIT); } } - private boolean callInsert(CatalogSinkModifyOperation operation) { + private void callInsert(CatalogSinkModifyOperation operation) { printInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT); - TableResult tableResult = null; boolean sync = executor.getSessionConfig(sessionId).get(TABLE_DML_SYNC); if (sync) { printInfo(MESSAGE_WAIT_EXECUTE); } - try { - tableResult = executor.executeOperation(sessionId, operation); - checkState(tableResult.getJobClient().isPresent()); - - if (sync) { - terminal.writer() - 
.println(CliStrings.messageInfo(MESSAGE_FINISH_STATEMENT).toAnsi()); - } else { - terminal.writer() - .println(CliStrings.messageInfo(MESSAGE_STATEMENT_SUBMITTED).toAnsi()); - terminal.writer() - .println( - String.format( - "Job ID: %s\n", - tableResult.getJobClient().get().getJobID().toString())); - } - terminal.flush(); - } catch (Exception e) { - printExecutionException(e); - return false; + TableResult tableResult = executor.executeOperation(sessionId, operation); + checkState(tableResult.getJobClient().isPresent()); + if (sync) { + terminal.writer().println(CliStrings.messageInfo(MESSAGE_FINISH_STATEMENT).toAnsi()); + } else { + terminal.writer().println(CliStrings.messageInfo(MESSAGE_STATEMENT_SUBMITTED).toAnsi()); + terminal.writer() + .println( + String.format( + "Job ID: %s\n", + tableResult.getJobClient().get().getJobID().toString())); } - return true; + terminal.flush(); } private void executeOperation(Operation operation) { - try { - TableResult result = executor.executeOperation(sessionId, operation); - if (TABLE_RESULT_OK == result) { - // print more meaningful message than tableau OK result - printInfo(MESSAGE_EXECUTE_STATEMENT); - } else { - // print tableau if result has content - PrintUtils.printAsTableauForm( - result.getResolvedSchema(), - result.collect(), - terminal.writer(), - Integer.MAX_VALUE, - "", - false, - false); - terminal.flush(); - } - } catch (SqlExecutionException e) { - printExecutionException(e); + TableResult result = executor.executeOperation(sessionId, operation); + if (TABLE_RESULT_OK == result) { + // print more meaningful message than tableau OK result + printInfo(MESSAGE_EXECUTE_STATEMENT); + } else { + // print tableau if result has content + PrintUtils.printAsTableauForm( + result.getResolvedSchema(), + result.collect(), + terminal.writer(), + Integer.MAX_VALUE, + "", + false, + false); + terminal.flush(); } } @@ -480,11 +481,6 @@ public class CliClient implements AutoCloseable { terminal.flush(); } - private void 
printError(String message) { - terminal.writer().println(CliStrings.messageError(message).toAnsi()); - terminal.flush(); - } - private void printInfo(String message) { terminal.writer().println(CliStrings.messageInfo(message).toAnsi()); terminal.flush(); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java index 3379925..71e9f4f 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java @@ -20,6 +20,8 @@ package org.apache.flink.table.client.cli; import org.apache.flink.configuration.Configuration; +import javax.annotation.Nullable; + import java.net.URL; import java.util.List; @@ -33,6 +35,7 @@ public class CliOptions { private final String sessionId; private final URL environment; private final URL defaults; + private final URL sqlFile; private final List<URL> jars; private final List<URL> libraryDirs; private final String updateStatement; @@ -44,6 +47,7 @@ public class CliOptions { String sessionId, URL environment, URL defaults, + URL sqlFile, List<URL> jars, List<URL> libraryDirs, String updateStatement, @@ -52,6 +56,7 @@ public class CliOptions { this.isPrintHelp = isPrintHelp; this.sessionId = sessionId; this.environment = environment; + this.sqlFile = sqlFile; this.defaults = defaults; this.jars = jars; this.libraryDirs = libraryDirs; @@ -76,6 +81,10 @@ public class CliOptions { return defaults; } + public @Nullable URL getSqlFile() { + return sqlFile; + } + public List<URL> getJars() { return jars; } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java index 880cbd5..4cbcd2e 100644 --- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java @@ -72,6 +72,17 @@ public class CliOptionsParser { + "It might overwrite default environment properties.") .build(); + public static final Option OPTION_FILE = + Option.builder("f") + .required(false) + .longOpt("file") + .numberOfArgs(1) + .argName("script file") + .desc( + "Script file that should be executed. In this mode, " + + "the client will not open an interactive terminal.") + .build(); + public static final Option OPTION_DEFAULTS = Option.builder("d") .required(false) @@ -107,6 +118,7 @@ public class CliOptionsParser { + "functions, table sources, or sinks. Can be used multiple times.") .build(); + @Deprecated public static final Option OPTION_UPDATE = Option.builder("u") .required(false) @@ -114,11 +126,14 @@ public class CliOptionsParser { .numberOfArgs(1) .argName("SQL update statement") .desc( - "Experimental (for testing only!): Instructs the SQL Client to immediately execute " - + "the given update statement after starting up. The process is shut down after the " - + "statement has been submitted to the cluster and returns an appropriate return code. " - + "Currently, this feature is only supported for INSERT INTO statements that declare " - + "the target sink table.") + String.format( + "Deprecated Experimental (for testing only!) feature: Instructs the SQL Client to immediately execute " + + "the given update statement after starting up. The process is shut down after the " + + "statement has been submitted to the cluster and returns an appropriate return code. " + + "Currently, this feature is only supported for INSERT INTO statements that declare " + + "the target sink table." 
+ + "Please use option -%s to submit update statement.", + OPTION_FILE.getOpt())) .build(); public static final Option OPTION_HISTORY = @@ -147,6 +162,7 @@ public class CliOptionsParser { buildGeneralOptions(options); options.addOption(OPTION_SESSION); options.addOption(OPTION_ENVIRONMENT); + options.addOption(OPTION_FILE); options.addOption(OPTION_DEFAULTS); options.addOption(OPTION_JAR); options.addOption(OPTION_LIBRARY); @@ -263,6 +279,7 @@ public class CliOptionsParser { checkSessionId(line), checkUrl(line, CliOptionsParser.OPTION_ENVIRONMENT), checkUrl(line, CliOptionsParser.OPTION_DEFAULTS), + checkUrl(line, CliOptionsParser.OPTION_FILE), checkUrls(line, CliOptionsParser.OPTION_JAR), checkUrls(line, CliOptionsParser.OPTION_LIBRARY), line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()), @@ -282,6 +299,7 @@ public class CliOptionsParser { checkSessionId(line), checkUrl(line, CliOptionsParser.OPTION_ENVIRONMENT), null, + null, checkUrls(line, CliOptionsParser.OPTION_JAR), checkUrls(line, CliOptionsParser.OPTION_LIBRARY), line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()), @@ -301,6 +319,7 @@ public class CliOptionsParser { null, null, checkUrl(line, CliOptionsParser.OPTION_DEFAULTS), + null, checkUrls(line, CliOptionsParser.OPTION_JAR), checkUrls(line, CliOptionsParser.OPTION_LIBRARY), null, diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStatementSplitter.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStatementSplitter.java new file mode 100644 index 0000000..5aea2a8 --- /dev/null +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStatementSplitter.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.cli; + +import java.util.ArrayList; +import java.util.List; + +/** + * Line splitter to determine whether the submitted line is complete. It also offers to split the + * submitted content into multiple statements. + * + * <p>This is a simple splitter. It just split the line in context-unrelated way, e.g. it fails to + * parse line "';\n'" + */ +public class CliStatementSplitter { + + private static final String MASK = "--.*$"; + + public static boolean isStatementComplete(String statement) { + String[] lines = statement.split("\n"); + // fix input statement is "\n" + if (lines.length == 0) { + return false; + } else { + return isEndOfStatement(lines[lines.length - 1]); + } + } + + public static List<String> splitContent(String content) { + List<String> statements = new ArrayList<>(); + List<String> buffer = new ArrayList<>(); + + for (String line : content.split("\n")) { + if (isEndOfStatement(line)) { + buffer.add(line); + statements.add(String.join("\n", buffer)); + buffer.clear(); + } else { + buffer.add(line); + } + } + if (!buffer.isEmpty()) { + statements.add(String.join("\n", buffer)); + } + return statements; + } + + private static boolean isEndOfStatement(String line) { + return line.replaceAll(MASK, "").trim().endsWith(";"); + } +} diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java index d066650..5c7dc04 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java @@ -199,9 +199,7 @@ public final class CliStrings { public static final String MESSAGE_STATEMENT_SUBMITTED = "Table update statement has been successfully submitted to the cluster:"; - public static final String MESSAGE_WILL_EXECUTE = "Executing the following statement:"; - - public static final String MESSAGE_UNSUPPORTED_SQL = "Unsupported SQL statement."; + public static final String MESSAGE_WILL_EXECUTE = "Executing the SQL from the file:"; public static final String MESSAGE_WAIT_EXECUTE = "Execute statement in sync mode. Please wait for the execution finish..."; diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlMultiLineParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlMultiLineParser.java index ceb11ec..f85eb71 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlMultiLineParser.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlMultiLineParser.java @@ -31,7 +31,6 @@ import java.util.List; */ public class SqlMultiLineParser extends DefaultParser { - private static final String EOF_CHARACTER = ";"; private static final String NEW_LINE_PROMPT = ""; // results in simple '>' output public SqlMultiLineParser() { @@ -41,7 +40,7 @@ public class SqlMultiLineParser extends DefaultParser { @Override public ParsedLine parse(String line, int cursor, ParseContext context) { - if (!line.trim().endsWith(EOF_CHARACTER) && context != ParseContext.COMPLETE) { + if 
(!CliStatementSplitter.isStatementComplete(line) && context != ParseContext.COMPLETE) { throw new EOFError(-1, -1, "New line without EOF character.", NEW_LINE_PROMPT); } final ArgumentList parsedLine = (ArgumentList) super.parse(line, cursor, context); diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java index 7e1d754..8062a65 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java @@ -37,7 +37,12 @@ import java.io.InputStream; import java.io.PrintStream; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; @@ -66,6 +71,8 @@ public class SqlClientTest { private String historyPath; + private String sqlFilePath; + @Before public void before() throws IOException { originalEnv = System.getenv(); @@ -197,4 +204,32 @@ public class SqlClientTest { assertThat(output, containsString(error)); } } + + @Test + public void testExecuteSqlFile() throws IOException { + // create sql file + File sqlFileFolder = tempFolder.newFolder("sql-file"); + File sqlFile = new File(sqlFileFolder, "test-sql.sql"); + if (!sqlFile.createNewFile()) { + throw new IOException("Can't create testing test-sql.sql file."); + } + sqlFilePath = sqlFile.getPath(); + + List<String> statements = Collections.singletonList("HELP;"); + Files.write( + Paths.get(sqlFilePath), + statements, + StandardCharsets.UTF_8, + StandardOpenOption.APPEND); + + String[] args = new String[] {"-f", sqlFilePath}; + SqlClient.main(args); + final URL url = 
getClass().getClassLoader().getResource("sql-client-help-command.out"); + final String help = FileUtils.readFileUtf8(new File(url.getFile())); + final String output = getStdoutString(); + + for (String command : help.split("\n")) { + assertThat(output, containsString(command)); + } + } } diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java index a203202..427b3ee 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java @@ -38,6 +38,7 @@ import org.jline.terminal.Terminal; import org.jline.terminal.impl.DumbTerminal; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; @@ -79,6 +80,8 @@ public class CliClientITCase extends AbstractTestBase { @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder(); + @Rule public TerminalStreamsResource useSystemStream = TerminalStreamsResource.INSTANCE; + @Parameterized.Parameter public String sqlPath; @Parameterized.Parameters(name = "{0}") diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java index 2645e47..8bedeb9 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java @@ -51,7 +51,9 @@ import org.jline.reader.ParsedLine; import org.jline.reader.Parser; import org.jline.terminal.Terminal; import org.jline.terminal.impl.DumbTerminal; +import org.junit.Rule; import 
org.junit.Test; +import org.junit.rules.ExpectedException; import javax.annotation.Nullable; @@ -60,6 +62,8 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -68,9 +72,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.FutureTask; import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC; +import static org.apache.flink.table.client.cli.CliStrings.MESSAGE_SQL_EXECUTION_ERROR; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -82,8 +86,10 @@ public class CliClientTest extends TestLogger { "INSERT INTO MyTable SELECT * FROM MyOtherTable"; private static final String INSERT_OVERWRITE_STATEMENT = "INSERT OVERWRITE MyTable SELECT * FROM MyOtherTable"; - private static final String SELECT_STATEMENT = "SELECT * FROM MyOtherTable"; - private static final Row SHOW_ROW = new Row(1); + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Rule public TerminalStreamsResource useSystemStream = TerminalStreamsResource.INSTANCE; @Test public void testUpdateSubmission() throws Exception { @@ -96,9 +102,18 @@ public class CliClientTest extends TestLogger { // fail at executor verifyUpdateSubmission(INSERT_INTO_STATEMENT, true, true); verifyUpdateSubmission(INSERT_OVERWRITE_STATEMENT, true, true); + } - // fail early in client - verifyUpdateSubmission(SELECT_STATEMENT, false, true); + @Test + public void testExecuteSqlFile() throws Exception { + MockExecutor executor = new MockExecutor(); + executeSqlFromContent( + executor, + String.join( + ";\n", + Arrays.asList( + INSERT_INTO_STATEMENT, "", INSERT_OVERWRITE_STATEMENT, "\n"))); + 
assertEquals(INSERT_OVERWRITE_STATEMENT, executor.receivedStatement); } @Test @@ -134,22 +149,116 @@ public class CliClientTest extends TestLogger { } @Test - public void verifyCancelSubmitInSyncMode() throws Exception { - final MockExecutor mockExecutor = new MockExecutor(true); + public void testGetEOFinNonInteractiveMode() throws Exception { + final List<String> statements = + Arrays.asList("DESC MyOtherTable;", "SHOW TABLES"); // meet EOF + String content = String.join("\n", statements); + + final MockExecutor mockExecutor = new MockExecutor(); + + executeSqlFromContent(mockExecutor, content); + // execute the last commands + assertTrue(statements.get(1).contains(mockExecutor.receivedStatement)); + } + + @Test + public void testUnknownStatementInNonInteractiveMode() throws Exception { + final List<String> statements = + Arrays.asList( + "ERT INTO MyOtherTable VALUES (1, 101), (2, 102);", + "DESC MyOtherTable;", + "SHOW TABLES;"); + String content = String.join("\n", statements); + + final MockExecutor mockExecutor = new MockExecutor(); + + executeSqlFromContent(mockExecutor, content); + // don't execute other commands + assertTrue(statements.get(0).contains(mockExecutor.receivedStatement)); + } + + @Test + public void testFailedExecutionInNonInteractiveMode() throws Exception { + final List<String> statements = + Arrays.asList( + "INSERT INTO MyOtherTable VALUES (1, 101), (2, 102);", + "DESC MyOtherTable;", + "SHOW TABLES;"); + String content = String.join("\n", statements); + + final MockExecutor mockExecutor = new MockExecutor(); + mockExecutor.failExecution = true; + + executeSqlFromContent(mockExecutor, content); + // don't execute other commands + assertTrue(statements.get(0).contains(mockExecutor.receivedStatement)); + } + + @Test + public void testIllegalResultModeInNonInteractiveMode() throws Exception { + // When client executes sql file, it requires sql-client.execution.result-mode = tableau; + // Therefore, it will get execution error and stop 
executing the sql that follows the illegal + // statement. + final List<String> statements = + Arrays.asList( + "SELECT * FROM MyOtherTable;", + "HELP;", + "INSERT INTO MyOtherTable VALUES (1, 101), (2, 102);", + "DESC MyOtherTable;", + "SHOW TABLES;"); + + String content = String.join("\n", statements); + + final MockExecutor mockExecutor = new MockExecutor(); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "In non-interactive mode, it only supports to use TABLEAU as value of " + + "sql-client.execution.result-mode when execute query. Please add " + + "'SET sql-client.execution.result-mode=TABLEAU;' in the sql file."); + executeSqlFromContent(mockExecutor, content); + } + + @Test + public void testCancelExecutionInNonInteractiveMode() throws Exception { + // add "\n" with quit to trigger committing the line + final List<String> statements = + Arrays.asList( + "HELP;", + "CREATE TABLE tbl( -- comment\n" + + "-- comment with ;\n" + + "id INT,\n" + + "name STRING\n" + + ") WITH (\n" + + " 'connector' = 'values'\n" + + ");\n", + "INSERT INTO \n" + + "--COMMENT ; \n" + + "MyOtherTable VALUES (1, 101), (2, 102);", + "DESC MyOtherTable;", + "SHOW TABLES;", + "QUIT;\n"); + + // use table.dml-sync to keep running + // therefore in non-interactive mode, the last executed command is INSERT INTO + final int hookIndex = 2; + + String content = String.join("\n", statements); + + final MockExecutor mockExecutor = new MockExecutor(); + mockExecutor.isSync = true; + String sessionId = mockExecutor.openSession("test-session"); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(256); - - try (CliClient client = - new CliClient( - TerminalUtils.createDummyTerminal(outputStream), - sessionId, - mockExecutor, - historyTempFile(), - null)) { - FutureTask<Boolean> task = - new FutureTask<>(() -> client.submitUpdate(INSERT_INTO_STATEMENT)); - Thread thread = new Thread(task); + Path historyFilePath = historyTempFile(); + + OutputStream outputStream = new
ByteArrayOutputStream(256); + System.setOut(new PrintStream(outputStream)); + + try (Terminal terminal = TerminalUtils.createDummyTerminal(outputStream); + CliClient client = + new CliClient(terminal, sessionId, mockExecutor, historyFilePath, null)) { + Thread thread = new Thread(() -> client.executeSqlFile(content)); thread.start(); while (!mockExecutor.isAwait) { @@ -157,12 +266,18 @@ public class CliClientTest extends TestLogger { } thread.interrupt(); - assertFalse(task.get()); + + while (thread.isAlive()) { + Thread.sleep(10); + } assertTrue( outputStream .toString() .contains("java.lang.InterruptedException: sleep interrupted")); } + + // read the last executed statement + assertTrue(statements.get(hookIndex).contains(mockExecutor.receivedStatement)); } // -------------------------------------------------------------------------------------------- @@ -170,22 +285,15 @@ public class CliClientTest extends TestLogger { private void verifyUpdateSubmission( String statement, boolean failExecution, boolean testFailure) throws Exception { final MockExecutor mockExecutor = new MockExecutor(); - String sessionId = mockExecutor.openSession("test-session"); mockExecutor.failExecution = failExecution; - try (CliClient client = - new CliClient( - TerminalUtils.createDummyTerminal(), - sessionId, - mockExecutor, - historyTempFile(), - null)) { - if (testFailure) { - assertFalse(client.submitUpdate(statement)); - } else { - assertTrue(client.submitUpdate(statement)); - assertEquals(statement, mockExecutor.receivedStatement); - } + String result = executeSqlFromContent(mockExecutor, statement); + + if (testFailure) { + assertTrue(result.contains(MESSAGE_SQL_EXECUTION_ERROR)); + } else { + assertFalse(result.contains(MESSAGE_SQL_EXECUTION_ERROR)); + assertEquals(statement, mockExecutor.receivedStatement); } } @@ -218,6 +326,18 @@ public class CliClientTest extends TestLogger { return File.createTempFile("history", "tmp").toPath(); } + private String 
executeSqlFromContent(MockExecutor executor, String content) throws IOException { + String sessionId = executor.openSession("test-session"); + OutputStream outputStream = new ByteArrayOutputStream(256); + System.setOut(new PrintStream(outputStream)); + try (Terminal terminal = TerminalUtils.createDummyTerminal(outputStream); + CliClient client = + new CliClient(terminal, sessionId, executor, historyTempFile(), null)) { + client.executeSqlFile(content); + } + return outputStream.toString(); + } + // -------------------------------------------------------------------------------------------- private static class MockExecutor implements Executor { @@ -228,29 +348,22 @@ public class CliClientTest extends TestLogger { public boolean isAwait = false; public String receivedStatement; public int receivedPosition; - private Configuration configuration = new Configuration(); private final Map<String, SessionContext> sessionMap = new HashMap<>(); private final SqlParserHelper helper = new SqlParserHelper(); @Override public void start() throws SqlExecutionException {} - public MockExecutor() { - this(false); - } - - public MockExecutor(boolean isSync) { - configuration.set(TABLE_DML_SYNC, isSync); - this.isSync = isSync; - } - @Override public String openSession(@Nullable String sessionId) throws SqlExecutionException { + Configuration configuration = new Configuration(); + configuration.set(TABLE_DML_SYNC, isSync); + DefaultContext defaultContext = new DefaultContext( new Environment(), Collections.emptyList(), - new Configuration(), + configuration, Collections.singletonList(new DefaultCLI())); SessionContext context = SessionContext.create(defaultContext, sessionId); sessionMap.put(sessionId, context); @@ -293,7 +406,7 @@ public class CliClientTest extends TestLogger { if (failExecution) { throw new SqlExecutionException("Fail execution."); } - if (operation instanceof ModifyOperation || operation instanceof QueryOperation) { + if (operation instanceof ModifyOperation) { 
if (isSync) { isAwait = true; try { @@ -316,8 +429,12 @@ public class CliClientTest extends TestLogger { public Operation parseStatement(String sessionId, String statement) throws SqlExecutionException { receivedStatement = statement; - List<Operation> ops = helper.getSqlParser().parse(statement); - return ops.get(0); + + try { + return helper.getSqlParser().parse(statement).get(0); + } catch (Exception ex) { + throw new SqlExecutionException("Parse error: " + statement, ex); + } } @Override diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliStatementSplitterTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliStatementSplitterTest.java new file mode 100644 index 0000000..92c07bf --- /dev/null +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliStatementSplitterTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.client.cli; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** Test {@link CliStatementSplitter}. */ +public class CliStatementSplitterTest { + + @Test + public void testIsNotEndOfStatement() { + // TODO: fix '; + List<String> lines = Arrays.asList(" --;", "", "select -- ok;", "\n"); + for (String line : lines) { + assertFalse( + String.format("%s is not end of statement but get true", line), + CliStatementSplitter.isStatementComplete(line)); + } + } + + @Test + public void testIsEndOfStatement() { + List<String> lines = Arrays.asList(" ; --;", "select a from b;-- ok;", ";\n"); + for (String line : lines) { + assertTrue( + String.format("%s is end of statement but get false", line), + CliStatementSplitter.isStatementComplete(line)); + } + } + + @Test + public void testSplitContent() { + List<String> lines = + Arrays.asList( + "CREATE TABLE MyTable (\n" + + " id INT,\n" + + " name STRING,\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'test-property' = 'test.value'\n);", + "SET a = b;", + "\n" + "SELECT func(id) from MyTable\n;"); + List<String> actual = CliStatementSplitter.splitContent(String.join("\n", lines)); + + for (int i = 0; i < lines.size(); i++) { + assertEquals(lines.get(i), actual.get(i)); + } + } +} diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TerminalStreamsResource.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TerminalStreamsResource.java index 85a5849..c2d2fcd 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TerminalStreamsResource.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TerminalStreamsResource.java @@ -20,6 +20,8 @@ package org.apache.flink.table.client.cli; 
import org.junit.rules.ExternalResource; +import java.io.PrintStream; + /** * Enables {@link org.apache.flink.table.client.SqlClient} to create a default terminal using {@link * System#in} and {@link System#out} as the input and output stream. This allows tests to easily @@ -28,6 +30,7 @@ import org.junit.rules.ExternalResource; public class TerminalStreamsResource extends ExternalResource { public static final TerminalStreamsResource INSTANCE = new TerminalStreamsResource(); + public static final PrintStream STD_OUT = System.out; private TerminalStreamsResource() { // singleton @@ -41,5 +44,7 @@ public class TerminalStreamsResource extends ExternalResource { @Override protected void after() { CliClient.useSystemInOutStream = false; + // Reset the System.out + System.setOut(STD_OUT); } } diff --git a/flink-table/flink-sql-client/src/test/resources/sql-client-help-command.out b/flink-table/flink-sql-client/src/test/resources/sql-client-help-command.out new file mode 100644 index 0000000..8d8bf88 --- /dev/null +++ b/flink-table/flink-sql-client/src/test/resources/sql-client-help-command.out @@ -0,0 +1,24 @@ +CLEAR Clears the current terminal. +CREATE TABLE Create table under current catalog and database. +DROP TABLE Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;' +CREATE VIEW Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;' +DESCRIBE Describes the schema of a table with the given name. +DROP VIEW Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;' +EXPLAIN Describes the execution plan of a query or table with the given name. +HELP Prints the available commands. +INSERT INTO Inserts the results of a SQL SELECT query into a declared table sink. +INSERT OVERWRITE Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data. +QUIT Quits the SQL CLI client. +RESET Resets a session configuration property. Syntax: 'RESET <key>;'.
Use 'RESET;' for reset all session properties. +SELECT Executes a SQL SELECT query on the Flink cluster. +SET Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties. +SHOW FUNCTIONS Shows all user-defined and built-in functions or only user-defined functions. Syntax: 'SHOW [USER] FUNCTIONS;' +SHOW TABLES Shows all registered tables. +SOURCE Reads a SQL SELECT query from a file and executes it on the Flink cluster. +USE CATALOG Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;' +USE Sets the current default database. Experimental! Syntax: 'USE <name>;' +LOAD MODULE Load a module. Syntax: 'LOAD MODULE <name> [WITH ('<key1>' = '<value1>' [, '<key2>' = '<value2>', ...])];' +UNLOAD MODULE Unload a module. Syntax: 'UNLOAD MODULE <name>;' +USE MODULES Enable loaded modules. Syntax: 'USE MODULES <name1> [, <name2>, ...];' + +Hint: Make sure that a statement ends with ';' for finalizing (multi-line) statements. diff --git a/flink-table/flink-sql-client/src/test/resources/sql-client-help.out b/flink-table/flink-sql-client/src/test/resources/sql-client-help.out index fe9173f..7b40fc9 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql-client-help.out +++ b/flink-table/flink-sql-client/src/test/resources/sql-client-help.out @@ -14,6 +14,9 @@ Mode "embedded" (default) submits Flink jobs from the local machine. imported into the session. It might overwrite default environment properties. + -f,--file <script file> Script file that should be executed. + In this mode, the client will not + open an interactive terminal. -h,--help Show the help message with descriptions of all options. -hist,--history <History file path> The file which you want to save the @@ -101,16 +104,17 @@ Mode "embedded" (default) submits Flink jobs from the local machine. tmp/cached_dir). -s,--session <session identifier> The identifier for a session. 
'default' is the default identifier. - -u,--update <SQL update statement> Experimental (for testing only!): - Instructs the SQL Client to - immediately execute the given update - statement after starting up. The - process is shut down after the - statement has been submitted to the - cluster and returns an appropriate - return code. Currently, this feature - is only supported for INSERT INTO - statements that declare the target - sink table. + -u,--update <SQL update statement> Deprecated Experimental (for testing + only!) feature: Instructs the SQL + Client to immediately execute the + given update statement after starting + up. The process is shut down after + the statement has been submitted to + the cluster and returns an + appropriate return code. Currently, + this feature is only supported for + INSERT INTO statements that declare + the target sink table.Please use + option -f to submit update statement.