[ 
https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16594663#comment-16594663
 ] 

ASF GitHub Bot commented on FLINK-10163:
----------------------------------------

asfgit closed pull request #6606: [FLINK-10163] [sql-client] Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index 0e8d2d651b1..f16f1a5561a 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -162,7 +162,7 @@ A SQL query needs a configuration environment in which it is executed. The so-ca
 Every environment file is a regular [YAML file](http://yaml.org/). An example of such a file is presented below.
 
 {% highlight yaml %}
-# Define table sources here.
+# Define table sources and sinks here.
 
 tables:
   - name: MyTableSource
@@ -185,6 +185,12 @@ tables:
       - name: MyField2
         type: VARCHAR
 
+# Define table views here.
+
+views:
+  - name: MyCustomView
+    query: "SELECT MyField2 FROM MyTableSource"
+
 # Define user-defined functions here.
 
 functions:
@@ -217,7 +223,8 @@ deployment:
 
 This configuration:
 
-- defines an environment with a table source `MyTableName` that reads from a CSV file,
+- defines an environment with a table source `MyTableSource` that reads from a CSV file,
+- defines a view `MyCustomView` that declares a virtual table using a SQL query,
 - defines a user-defined function `myUDF` that can be instantiated using the class name and two constructor parameters,
 - specifies a parallelism of 1 for queries executed in this streaming environment,
 - specifies an event-time characteristic, and
@@ -404,7 +411,7 @@ This process can be recursively performed until all the constructor parameters a
 {% top %}
 
 Detached SQL Queries
-------------------------
+--------------------
 
 In order to define end-to-end SQL pipelines, SQL's `INSERT INTO` statement can be used for submitting long-running, detached queries to a Flink cluster. These queries produce their results into an external system instead of the SQL Client. This allows for dealing with higher parallelism and larger amounts of data. The CLI itself does not have any control over a detached query after submission.
 
@@ -459,6 +466,44 @@ Web interface: http://localhost:8081
 
 {% top %}
 
+SQL Views
+---------
+
+Views allow defining virtual tables from SQL queries. The view definition is parsed and validated immediately. However, the actual execution happens when the view is accessed during the submission of a general `INSERT INTO` or `SELECT` statement.
+
+Views can either be defined in [environment files](sqlClient.html#environment-files) or within the CLI session.
+
+The following example shows how to define multiple views in a file:
+
+{% highlight yaml %}
+views:
+  - name: MyRestrictedView
+    query: "SELECT MyField2 FROM MyTableSource"
+  - name: MyComplexView
+    query: >
+      SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR)
+      FROM MyTableSource
+      WHERE MyField2 > 200
+{% endhighlight %}
+
+Similar to table sources and sinks, views defined in a session environment file have the highest precedence.
+
+Views can also be created within a CLI session using the `CREATE VIEW` statement:
+
+{% highlight text %}
+CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource
+{% endhighlight %}
+
+Views created within a CLI session can also be removed again using the `DROP VIEW` statement:
+
+{% highlight text %}
+DROP VIEW MyNewView
+{% endhighlight %}
+
+<span class="label label-danger">Attention</span> The definition of views is limited to the syntax mentioned above. Defining a schema for views or escaping whitespace in table names will be supported in future versions.
+
+{% top %}
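
The precedence rule described above (views from the session environment file shadow same-named views from the defaults file) boils down to a plain map merge. The following is an illustration only; `ViewPrecedence` and `mergeViews` are hypothetical names, not SQL Client classes:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: session-defined views take precedence over
// same-named views from the defaults environment file.
class ViewPrecedence {

    static Map<String, String> mergeViews(
            Map<String, String> defaults, Map<String, String> session) {
        // start from the defaults, then let session entries overwrite them
        final Map<String, String> merged = new LinkedHashMap<>(defaults);
        merged.putAll(session);
        return merged;
    }
}
```

A session file that redefines `MyCustomView` would thus replace the defaults-file definition while leaving all other default views visible.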
+
 Limitations & Future
 --------------------
 
diff --git a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
index 8be4ce63ecb..302651a78aa 100644
--- a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
@@ -25,7 +25,7 @@
 
 
 #==============================================================================
-# Table Sources
+# Table Sources & Sinks
 #==============================================================================
 
 # Define table sources and sinks here.
@@ -38,6 +38,17 @@ tables: [] # empty list
 #   format: ...
 #   schema: ...
 
+#==============================================================================
+# Table Views
+#==============================================================================
+
+# Define frequently used SQL queries here that define a virtual table.
+
+views: [] # empty list
+# A typical view definition looks like:
+# - name: ...
+#   query: "SELECT ..."
+
 #==============================================================================
 # User-defined functions
 #==============================================================================
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
index 3efaa6a3352..b9aca75d325 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
@@ -24,6 +24,7 @@
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.SessionContext;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.local.LocalExecutor;
 
 import org.slf4j.Logger;
@@ -46,7 +47,7 @@
  * and allows for managing queries via console.
  *
  * <p>For debugging in an IDE you can execute the main method of this class using:
- * "embedded --defaults /path/to/-sql-client-defaults.yaml --jar /path/to/target/flink-sql-client-*.jar"
+ * "embedded --defaults /path/to/sql-client-defaults.yaml --jar /path/to/target/flink-sql-client-*.jar"
  *
  * <p>Make sure that the FLINK_CONF_DIR environment variable is set.
  */
@@ -94,6 +95,9 @@ private void start() {
                        context = new SessionContext(options.getSessionId(), sessionEnv);
                        }
 
+                       // validate the environment (defaults and session)
+                       validateEnvironment(context, executor);
+
                        // add shutdown hook
                Runtime.getRuntime().addShutdownHook(new EmbeddedShutdownThread(context, executor));
 
@@ -127,6 +131,17 @@ private void openCli(SessionContext context, Executor executor) {
 
        // --------------------------------------------------------------------------------------------
 
+       private static void validateEnvironment(SessionContext context, Executor executor) {
+               System.out.print("Validating current environment...");
+               try {
+                       executor.validateSession(context);
+                       System.out.println("done.");
+               } catch (SqlExecutionException e) {
+                       throw new SqlClientException(
+                               "Current environment is invalid. Please check your configuration files again.", e);
+               }
+       }
+
        private static void shutdown(SessionContext context, Executor executor) {
                System.out.println();
                System.out.print("Shutting down executor...");
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 576f5189c0b..8030c7c75da 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -203,8 +203,7 @@ public boolean submitUpdate(String statement) {
                                case INSERT_INTO:
                                        return callInsertInto(cmdCall);
                                default:
-                                       terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_UNSUPPORTED_SQL).toAnsi());
-                                       terminal.flush();
+                                       printError(CliStrings.MESSAGE_UNSUPPORTED_SQL);
                                        return false;
                        }
                }).orElse(false);
@@ -215,8 +214,7 @@ public boolean submitUpdate(String statement) {
        private Optional<SqlCommandCall> parseCommand(String line) {
                final Optional<SqlCommandCall> parsedLine = SqlCommandParser.parse(line);
                if (!parsedLine.isPresent()) {
-                       terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_UNKNOWN_SQL).toAnsi());
-                       terminal.flush();
+                       printError(CliStrings.MESSAGE_UNKNOWN_SQL);
                }
                return parsedLine;
        }
@@ -224,26 +222,25 @@ public boolean submitUpdate(String statement) {
        private void callCommand(SqlCommandCall cmdCall) {
                switch (cmdCall.command) {
                        case QUIT:
-                       case EXIT:
-                               callQuit(cmdCall);
+                               callQuit();
                                break;
                        case CLEAR:
-                               callClear(cmdCall);
+                               callClear();
                                break;
                        case RESET:
-                               callReset(cmdCall);
+                               callReset();
                                break;
                        case SET:
                                callSet(cmdCall);
                                break;
                        case HELP:
-                               callHelp(cmdCall);
+                               callHelp();
                                break;
                        case SHOW_TABLES:
-                               callShowTables(cmdCall);
+                               callShowTables();
                                break;
                        case SHOW_FUNCTIONS:
-                               callShowFunctions(cmdCall);
+                               callShowFunctions();
                                break;
                        case DESCRIBE:
                                callDescribe(cmdCall);
@@ -257,6 +254,12 @@ private void callCommand(SqlCommandCall cmdCall) {
                        case INSERT_INTO:
                                callInsertInto(cmdCall);
                                break;
+                       case CREATE_VIEW:
+                               callCreateView(cmdCall);
+                               break;
+                       case DROP_VIEW:
+                               callDropView(cmdCall);
+                               break;
                        case SOURCE:
                                callSource(cmdCall);
                                break;
@@ -265,19 +268,18 @@ private void callCommand(SqlCommandCall cmdCall) {
                }
        }
 
-       private void callQuit(SqlCommandCall cmdCall) {
-               terminal.writer().println(CliStrings.MESSAGE_QUIT);
-               terminal.flush();
+       private void callQuit() {
+               printInfo(CliStrings.MESSAGE_QUIT);
                isRunning = false;
        }
 
-       private void callClear(SqlCommandCall cmdCall) {
+       private void callClear() {
                clearTerminal();
        }
 
-       private void callReset(SqlCommandCall cmdCall) {
+       private void callReset() {
                context.resetSessionProperties();
-               terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_RESET).toAnsi());
+               printInfo(CliStrings.MESSAGE_RESET);
        }
 
        private void callSet(SqlCommandCall cmdCall) {
@@ -287,7 +289,7 @@ private void callSet(SqlCommandCall cmdCall) {
                        try {
                                properties = executor.getSessionProperties(context);
                        } catch (SqlExecutionException e) {
-                               printException(e);
+                               printExecutionException(e);
                                return;
                        }
                        if (properties.isEmpty()) {
@@ -309,17 +311,17 @@ private void callSet(SqlCommandCall cmdCall) {
                terminal.flush();
        }
 
-       private void callHelp(SqlCommandCall cmdCall) {
+       private void callHelp() {
                terminal.writer().println(CliStrings.MESSAGE_HELP);
                terminal.flush();
        }
 
-       private void callShowTables(SqlCommandCall cmdCall) {
+       private void callShowTables() {
                final List<String> tables;
                try {
                        tables = executor.listTables(context);
                } catch (SqlExecutionException e) {
-                       printException(e);
+                       printExecutionException(e);
                        return;
                }
                if (tables.isEmpty()) {
@@ -330,12 +332,12 @@ private void callShowTables(SqlCommandCall cmdCall) {
                terminal.flush();
        }
 
-       private void callShowFunctions(SqlCommandCall cmdCall) {
+       private void callShowFunctions() {
                final List<String> functions;
                try {
                        functions = executor.listUserDefinedFunctions(context);
                } catch (SqlExecutionException e) {
-                       printException(e);
+                       printExecutionException(e);
                        return;
                }
                if (functions.isEmpty()) {
@@ -351,7 +353,7 @@ private void callDescribe(SqlCommandCall cmdCall) {
                try {
                        schema = executor.getTableSchema(context, cmdCall.operands[0]);
                } catch (SqlExecutionException e) {
-                       printException(e);
+                       printExecutionException(e);
                        return;
                }
                terminal.writer().println(schema.toString());
@@ -363,7 +365,7 @@ private void callExplain(SqlCommandCall cmdCall) {
                try {
                        explanation = executor.explainStatement(context, cmdCall.operands[0]);
                } catch (SqlExecutionException e) {
-                       printException(e);
+                       printExecutionException(e);
                        return;
                }
                terminal.writer().println(explanation);
@@ -375,7 +377,7 @@ private void callSelect(SqlCommandCall cmdCall) {
                try {
                        resultDesc = executor.executeQuery(context, cmdCall.operands[0]);
                } catch (SqlExecutionException e) {
-                       printException(e);
+                       printExecutionException(e);
                        return;
                }
                final CliResultView view;
@@ -390,16 +392,14 @@ private void callSelect(SqlCommandCall cmdCall) {
                        view.open();
 
                        // view left
-                       terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_RESULT_QUIT).toAnsi());
-                       terminal.flush();
+                       printInfo(CliStrings.MESSAGE_RESULT_QUIT);
                } catch (SqlExecutionException e) {
-                       printException(e);
+                       printExecutionException(e);
                }
        }
 
        private boolean callInsertInto(SqlCommandCall cmdCall) {
-               terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT).toAnsi());
-               terminal.flush();
+               printInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT);
 
                try {
                        final ProgramTargetDescriptor programTarget = executor.executeUpdate(context, cmdCall.operands[0]);
@@ -407,20 +407,58 @@ private boolean callInsertInto(SqlCommandCall cmdCall) {
                        terminal.writer().println(programTarget.toString());
                        terminal.flush();
                } catch (SqlExecutionException e) {
-                       printException(e);
+                       printExecutionException(e);
                        return false;
                }
                return true;
        }
 
-       private void callSource(SqlCommandCall cmdCall) {
-               final String pathString = cmdCall.operands[0];
+       private void callCreateView(SqlCommandCall cmdCall) {
+               final String name = cmdCall.operands[0];
+               final String query = cmdCall.operands[1];
 
-               if (pathString.isEmpty()) {
-                       printError(CliStrings.MESSAGE_INVALID_PATH);
+               final String previousQuery = context.getViews().get(name);
+               if (previousQuery != null) {
+                       printExecutionError(CliStrings.MESSAGE_VIEW_ALREADY_EXISTS);
                        return;
                }
 
+               try {
+                       // perform and validate change
+                       context.addView(name, query);
+                       executor.validateSession(context);
+                       printInfo(CliStrings.MESSAGE_VIEW_CREATED);
+               } catch (SqlExecutionException e) {
+                       // rollback change
+                       context.removeView(name);
+                       printExecutionException(e);
+               }
+       }
+
+       private void callDropView(SqlCommandCall cmdCall) {
+               final String name = cmdCall.operands[0];
+               final String query = context.getViews().get(name);
+
+               if (query == null) {
+                       printExecutionError(CliStrings.MESSAGE_VIEW_NOT_FOUND);
+                       return;
+               }
+
+               try {
+                       // perform and validate change
+                       context.removeView(name);
+                       executor.validateSession(context);
+                       printInfo(CliStrings.MESSAGE_VIEW_REMOVED);
+               } catch (SqlExecutionException e) {
+                       // rollback change
+                       context.addView(name, query);
+                       printExecutionException(CliStrings.MESSAGE_VIEW_NOT_REMOVED, e);
+               }
+       }
+
+       private void callSource(SqlCommandCall cmdCall) {
+               final String pathString = cmdCall.operands[0];
+
                // load file
                final String stmt;
                try {
@@ -428,13 +466,13 @@ private void callSource(SqlCommandCall cmdCall) {
                        byte[] encoded = Files.readAllBytes(path);
                        stmt = new String(encoded, Charset.defaultCharset());
                } catch (IOException e) {
-                       printException(e);
+                       printExecutionException(e);
                        return;
                }
 
                // limit the output a bit
                if (stmt.length() > SOURCE_MAX_SIZE) {
-                       printError(CliStrings.MESSAGE_MAX_SIZE_EXCEEDED);
+                       printExecutionError(CliStrings.MESSAGE_MAX_SIZE_EXCEEDED);
                        return;
                }
 
@@ -449,14 +487,38 @@ private void callSource(SqlCommandCall cmdCall) {
 
        // --------------------------------------------------------------------------------------------
 
-       private void printException(Throwable t) {
-               LOG.warn(CliStrings.MESSAGE_SQL_EXECUTION_ERROR, t);
-               terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_SQL_EXECUTION_ERROR, t).toAnsi());
+       private void printExecutionException(Throwable t) {
+               printExecutionException(null, t);
+       }
+
+       private void printExecutionException(String message, Throwable t) {
+               final String finalMessage;
+               if (message == null) {
+                       finalMessage = CliStrings.MESSAGE_SQL_EXECUTION_ERROR;
+               } else {
+                       finalMessage = CliStrings.MESSAGE_SQL_EXECUTION_ERROR + ' ' + message;
+               }
+               printException(finalMessage, t);
+       }
+
+       private void printExecutionError(String message) {
+               terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_SQL_EXECUTION_ERROR, message).toAnsi());
+               terminal.flush();
+       }
+
+       private void printException(String message, Throwable t) {
+               LOG.warn(message, t);
+               terminal.writer().println(CliStrings.messageError(message, t).toAnsi());
+               terminal.flush();
+       }
+
+       private void printError(String message) {
+               terminal.writer().println(CliStrings.messageError(message).toAnsi());
                terminal.flush();
        }
 
-       private void printError(String e) {
-               terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_SQL_EXECUTION_ERROR, e).toAnsi());
+       private void printInfo(String message) {
+               terminal.writer().println(CliStrings.messageInfo(message).toAnsi());
                terminal.flush();
        }
 }
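
The `callCreateView`/`callDropView` logic in the diff above follows a perform-then-validate pattern: apply the change to the session, re-validate the whole session, and roll the change back if validation fails. A self-contained sketch of that pattern, with a `Predicate`-based validator standing in for `executor.validateSession` and `ViewRegistry` as a hypothetical name, not a SQL Client class:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch of "perform and validate change, rollback on failure".
class ViewRegistry {

    private final Map<String, String> views = new HashMap<>();
    private final Predicate<Map<String, String>> validator; // stands in for session validation

    ViewRegistry(Predicate<Map<String, String>> validator) {
        this.validator = validator;
    }

    boolean createView(String name, String query) {
        if (views.containsKey(name)) {
            return false; // view already exists
        }
        views.put(name, query);       // perform change
        if (!validator.test(views)) { // validate session
            views.remove(name);       // rollback change
            return false;
        }
        return true;
    }

    boolean dropView(String name) {
        final String query = views.remove(name); // perform change
        if (query == null) {
            return false; // view not found
        }
        if (!validator.test(views)) { // validate session
            views.put(name, query);   // rollback change
            return false;
        }
        return true;
    }

    boolean hasView(String name) {
        return views.containsKey(name);
    }
}
```

Validating the full session after each change is what lets the CLI reject a `DROP VIEW` that would break other views that reference the dropped one.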
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
index aef76698449..42c5f715e58 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -50,6 +50,8 @@ private CliStrings() {
                .append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
                .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
                .append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink."))
+               .append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>'"))
+               .append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>'"))
                .append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
                .append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>'. Use 'SET' for listing all properties."))
                .append(formatCommand(SqlCommand.RESET, "Resets all session configuration properties."))
@@ -115,7 +117,7 @@ private CliStrings() {
 
        public static final String MESSAGE_EMPTY = "Result was empty.";
 
-       public static final String MESSAGE_UNKNOWN_SQL = "Unknown SQL statement.";
+       public static final String MESSAGE_UNKNOWN_SQL = "Unknown or invalid SQL statement.";
 
        public static final String MESSAGE_UNKNOWN_TABLE = "Unknown table.";
 
@@ -127,14 +129,25 @@ private CliStrings() {
 
        public static final String MESSAGE_STATEMENT_SUBMITTED = "Table update statement has been successfully submitted to the cluster:";
 
-       public static final String MESSAGE_INVALID_PATH = "Path is invalid.";
-
        public static final String MESSAGE_MAX_SIZE_EXCEEDED = "The given file exceeds the maximum number of characters.";
 
        public static final String MESSAGE_WILL_EXECUTE = "Executing the following statement:";
 
        public static final String MESSAGE_UNSUPPORTED_SQL = "Unsupported SQL statement.";
 
+       public static final String MESSAGE_VIEW_CREATED = "View has been created.";
+
+       public static final String MESSAGE_VIEW_REMOVED = "View has been removed.";
+
+       public static final String MESSAGE_VIEW_ALREADY_EXISTS = "A view with this name has already been defined in the current CLI session.";
+
+       public static final String MESSAGE_VIEW_NOT_FOUND = "The given view does not exist in the current CLI session. " +
+               "Only views created with a CREATE VIEW statement can be accessed.";
+
+       public static final String MESSAGE_VIEW_NOT_REMOVED = "The given view cannot be removed without affecting other views.";
+
+       public static final String MESSAGE_ENVIRONMENT_INVALID = "The configured environment is invalid. Please check your environment files again.";
+
        // --------------------------------------------------------------------------------------------
 
        public static final String RESULT_TITLE = "SQL Query Result";
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
index 376b1f1558e..83fbabc0931 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
@@ -18,7 +18,12 @@
 
 package org.apache.flink.table.client.cli;
 
+import java.util.Arrays;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Simple parser for determining the type of command and its parameters.
@@ -30,86 +35,121 @@ private SqlCommandParser() {
        }
 
        public static Optional<SqlCommandCall> parse(String stmt) {
-               String trimmed = stmt.trim();
+               // normalize
+               stmt = stmt.trim();
                // remove ';' at the end because many people type it intuitively
-               if (trimmed.endsWith(";")) {
-                       trimmed = trimmed.substring(0, trimmed.length() - 1);
+               if (stmt.endsWith(";")) {
+                       stmt = stmt.substring(0, stmt.length() - 1).trim();
                }
+
+               // parse
                for (SqlCommand cmd : SqlCommand.values()) {
-                       int pos = 0;
-                       int tokenCount = 0;
-                       for (String token : trimmed.split("\\s")) {
-                               pos += token.length() + 1; // include space character
-                               // check for content
-                               if (token.length() > 0) {
-                                       // match
-                                       if (tokenCount < cmd.tokens.length && token.equalsIgnoreCase(cmd.tokens[tokenCount])) {
-                                               if (tokenCount == cmd.tokens.length - 1) {
-                                                       final SqlCommandCall call = new SqlCommandCall(
-                                                               cmd,
-                                                               splitOperands(cmd, trimmed, trimmed.substring(Math.min(pos, trimmed.length())))
-                                                       );
-                                                       return Optional.of(call);
-                                               }
-                                       } else {
-                                               // next sql command
-                                               break;
-                                       }
-                                       tokenCount++; // check next token
+                       final Matcher matcher = cmd.pattern.matcher(stmt);
+                       if (matcher.matches()) {
+                               final String[] groups = new String[matcher.groupCount()];
+                               for (int i = 0; i < groups.length; i++) {
+                                       groups[i] = matcher.group(i + 1);
                                }
+                               return cmd.operandConverter.apply(groups)
+                                       .map((operands) -> new SqlCommandCall(cmd, operands));
                        }
                }
                return Optional.empty();
        }
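
The rewritten parser above matches each statement against a per-command regex compiled with `CASE_INSENSITIVE | DOTALL` and turns the capture groups into operands. A trimmed-down, self-contained sketch of the `CREATE VIEW` case (the class and method names here are ours, not from the PR):

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of the regex-based dispatch for one command.
class CreateViewParser {

    // Same flags the PR uses: case-insensitive keywords, '.' matches newlines.
    private static final Pattern CREATE_VIEW = Pattern.compile(
        "CREATE\\s+VIEW\\s+(\\S+)\\s+AS\\s+(.*)",
        Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

    /** Returns [name, query] if the statement is a CREATE VIEW, empty otherwise. */
    static Optional<String[]> parseCreateView(String stmt) {
        // normalize: trim and drop a trailing ';', as the PR's parse() does
        stmt = stmt.trim();
        if (stmt.endsWith(";")) {
            stmt = stmt.substring(0, stmt.length() - 1).trim();
        }
        final Matcher m = CREATE_VIEW.matcher(stmt);
        if (!m.matches()) {
            return Optional.empty();
        }
        return Optional.of(new String[]{m.group(1), m.group(2)});
    }
}
```

Compared with the old token-by-token loop, a single `matches()` call per command keeps keyword matching, operand extraction, and case handling in one place.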
 
-       private static String[] splitOperands(SqlCommand cmd, String originalCall, String operands) {
-               switch (cmd) {
-                       case SET:
-                               final int delimiter = operands.indexOf('=');
-                               if (delimiter < 0) {
-                                       return new String[] {};
-                               } else {
-                                       return new String[] {operands.substring(0, delimiter), operands.substring(delimiter + 1)};
-                               }
-                       case SELECT:
-                       case INSERT_INTO:
-                               return new String[] {originalCall};
-                       default:
-                               return new String[] {operands};
-               }
-       }
-
        // --------------------------------------------------------------------------------------------
 
+       private static final Function<String[], Optional<String[]>> NO_OPERANDS =
+               (operands) -> Optional.of(new String[0]);
+
+       private static final Function<String[], Optional<String[]>> SINGLE_OPERAND =
+               (operands) -> Optional.of(new String[]{operands[0]});
+
+       private static final int DEFAULT_PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL;
+
        /**
         * Supported SQL commands.
         */
        enum SqlCommand {
-               QUIT("quit"),
-               EXIT("exit"),
-               CLEAR("clear"),
-               HELP("help"),
-               SHOW_TABLES("show tables"),
-               SHOW_FUNCTIONS("show functions"),
-               DESCRIBE("describe"),
-               EXPLAIN("explain"),
-               SELECT("select"),
-               INSERT_INTO("insert into"),
-               SET("set"),
-               RESET("reset"),
-               SOURCE("source");
-
-               public final String command;
-               public final String[] tokens;
-
-               SqlCommand(String command) {
-                       this.command = command;
-                       this.tokens = command.split(" ");
+               QUIT(
+                       "(QUIT|EXIT)",
+                       NO_OPERANDS),
+
+               CLEAR(
+                       "CLEAR",
+                       NO_OPERANDS),
+
+               HELP(
+                       "HELP",
+                       NO_OPERANDS),
+
+               SHOW_TABLES(
+                       "SHOW\\s+TABLES",
+                       NO_OPERANDS),
+
+               SHOW_FUNCTIONS(
+                       "SHOW\\s+FUNCTIONS",
+                       NO_OPERANDS),
+
+               DESCRIBE(
+                       "DESCRIBE\\s+(.*)",
+                       SINGLE_OPERAND),
+
+               EXPLAIN(
+                       "EXPLAIN\\s+(.*)",
+                       SINGLE_OPERAND),
+
+               SELECT(
+                       "(SELECT.*)",
+                       SINGLE_OPERAND),
+
+               INSERT_INTO(
+                       "(INSERT\\s+INTO.*)",
+                       SINGLE_OPERAND),
+
+               CREATE_VIEW(
+                       "CREATE\\s+VIEW\\s+(\\S+)\\s+AS\\s+(.*)",
+                       (operands) -> {
+                               if (operands.length < 2) {
+                                       return Optional.empty();
+                               }
+                               return Optional.of(new String[]{operands[0], operands[1]});
+                       }),
+
+               DROP_VIEW(
+                       "DROP\\s+VIEW\\s+(.*)",
+                       SINGLE_OPERAND),
+
+               SET(
+                       "SET(\\s+(\\S+)\\s*=(.*))?", // whitespace is only ignored on the left side of '='
+                       (operands) -> {
+                               if (operands.length < 3) {
+                                       return Optional.empty();
+                               } else if (operands[0] == null) {
+                                       return Optional.of(new String[0]);
+                               }
+                               return Optional.of(new String[]{operands[1], operands[2]});
+                       }),
+
+               RESET(
+                       "RESET",
+                       NO_OPERANDS),
+
+               SOURCE(
+                       "SOURCE\\s+(.*)",
+                       SINGLE_OPERAND);
+
+               public final Pattern pattern;
+       public final Function<String[], Optional<String[]>> operandConverter;
+
+       SqlCommand(String matchingRegex, Function<String[], Optional<String[]>> operandConverter) {
+               this.pattern = Pattern.compile(matchingRegex, DEFAULT_PATTERN_FLAGS);
+                       this.operandConverter = operandConverter;
                }
 
                @Override
                public String toString() {
-                       return command.toUpperCase();
+                       return super.toString().replace('_', ' ');
                }
        }
 
@@ -124,5 +164,33 @@ public SqlCommandCall(SqlCommand command, String[] operands) {
                        this.command = command;
                        this.operands = operands;
                }
+
+               public SqlCommandCall(SqlCommand command) {
+                       this(command, new String[0]);
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       SqlCommandCall that = (SqlCommandCall) o;
+                       return command == that.command && Arrays.equals(operands, that.operands);
+               }
+
+               @Override
+               public int hashCode() {
+                       int result = Objects.hash(command);
+                       result = 31 * result + Arrays.hashCode(operands);
+                       return result;
+               }
+
+               @Override
+               public String toString() {
+                       return command + "(" + Arrays.toString(operands) + ")";
+               }
        }
 }
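For context, the pattern/operand-converter pairing introduced above can be illustrated with a small, self-contained sketch. This is a hypothetical, trimmed-down stand-in (the class, the `Call` type, and the two sample commands are invented for illustration), not the merged `SqlCommandParser`:

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SqlCommandParserSketch {

	public enum Command {
		SHOW_TABLES("SHOW\\s+TABLES", ops -> Optional.of(new String[0])),
		CREATE_VIEW("CREATE\\s+VIEW\\s+(\\S+)\\s+AS\\s+(.*)", ops -> {
			if (ops.length < 2) {
				return Optional.empty();
			}
			return Optional.of(new String[]{ops[0], ops[1]});
		});

		final Pattern pattern;
		final Function<String[], Optional<String[]>> converter;

		Command(String regex, Function<String[], Optional<String[]>> converter) {
			// CASE_INSENSITIVE lets users type lowercase; DOTALL lets queries span lines.
			this.pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
			this.converter = converter;
		}
	}

	public static class Call {
		public final Command command;
		public final String[] operands;

		Call(Command command, String[] operands) {
			this.command = command;
			this.operands = operands;
		}
	}

	public static Optional<Call> parse(String stmt) {
		// Strip surrounding whitespace and a trailing statement terminator.
		stmt = stmt.trim();
		if (stmt.endsWith(";")) {
			stmt = stmt.substring(0, stmt.length() - 1);
		}
		for (Command cmd : Command.values()) {
			final Matcher matcher = cmd.pattern.matcher(stmt);
			if (matcher.matches()) {
				// Collect all capturing groups as raw operands.
				final String[] groups = new String[matcher.groupCount()];
				for (int i = 0; i < groups.length; i++) {
					groups[i] = matcher.group(i + 1);
				}
				final Command matched = cmd;
				return cmd.converter.apply(groups).map(ops -> new Call(matched, ops));
			}
		}
		return Optional.empty();
	}

	public static void main(String[] args) {
		System.out.println(parse("show tables").isPresent());
		System.out.println(parse("CREATE VIEW v AS SELECT 1").get().operands[0]);
	}
}
```

The converter step is what lets a regex match still be rejected (returning `Optional.empty()`) when a capture group is missing, which is how the real parser distinguishes `SET` from `SET key=value`.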
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java
index 337d8035915..31cf946960d 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.client.config;
 
+import org.apache.flink.table.client.SqlClientException;
+
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.io.IOContext;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -40,6 +42,20 @@ private ConfigUtil() {
                // private
        }
 
+       /**
+        * Extracts an early string property from YAML (before normalization).
+        */
+       public static String extractEarlyStringProperty(Map<String, Object> map, String key, String parent) {
+               if (!map.containsKey(key)) {
+                       throw new SqlClientException("The '" + key + "' attribute of " + parent + " is missing.");
+               }
+               final Object object = map.get(key);
+               if (object == null || !(object instanceof String) || ((String) object).trim().length() <= 0) {
+                       throw new SqlClientException("Invalid " + parent + " " + key + " '" + object + "'.");
+               }
+               return (String) object;
+       }
+
        /**
         * Normalizes key-value properties from Yaml in the normalized format of the Table API.
         */
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
index 0853afbb45e..b0d1528a4c8 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
@@ -26,9 +26,12 @@
 import java.net.URL;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.table.client.config.ConfigUtil.extractEarlyStringProperty;
+
 /**
  * Environment configuration that represents the content of an environment file. Environment files
  * define tables, execution, and deployment behavior. An environment might be defined by default or
@@ -41,6 +44,8 @@
 
        private Map<String, TableDescriptor> tables;
 
+       private Map<String, String> views;
+
        private Map<String, UserDefinedFunction> functions;
 
        private Execution execution;
@@ -52,9 +57,12 @@
        private static final String TABLE_TYPE_VALUE_SOURCE = "source";
        private static final String TABLE_TYPE_VALUE_SINK = "sink";
        private static final String TABLE_TYPE_VALUE_BOTH = "both";
+       private static final String VIEW_NAME = "name";
+       private static final String VIEW_QUERY = "query";
 
        public Environment() {
                this.tables = Collections.emptyMap();
+               this.views = Collections.emptyMap();
                this.functions = Collections.emptyMap();
                this.execution = new Execution();
                this.deployment = new Deployment();
@@ -67,24 +75,38 @@ public Environment() {
        public void setTables(List<Map<String, Object>> tables) {
                this.tables = new HashMap<>(tables.size());
                tables.forEach(config -> {
-                       if (!config.containsKey(TABLE_NAME)) {
-                               throw new SqlClientException("The 'name' attribute of a table is missing.");
-                       }
-                       final Object nameObject = config.get(TABLE_NAME);
-                       if (nameObject == null || !(nameObject instanceof String) || ((String) nameObject).length() <= 0) {
-                               throw new SqlClientException("Invalid table name '" + nameObject + "'.");
-                       }
-                       final String name = (String) nameObject;
+                       final String name = extractEarlyStringProperty(config, TABLE_NAME, "table");
                        final Map<String, Object> properties = new HashMap<>(config);
                        properties.remove(TABLE_NAME);
 
-                       if (this.tables.containsKey(name)) {
-                               throw new SqlClientException("Duplicate table name '" + name + "'.");
+                       if (this.tables.containsKey(name) || this.views.containsKey(name)) {
+                               throw new SqlClientException("Cannot create table '" + name + "' because a table or " +
+                                       "view with this name is already registered.");
                        }
                        this.tables.put(name, createTableDescriptor(name, properties));
                });
        }
 
+       public Map<String, String> getViews() {
+               return views;
+       }
+
+       public void setViews(List<Map<String, Object>> views) {
+               // the order of how views are registered matters because
+               // they might reference each other
+               this.views = new LinkedHashMap<>(views.size());
+               views.forEach(config -> {
+                       final String name = extractEarlyStringProperty(config, VIEW_NAME, "view");
+                       final String query = extractEarlyStringProperty(config, VIEW_QUERY, "view");
+
+                       if (this.tables.containsKey(name) || this.views.containsKey(name)) {
+                               throw new SqlClientException("Cannot create view '" + name + "' because a table or " +
+                                       "view with this name is already registered.");
+                       }
+                       this.views.put(name, query);
+               });
+       }
+
        public Map<String, UserDefinedFunction> getFunctions() {
                return functions;
        }
@@ -126,6 +148,11 @@ public String toString() {
                        table.addProperties(props);
                        props.asMap().forEach((k, v) -> sb.append("  ").append(k).append(": ").append(v).append('\n'));
                });
+               sb.append("===================== Views =====================\n");
+               views.forEach((name, query) -> {
+                       sb.append("- name: ").append(name).append("\n");
+                       sb.append("  ").append(VIEW_QUERY).append(": ").append(query).append('\n');
+               });
                sb.append("=================== Functions ====================\n");
                functions.forEach((name, function) -> {
                        sb.append("- name: ").append(name).append("\n");
@@ -167,6 +194,11 @@ public static Environment merge(Environment env1, Environment env2) {
                tables.putAll(env2.getTables());
                mergedEnv.tables = tables;
 
+               // merge views
+               final LinkedHashMap<String, String> views = new LinkedHashMap<>(env1.getViews());
+               views.putAll(env2.getViews());
+               mergedEnv.views = views;
+
                // merge functions
                final Map<String, UserDefinedFunction> functions = new HashMap<>(env1.getFunctions());
                functions.putAll(env2.getFunctions());
@@ -182,9 +214,12 @@ public static Environment merge(Environment env1, Environment env2) {
        }
 
        /**
-        * Enriches an environment with new/modified properties and returns the new instance.
+        * Enriches an environment with new/modified properties or views and returns the new instance.
         */
-       public static Environment enrich(Environment env, Map<String, String> properties) {
+       public static Environment enrich(
+                       Environment env,
+                       Map<String, String> properties,
+                       Map<String, String> views) {
                final Environment enrichedEnv = new Environment();
 
                // merge tables
@@ -199,6 +234,10 @@ public static Environment enrich(Environment env, Map<String, String> properties
                // enrich deployment properties
                enrichedEnv.deployment = Deployment.enrich(env.deployment, properties);
 
+               // enrich views
+               enrichedEnv.views = new LinkedHashMap<>(env.getViews());
+               enrichedEnv.views.putAll(views);
+
                return enrichedEnv;
        }
 
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java
index 8652d40b484..8a8c1a1d695 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.table.client.config;
 
-import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.FunctionDescriptor;
 
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.flink.table.client.config.ConfigUtil.extractEarlyStringProperty;
+
 /**
  * Descriptor for user-defined functions.
  */
@@ -33,7 +34,7 @@
        private String name;
        private Map<String, String> properties;
 
-       private static final String NAME = "name";
+       private static final String FUNCTION_NAME = "name";
 
        private UserDefinedFunction(String name, Map<String, String> properties) {
                this.name = name;
@@ -52,16 +53,10 @@ public String getName() {
         * Creates a user-defined function descriptor with the given config.
         */
        public static UserDefinedFunction create(Map<String, Object> config) {
-               if (!config.containsKey(NAME)) {
-                       throw new SqlClientException("The 'name' attribute of a function is missing.");
-               }
-               final Object name = config.get(NAME);
-               if (name == null || !(name instanceof String) || ((String) name).trim().length() <= 0) {
-                       throw new SqlClientException("Invalid function name '" + name + "'.");
-               }
+               final String name = extractEarlyStringProperty(config, FUNCTION_NAME, "function");
                final Map<String, Object> properties = new HashMap<>(config);
-               properties.remove(NAME);
-               return new UserDefinedFunction((String) name, ConfigUtil.normalizeYaml(properties));
+               properties.remove(FUNCTION_NAME);
+               return new UserDefinedFunction(name, ConfigUtil.normalizeYaml(properties));
        }
 
        // --------------------------------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
index 3a4dd81bfb8..d6410161c93 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -97,6 +97,11 @@
         */
        ProgramTargetDescriptor executeUpdate(SessionContext session, String statement) throws SqlExecutionException;
 
+       /**
+        * Validates the current session. For example, it checks whether all views are still valid.
+        */
+       void validateSession(SessionContext session) throws SqlExecutionException;
+
        /**
         * Stops the executor.
         */
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
index 6fa640f53a9..d154df3deeb 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
@@ -20,7 +20,9 @@
 
 import org.apache.flink.table.client.config.Environment;
 
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
 
@@ -30,13 +32,20 @@
 public class SessionContext {
 
        private final String name;
+
        private final Environment defaultEnvironment;
+
        private final Map<String, String> sessionProperties;
 
+       private final Map<String, String> views;
+
        public SessionContext(String name, Environment defaultEnvironment) {
                this.name = name;
                this.defaultEnvironment = defaultEnvironment;
                this.sessionProperties = new HashMap<>();
+               // the order of how views are registered matters because
+               // they might reference each other
+               this.views = new LinkedHashMap<>();
        }
 
        public void setSessionProperty(String key, String value) {
@@ -47,18 +56,33 @@ public void resetSessionProperties() {
                sessionProperties.clear();
        }
 
+       public void addView(String name, String query) {
+               views.put(name, query);
+       }
+
+       public void removeView(String name) {
+               views.remove(name);
+       }
+
+       public Map<String, String> getViews() {
+               return Collections.unmodifiableMap(views);
+       }
+
        public String getName() {
                return name;
        }
 
        public Environment getEnvironment() {
-               // enrich with session properties
-               return Environment.enrich(defaultEnvironment, sessionProperties);
+               return Environment.enrich(
+                       defaultEnvironment,
+                       sessionProperties,
+                       views);
        }
 
        public SessionContext copy() {
                final SessionContext session = new SessionContext(name, defaultEnvironment);
                session.sessionProperties.putAll(sessionProperties);
+               session.views.putAll(views);
                return session;
        }
 
@@ -73,11 +97,16 @@ public boolean equals(Object o) {
                SessionContext context = (SessionContext) o;
                return Objects.equals(name, context.name) &&
                        Objects.equals(defaultEnvironment, context.defaultEnvironment) &&
-                       Objects.equals(sessionProperties, context.sessionProperties);
+                       Objects.equals(sessionProperties, context.sessionProperties) &&
+                       Objects.equals(views, context.views);
        }
 
        @Override
        public int hashCode() {
-               return Objects.hash(name, defaultEnvironment, sessionProperties);
+               return Objects.hash(
+                       name,
+                       defaultEnvironment,
+                       sessionProperties,
+                       views);
        }
 }
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 9ff683755d2..85b3e9265a8 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -42,6 +42,7 @@
 import org.apache.flink.table.api.QueryConfig;
 import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.client.config.Deployment;
@@ -309,6 +310,18 @@ private EnvironmentInstance() {
                                        }
                                });
                        }
+
+                       // register views
+                       mergedEnv.getViews().forEach((name, query) -> {
+                               // if registering a view fails at this point
+                               // it means that it accesses tables that are not available anymore
+                               try {
+                                       tableEnv.registerTable(name, tableEnv.sqlQuery(query));
+                               } catch (ValidationException e) {
+                                       throw new SqlExecutionException(
+                                               "Invalid view '" + name + "' with query:\n" + query + "\nCause: " + e.getMessage());
+                               }
+                       });
                }
 
                public QueryConfig getQueryConfig() {
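The rethrow-with-context pattern used when replaying views above (catch the validation failure, surface the view name and query) can be sketched independently of Flink. Everything below is illustrative: the class, the exception type, and the toy "catalog" are invented stand-ins, not Flink API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ViewValidationSketch {

	static class InvalidViewException extends RuntimeException {
		InvalidViewException(String message) {
			super(message);
		}
	}

	// Toy catalog of known tables/views; registering a view validates its source first.
	static void registerView(Map<String, String> catalog, String name, String query, String sourceTable) {
		if (!catalog.containsKey(sourceTable)) {
			// Surface the offending view name and query, mirroring the error style above.
			throw new InvalidViewException(
				"Invalid view '" + name + "' with query:\n" + query +
				"\nCause: unknown table '" + sourceTable + "'");
		}
		catalog.put(name, query);
	}

	public static void main(String[] args) {
		Map<String, String> catalog = new LinkedHashMap<>();
		catalog.put("MyTableSource", "<source>");
		registerView(catalog, "MyCustomView", "SELECT MyField2 FROM MyTableSource", "MyTableSource");
		try {
			registerView(catalog, "BrokenView", "SELECT * FROM Missing", "Missing");
		} catch (InvalidViewException e) {
			System.out.println(e.getMessage());
		}
	}
}
```

This is also why `validateSession` below simply rebuilds the environment instance: replaying every view is itself the validation, and a view whose dependency was dropped fails at exactly this registration step.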
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index b2e82715bd5..3b9e8e99b82 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -288,6 +288,12 @@ public ProgramTargetDescriptor executeUpdate(SessionContext session, String stat
                return executeUpdateInternal(context, statement);
        }
 
+       @Override
+       public void validateSession(SessionContext session) throws SqlExecutionException {
+               // throws exceptions if an environment cannot be created with the given session context
+               getOrCreateExecutionContext(session).createEnvironmentInstance();
+       }
+
        @Override
        public void stop(SessionContext session) {
                resultStore.getResults().forEach((resultId) -> {
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index 99318bee996..90c91959ac3 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -153,6 +153,11 @@ public ProgramTargetDescriptor executeUpdate(SessionContext session, String stat
                        return new ProgramTargetDescriptor("testClusterId", "testJobId", "http://testcluster:1234");
                }
 
+               @Override
+               public void validateSession(SessionContext session) throws SqlExecutionException {
+                       // nothing to do
+               }
+
                @Override
                public void stop(SessionContext session) {
                        // nothing to do
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
new file mode 100644
index 00000000000..9ad7fe9d1fb
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.table.client.cli.SqlCommandParser.SqlCommand;
+import org.apache.flink.table.client.cli.SqlCommandParser.SqlCommandCall;
+
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link SqlCommandParser}.
+ */
+public class SqlCommandParserTest {
+
+       @Test
+       public void testCommands() {
+               testValidSqlCommand("QUIT;", new SqlCommandCall(SqlCommand.QUIT));
+               testValidSqlCommand("eXiT", new SqlCommandCall(SqlCommand.QUIT));
+               testValidSqlCommand("CLEAR", new SqlCommandCall(SqlCommand.CLEAR));
+               testValidSqlCommand("SHOW TABLES", new SqlCommandCall(SqlCommand.SHOW_TABLES));
+               testValidSqlCommand("  SHOW   TABLES   ", new SqlCommandCall(SqlCommand.SHOW_TABLES));
+               testValidSqlCommand("SHOW FUNCTIONS", new SqlCommandCall(SqlCommand.SHOW_FUNCTIONS));
+               testValidSqlCommand("  SHOW    FUNCTIONS   ", new SqlCommandCall(SqlCommand.SHOW_FUNCTIONS));
+               testValidSqlCommand("DESCRIBE MyTable", new SqlCommandCall(SqlCommand.DESCRIBE, new String[]{"MyTable"}));
+               testValidSqlCommand("DESCRIBE         MyTable     ", new SqlCommandCall(SqlCommand.DESCRIBE, new String[]{"MyTable"}));
+               testInvalidSqlCommand("DESCRIBE  "); // no table name
+               testValidSqlCommand(
+                       "EXPLAIN SELECT complicated FROM table",
+                       new SqlCommandCall(SqlCommand.EXPLAIN, new String[]{"SELECT complicated FROM table"}));
+               testInvalidSqlCommand("EXPLAIN  "); // no query
+               testValidSqlCommand(
+                       "SELECT complicated FROM table",
+                       new SqlCommandCall(SqlCommand.SELECT, new String[]{"SELECT complicated FROM table"}));
+               testValidSqlCommand(
+                       "   SELECT  complicated FROM table    ",
+                       new SqlCommandCall(SqlCommand.SELECT, new String[]{"SELECT  complicated FROM table"}));
+               testValidSqlCommand(
+                       "INSERT INTO other SELECT 1+1",
+                       new SqlCommandCall(SqlCommand.INSERT_INTO, new String[]{"INSERT INTO other SELECT 1+1"}));
+               testValidSqlCommand(
+                       "CREATE VIEW x AS SELECT 1+1",
+                       new SqlCommandCall(SqlCommand.CREATE_VIEW, new String[]{"x", "SELECT 1+1"}));
+               testValidSqlCommand(
+                       "CREATE   VIEW    MyTable   AS     SELECT 1+1 FROM y",
+                       new SqlCommandCall(SqlCommand.CREATE_VIEW, new String[]{"MyTable", "SELECT 1+1 FROM y"}));
+               testInvalidSqlCommand("CREATE VIEW x SELECT 1+1"); // missing AS
+               testValidSqlCommand("DROP VIEW MyTable", new SqlCommandCall(SqlCommand.DROP_VIEW, new String[]{"MyTable"}));
+               testValidSqlCommand("DROP VIEW  MyTable", new SqlCommandCall(SqlCommand.DROP_VIEW, new String[]{"MyTable"}));
+               testInvalidSqlCommand("DROP VIEW");
+               testValidSqlCommand("SET", new SqlCommandCall(SqlCommand.SET));
+               testValidSqlCommand("SET x=y", new SqlCommandCall(SqlCommand.SET, new String[] {"x", "y"}));
+               testValidSqlCommand("SET    x  = y", new SqlCommandCall(SqlCommand.SET, new String[] {"x", " y"}));
+               testValidSqlCommand("reset;", new SqlCommandCall(SqlCommand.RESET));
+               testValidSqlCommand("source /my/file", new SqlCommandCall(SqlCommand.SOURCE, new String[] {"/my/file"}));
+               testInvalidSqlCommand("source"); // missing path
+       }
+
+       private void testInvalidSqlCommand(String stmt) {
+               final Optional<SqlCommandCall> actualCall = SqlCommandParser.parse(stmt);
+               if (actualCall.isPresent()) {
+                       fail();
+               }
+       }
+
+       private void testValidSqlCommand(String stmt, SqlCommandCall expectedCall) {
+               final Optional<SqlCommandCall> actualCall = SqlCommandParser.parse(stmt);
+               if (!actualCall.isPresent()) {
+                       fail();
+               }
+               assertEquals(expectedCall, actualCall.get());
+       }
+}
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index 04575c696a2..6752abfd195 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -79,10 +79,11 @@ public void testFunctions() throws Exception {
        }
 
        @Test
-       public void testSourceSinks() throws Exception {
+       public void testTables() throws Exception {
                final ExecutionContext<?> context = createExecutionContext();
                final Map<String, TableSource<?>> sources = context.getTableSources();
                final Map<String, TableSink<?>> sinks = context.getTableSinks();
+               final Map<String, String> views = context.getMergedEnvironment().getViews();
 
                assertEquals(
                        new HashSet<>(Arrays.asList("TableSourceSink", "TableNumber1", "TableNumber2")),
@@ -92,6 +93,10 @@ public void testSourceSinks() throws Exception {
                        new HashSet<>(Collections.singletonList("TableSourceSink")),
                        sinks.keySet());
 
+               assertEquals(
+                       new HashSet<>(Arrays.asList("TestView1", "TestView2")),
+                       views.keySet());
+
                assertArrayEquals(
                        new String[]{"IntegerField1", "StringField1"},
                        sources.get("TableNumber1").getTableSchema().getColumnNames());
@@ -119,7 +124,7 @@ public void testSourceSinks() throws Exception {
                final TableEnvironment tableEnv = context.createEnvironmentInstance().getTableEnvironment();
 
                assertArrayEquals(
-                       new String[]{"TableNumber1", "TableNumber2", "TableSourceSink"},
+                       new String[]{"TableNumber1", "TableNumber2", "TableSourceSink", "TestView1", "TestView2"},
                        tableEnv.listTables());
        }
 
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index ed4ce7a9d8d..76648d08e1a 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -36,6 +36,7 @@
 import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
 import org.apache.flink.table.client.gateway.ResultDescriptor;
 import org.apache.flink.table.client.gateway.SessionContext;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
 import org.apache.flink.test.util.MiniClusterResource;
@@ -104,6 +105,49 @@ private static Configuration getConfig() {
                return config;
        }
 
+       @Test
+       public void testValidateSession() throws Exception {
+               final Executor executor = createDefaultExecutor(clusterClient);
+               final SessionContext session = new SessionContext("test-session", new Environment());
+
+               executor.validateSession(session);
+
+               session.addView("AdditionalView1", "SELECT 1");
+               session.addView("AdditionalView2", "SELECT * FROM AdditionalView1");
+               executor.validateSession(session);
+
+               List<String> actualTables = executor.listTables(session);
+               List<String> expectedTables = Arrays.asList(
+                       "AdditionalView1",
+                       "AdditionalView2",
+                       "TableNumber1",
+                       "TableNumber2",
+                       "TableSourceSink",
+                       "TestView1",
+                       "TestView2");
+               assertEquals(expectedTables, actualTables);
+
+               session.removeView("AdditionalView1");
+               try {
+                       executor.validateSession(session);
+                       fail();
+               } catch (SqlExecutionException e) {
+                       // AdditionalView2 needs AdditionalView1
+               }
+
+               session.removeView("AdditionalView2");
+               executor.validateSession(session);
+
+               actualTables = executor.listTables(session);
+               expectedTables = Arrays.asList(
+                       "TableNumber1",
+                       "TableNumber2",
+                       "TableSourceSink",
+                       "TestView1",
+                       "TestView2");
+               assertEquals(expectedTables, actualTables);
+       }
+
        @Test
        public void testListTables() throws Exception {
                final Executor executor = createDefaultExecutor(clusterClient);
@@ -111,7 +155,12 @@ public void testListTables() throws Exception {
 
                final List<String> actualTables = executor.listTables(session);
 
-               final List<String> expectedTables = Arrays.asList("TableNumber1", "TableNumber2", "TableSourceSink");
+               final List<String> expectedTables = Arrays.asList(
+                       "TableNumber1",
+                       "TableNumber2",
+                       "TableSourceSink",
+                       "TestView1",
+                       "TestView2");
                assertEquals(expectedTables, actualTables);
        }
 
@@ -264,19 +313,19 @@ public void testBatchQueryExecution() throws Exception {
                final SessionContext session = new SessionContext("test-session", new Environment());
 
                try {
-                       final ResultDescriptor desc = executor.executeQuery(session, "SELECT IntegerField1 FROM TableNumber1");
+                       final ResultDescriptor desc = executor.executeQuery(session, "SELECT * FROM TestView1");
 
                        assertTrue(desc.isMaterialized());
 
                        final List<String> actualResults = retrieveTableResult(executor, session, desc.getResultId());
 
                        final List<String> expectedResults = new ArrayList<>();
-                       expectedResults.add("42");
-                       expectedResults.add("22");
-                       expectedResults.add("32");
-                       expectedResults.add("32");
-                       expectedResults.add("42");
-                       expectedResults.add("52");
+                       expectedResults.add("47");
+                       expectedResults.add("27");
+                       expectedResults.add("37");
+                       expectedResults.add("37");
+                       expectedResults.add("47");
+                       expectedResults.add("57");
 
                        TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
                } finally {
diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index 22351bc3959..cd5257e611c 100644
--- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -83,6 +83,12 @@ tables:
         - name: StringField
           type: VARCHAR
 
+views:
+  - name: TestView1
+    query: SELECT scalarUDF(IntegerField1) FROM TableNumber1
+  - name: TestView2
+    query: SELECT * FROM TestView1
+
 functions:
   - name: scalarUDF
     from: class
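
The ordering contract exercised by testValidateSession above (AdditionalView2 may only reference the previously declared AdditionalView1, so removing AdditionalView1 must fail validation) can be sketched as a standalone check. This is a hypothetical, simplified illustration — the class name and its naive FROM-clause scan are not part of the Flink code in this diff, which validates views against a real table environment and SQL parser:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ViewValidationSketch {

    /**
     * Walks the views in declaration order and returns the names of views
     * whose FROM clause references a table or view not yet registered.
     */
    static List<String> findInvalidViews(List<String> baseTables,
                                         Map<String, String> viewsInOrder) {
        List<String> known = new ArrayList<>(baseTables);
        List<String> invalid = new ArrayList<>();
        for (Map.Entry<String, String> view : viewsInOrder.entrySet()) {
            String query = view.getValue();
            boolean ok = true;
            // Naive check: take the first identifier after FROM; a real
            // implementation would parse the full query instead.
            int from = query.toUpperCase().indexOf("FROM ");
            if (from >= 0) {
                String table = query.substring(from + 5).trim().split("\\s+")[0];
                ok = known.contains(table);
            }
            if (ok) {
                known.add(view.getKey());
            } else {
                invalid.add(view.getKey());
            }
        }
        return invalid;
    }

    public static void main(String[] args) {
        Map<String, String> views = new LinkedHashMap<>();
        views.put("AdditionalView1", "SELECT 1");
        views.put("AdditionalView2", "SELECT * FROM AdditionalView1");
        // Declared in order, both views resolve.
        System.out.println(findInvalidViews(List.of("TableNumber1"), views)); // prints []

        views.remove("AdditionalView1");
        // AdditionalView2 now references a missing view, as in the test.
        System.out.println(findInvalidViews(List.of("TableNumber1"), views)); // prints [AdditionalView2]
    }
}
```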


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support CREATE VIEW in SQL Client
> ---------------------------------
>
>                 Key: FLINK-10163
>                 URL: https://issues.apache.org/jira/browse/FLINK-10163
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> The possibility to define a name for a subquery would improve the usability 
> of the SQL Client. The SQL standard defines {{CREATE VIEW}} for defining a 
> virtual table.
>  
> Example:
> {code}
>  CREATE VIEW viewName
>  [ '(' columnName [, columnName ]* ')' ]
>  AS Query
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
