This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 84e1186 [FLINK-22940][sql-client] Make sql client column max width configurable 84e1186 is described below commit 84e1186d24ebbe6b3a4629496274d6337b333af1 Author: Svend Vanderveken <1214071+sv3...@users.noreply.github.com> AuthorDate: Sat Jun 26 13:19:39 2021 +0200 [FLINK-22940][sql-client] Make sql client column max width configurable This closes #16245 --- docs/content/docs/dev/table/sqlClient.md | 205 +++++++++++++-------- .../generated/sql_client_configuration.html | 6 + .../table/client/cli/CliChangelogResultView.java | 42 ++--- .../flink/table/client/cli/CliResultView.java | 12 +- .../flink/table/client/cli/CliTableResultView.java | 27 +-- .../table/client/cli/CliTableauResultView.java | 3 +- .../table/client/config/SqlClientOptions.java | 14 ++ .../table/client/gateway/ResultDescriptor.java | 25 ++- .../table/client/gateway/local/LocalExecutor.java | 10 +- .../apache/flink/table/client/SqlClientTest.java | 2 +- .../flink/table/client/cli/CliClientTest.java | 2 +- .../flink/table/client/cli/CliResultViewTest.java | 10 +- .../table/client/cli/CliTableauResultViewTest.java | 48 ++++- .../client/gateway/context/SessionContextTest.java | 2 +- .../src/test/resources/sql/select.q | 68 +++++++ 15 files changed, 319 insertions(+), 157 deletions(-) diff --git a/docs/content/docs/dev/table/sqlClient.md b/docs/content/docs/dev/table/sqlClient.md index 725ad15..375328d 100644 --- a/docs/content/docs/dev/table/sqlClient.md +++ b/docs/content/docs/dev/table/sqlClient.md @@ -34,7 +34,6 @@ The *SQL Client* aims to provide an easy way of writing, debugging, and submitti {{< img width="80%" src="/fig/sql_client_demo.gif" alt="Animated demo of the Flink SQL Client CLI running table programs on a cluster" >}} - Getting Started --------------- @@ -60,86 +59,26 @@ or explicitly use `embedded` mode: ./bin/sql-client.sh embedded ``` +See [SQL Client startup options](#sql-client-startup-options) below for more 
details. + ### Running SQL Queries -Once the CLI has been started, you can use the `HELP` command to list all available SQL statements. -For validating your setup and cluster connection, you can enter your first SQL query and press the `Enter` key to execute it: +For validating your setup and cluster connection, you can enter the simple query below and press `Enter` to execute it. ```sql -SELECT 'Hello World'; -``` - -This query requires no table source and produces a single row result. The CLI will retrieve results -from the cluster and visualize them. You can close the result view by pressing the `Q` key. - -The CLI supports **three modes** for maintaining and visualizing results. - -The **table mode** materializes results in memory and visualizes them in a regular, paginated table representation. -It can be enabled by executing the following command in the CLI: - -```text -SET 'sql-client.execution.result-mode' = 'table'; -``` - -The **changelog mode** does not materialize results and visualizes the result stream that is produced -by a [continuous query]({{< ref "docs/dev/table/concepts/dynamic_tables" >}}#continuous-queries) consisting of insertions (`+`) and retractions (`-`). - -```text -SET 'sql-client.execution.result-mode' = 'changelog'; -``` - -The **tableau mode** is more like a traditional way which will display the results in the screen directly with a tableau format. -The displaying content will be influenced by the query execution type(`execution.type`). - -```text SET 'sql-client.execution.result-mode' = 'tableau'; +SET 'execution.runtime-mode' = 'batch'; + +SELECT + name, + COUNT(*) AS cnt +FROM + (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) +GROUP BY name; ``` -Note that when you use this mode with streaming query, the result will be continuously printed on the console. If the input data of -this query is bounded, the job will terminate after Flink processed all input data, and the printing will also be stopped automatically. 
-Otherwise, if you want to terminate a running query, just type `CTRL-C` in this case, the job and the printing will be stopped. - -You can use the following query to see all the result modes in action: - -```sql -SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name; -``` - -This query performs a bounded word count example. - -In *changelog mode*, the visualized changelog should be similar to: - -```text -+ Bob, 1 -+ Alice, 1 -+ Greg, 1 -- Bob, 1 -+ Bob, 2 -``` - -In *table mode*, the visualized result table is continuously updated until the table program ends with: - -```text -Bob, 2 -Alice, 1 -Greg, 1 -``` - -In *tableau mode*, if you ran the query in streaming mode, the displayed result would be: -```text -+-----+----------------------+----------------------+ -| +/- | name | cnt | -+-----+----------------------+----------------------+ -| + | Bob | 1 | -| + | Alice | 1 | -| + | Greg | 1 | -| - | Bob | 1 | -| + | Bob | 2 | -+-----+----------------------+----------------------+ -Received a total of 5 rows -``` +The SQL client will retrieve the results from the cluster and visualize them (you can close the result view by pressing the `Q` key): -And if you ran the query in batch mode, the displayed result would be: ```text +-------+-----+ | name | cnt | @@ -148,18 +87,18 @@ And if you ran the query in batch mode, the displayed result would be: | Bob | 2 | | Greg | 1 | +-------+-----+ -3 rows in set ``` -All these result modes can be useful during the prototyping of SQL queries. In all these modes, -results are stored in the Java heap memory of the SQL Client. In order to keep the CLI interface responsive, -the changelog mode only shows the latest 1000 changes. The table mode allows for navigating through -bigger results that are only limited by the available main memory and the configured -[maximum number of rows](#sql-client-execution-max-table-result-rows) (`sql-client.execution.max-table-result.rows`). 
+The `SET` command allows you to tune the job execution and the sql client behaviour. See [SQL Client Configuration](#sql-client-configuration) below for more details. -<span class="label label-danger">Attention</span> Queries that are executed in a batch environment, can only be retrieved using the `table` or `tableau` result mode. +After a query is defined, it can be submitted to the cluster as a long-running, detached Flink job. +The [configuration section](#configuration) explains how to declare table sources for reading data, how to declare table sinks for writing data, and how to configure other table program properties. -After a query is defined, it can be submitted to the cluster as a long-running, detached Flink job. The [configuration section](#configuration) explains how to declare table sources for reading data, how to declare table sinks for writing data, and how to configure other table program properties. +### Getting help + +The documentation of the SQL client commands can be accessed by typing the `HELP` command. + +See also the general [SQL]({{< ref "docs/dev/table/sql/overview" >}}) documentation. {{< top >}} @@ -297,8 +236,112 @@ Mode "embedded" (default) submits Flink jobs from the local machine. ### SQL Client Configuration +You can configure the SQL client by setting the options below, or any valid [Flink configuration]({{< ref "docs/dev/table/config" >}}) entry: + +```sql +SET 'key' = 'value'; +``` + {{< generated/sql_client_configuration >}} +### SQL client result modes + +The CLI supports **three modes** for maintaining and visualizing results. + +The **table mode** materializes results in memory and visualizes them in a regular, paginated table representation. 
+It can be enabled by executing the following command in the CLI: + +```text +SET 'sql-client.execution.result-mode' = 'table'; +``` + +The result of a query would then look like this, you can use the keys indicated at the bottom of the screen as well +as the arrows keys to navigate and open the various records: + +```text + + name age isHappy dob height + user1 20 true 1995-12-03 1.7 + user2 30 true 1972-08-02 1.89 + user3 40 false 1983-12-23 1.63 + user4 41 true 1977-11-13 1.72 + user5 22 false 1998-02-20 1.61 + user6 12 true 1969-04-08 1.58 + user7 38 false 1987-12-15 1.6 + user8 62 true 1996-08-05 1.82 + + + + +Q Quit + Inc Refresh G Goto Page N Next Page O Open Row +R Refresh - Dec Refresh L Last Page P Prev Page +``` + +The **changelog mode** does not materialize results and visualizes the result stream that is produced +by a [continuous query]({{< ref "docs/dev/table/concepts/dynamic_tables" >}}#continuous-queries) consisting of insertions (`+`) and retractions (`-`). + +```text +SET 'sql-client.execution.result-mode' = 'changelog'; +``` + +The result of a query would then look like this: + +```text + op name age isHappy dob height + +I user1 20 true 1995-12-03 1.7 + +I user2 30 true 1972-08-02 1.89 + +I user3 40 false 1983-12-23 1.63 + +I user4 41 true 1977-11-13 1.72 + +I user5 22 false 1998-02-20 1.61 + +I user6 12 true 1969-04-08 1.58 + +I user7 38 false 1987-12-15 1.6 + +I user8 62 true 1996-08-05 1.82 + + + + +Q Quit + Inc Refresh O Open Row +R Refresh - Dec Refresh + +``` + +The **tableau mode** is more like a traditional way which will display the results in the screen directly with a tableau format. +The displaying content will be influenced by the query execution type (`execution.type`). 
+ +```text +SET 'sql-client.execution.result-mode' = 'tableau'; +``` + +The result of a query would then look like this: + +```text ++----+--------------------------------+-------------+---------+------------+--------------------------------+ +| op | name | age | isHappy | dob | height | ++----+--------------------------------+-------------+---------+------------+--------------------------------+ +| +I | user1 | 20 | true | 1995-12-03 | 1.7 | +| +I | user2 | 30 | true | 1972-08-02 | 1.89 | +| +I | user3 | 40 | false | 1983-12-23 | 1.63 | +| +I | user4 | 41 | true | 1977-11-13 | 1.72 | +| +I | user5 | 22 | false | 1998-02-20 | 1.61 | +| +I | user6 | 12 | true | 1969-04-08 | 1.58 | +| +I | user7 | 38 | false | 1987-12-15 | 1.6 | +| +I | user8 | 62 | true | 1996-08-05 | 1.82 | ++----+--------------------------------+-------------+---------+------------+--------------------------------+ +Received a total of 8 rows +``` + +Note that when you use this mode with streaming query, the result will be continuously printed on the console. If the input data of +this query is bounded, the job will terminate after Flink processed all input data, and the printing will also be stopped automatically. +Otherwise, if you want to terminate a running query, just type `CTRL-C` in this case, the job and the printing will be stopped. + +All these result modes can be useful during the prototyping of SQL queries. In all these modes, +results are stored in the Java heap memory of the SQL Client. In order to keep the CLI interface responsive, +the changelog mode only shows the latest 1000 changes. The table mode allows for navigating through +bigger results that are only limited by the available main memory and the configured +[maximum number of rows](#sql-client-execution-max-table-result-rows) (`sql-client.execution.max-table-result.rows`). 
+ +<span class="label label-danger">Attention</span> Queries that are executed in a batch environment, can only be retrieved using the `table` or `tableau` result mode. + ### Initialize Session Using SQL Files A SQL query needs a configuration environment in which it is executed. SQL Client supports the `-i` diff --git a/docs/layouts/shortcodes/generated/sql_client_configuration.html b/docs/layouts/shortcodes/generated/sql_client_configuration.html index faf2a6a..f4a73ce 100644 --- a/docs/layouts/shortcodes/generated/sql_client_configuration.html +++ b/docs/layouts/shortcodes/generated/sql_client_configuration.html @@ -9,6 +9,12 @@ </thead> <tbody> <tr> + <td><h5>sql-client.display.max-column-width</h5><br> <span class="label label-primary">Streaming</span></td> + <td style="word-wrap: break-word;">30</td> + <td>Integer</td> + <td>When printing the query results, this parameter determines the number of characters shown on screen before truncating.This only applies to columns with variable-length types (e.g. 
STRING) in streaming mode.Fixed-length types and all types in batch mode are printed using a deterministic column width</td> + </tr> + <tr> <td><h5>sql-client.execution.max-table-result.rows</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> <td style="word-wrap: break-word;">1000000</td> <td>Integer</td> diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java index 55aaba5..33f64fd 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java @@ -38,6 +38,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.stream.IntStream; import static org.apache.flink.table.client.cli.CliUtils.TIME_FORMATTER; import static org.apache.flink.table.client.cli.CliUtils.formatTwoLineHelpOptions; @@ -62,7 +63,14 @@ public class CliChangelogResultView private int scrolling; public CliChangelogResultView(CliClient client, ResultDescriptor resultDescriptor) { - super(client, resultDescriptor); + super( + client, + resultDescriptor, + PrintUtils.columnWidthsByType( + resultDescriptor.getResultSchema().getColumns(), + resultDescriptor.maxColumnWidth(), + PrintUtils.NULL_COLUMN, + PrintUtils.ROW_KIND_COLUMN)); if (client.isPlainTerminal()) { refreshInterval = DEFAULT_REFRESH_INTERVAL_PLAIN; @@ -86,16 +94,6 @@ public class CliChangelogResultView } @Override - protected int computeColumnWidth(int idx) { - // change column has a fixed length - if (idx == 0) { - return 3; - } else { - return PrintUtils.MAX_COLUMN_WIDTH; - } - } - - @Override protected void display() { // scroll down before displaying if (scrolling > 0) { @@ 
-281,23 +279,19 @@ public class CliChangelogResultView @Override protected List<AttributedString> computeMainHeaderLines() { - final AttributedStringBuilder schemaHeader = new AttributedStringBuilder(); - // add change column - schemaHeader.append(' '); - schemaHeader.style(AttributedStyle.DEFAULT.underline()); - schemaHeader.append("op"); - schemaHeader.style(AttributedStyle.DEFAULT); - - resultDescriptor - .getResultSchema() - .getColumnNames() + List<String> columnNames = new ArrayList<>(columnWidths.length); + columnNames.add("op"); + columnNames.addAll(resultDescriptor.getResultSchema().getColumnNames()); + + final AttributedStringBuilder schemaHeader = new AttributedStringBuilder(); + IntStream.range(0, columnNames.size()) .forEach( - s -> { + idx -> { + schemaHeader.style(AttributedStyle.DEFAULT); schemaHeader.append(' '); schemaHeader.style(AttributedStyle.DEFAULT.underline()); - normalizeColumn(schemaHeader, s, PrintUtils.MAX_COLUMN_WIDTH); - schemaHeader.style(AttributedStyle.DEFAULT); + normalizeColumn(schemaHeader, columnNames.get(idx), columnWidths[idx]); }); return Collections.singletonList(schemaHeader.toAttributedString()); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java index daf32ef..b1fe81f 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java @@ -54,6 +54,7 @@ public abstract class CliResultView<O extends Enum<O>> extends CliView<O, Void> private final RefreshThread refreshThread; protected final ResultDescriptor resultDescriptor; + protected final int[] columnWidths; protected int refreshInterval; @@ -63,10 +64,10 @@ public abstract class CliResultView<O extends Enum<O>> extends CliView<O, Void> protected int selectedRow; - public 
CliResultView(CliClient client, ResultDescriptor resultDescriptor) { + public CliResultView(CliClient client, ResultDescriptor resultDescriptor, int[] columnWidths) { super(client); this.resultDescriptor = resultDescriptor; - + this.columnWidths = columnWidths; refreshThread = new RefreshThread(); selectedRow = NO_ROW_SELECTED; } @@ -171,8 +172,6 @@ public abstract class CliResultView<O extends Enum<O>> extends CliView<O, Void> protected abstract void refresh(); - protected abstract int computeColumnWidth(int idx); - protected abstract String[] getRow(String[] resultRow); // -------------------------------------------------------------------------------------------- @@ -197,7 +196,6 @@ public abstract class CliResultView<O extends Enum<O>> extends CliView<O, Void> for (int colIdx = 0; colIdx < line.length; colIdx++) { final String col = line[colIdx]; - final int columnWidth = computeColumnWidth(colIdx); row.append(' '); // check if value was present before last update, if not, highlight it @@ -209,10 +207,10 @@ public abstract class CliResultView<O extends Enum<O>> extends CliView<O, Void> && (lineIdx >= previousResults.size() || !col.equals(previousResults.get(lineIdx)[colIdx]))) { row.style(AttributedStyle.BOLD); - normalizeColumn(row, col, columnWidth); + normalizeColumn(row, col, columnWidths[colIdx]); row.style(AttributedStyle.DEFAULT); } else { - normalizeColumn(row, col, columnWidth); + normalizeColumn(row, col, columnWidths[colIdx]); } } lines.add(row.toAttributedString()); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java index b5c40fb..1fa3e94 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java @@ -38,6 +38,7 @@ import 
java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.apache.flink.table.client.cli.CliUtils.TIME_FORMATTER; import static org.apache.flink.table.client.cli.CliUtils.formatTwoLineHelpOptions; @@ -61,7 +62,14 @@ public class CliTableResultView extends CliResultView<CliTableResultView.ResultT private static final int LAST_PAGE = 0; public CliTableResultView(CliClient client, ResultDescriptor resultDescriptor) { - super(client, resultDescriptor); + super( + client, + resultDescriptor, + PrintUtils.columnWidthsByType( + resultDescriptor.getResultSchema().getColumns(), + resultDescriptor.maxColumnWidth(), + PrintUtils.NULL_COLUMN, + null)); refreshInterval = DEFAULT_REFRESH_INTERVAL; pageCount = 1; @@ -83,11 +91,6 @@ public class CliTableResultView extends CliResultView<CliTableResultView.ResultT } @Override - protected int computeColumnWidth(int idx) { - return PrintUtils.MAX_COLUMN_WIDTH; - } - - @Override protected void refresh() { // take snapshot TypedResult<Integer> result; @@ -269,15 +272,15 @@ public class CliTableResultView extends CliResultView<CliTableResultView.ResultT protected List<AttributedString> computeMainHeaderLines() { final AttributedStringBuilder schemaHeader = new AttributedStringBuilder(); - resultDescriptor - .getResultSchema() - .getColumnNames() + IntStream.range(0, resultDescriptor.getResultSchema().getColumnCount()) .forEach( - s -> { + idx -> { + schemaHeader.style(AttributedStyle.DEFAULT); schemaHeader.append(' '); + String columnName = + resultDescriptor.getResultSchema().getColumnNames().get(idx); schemaHeader.style(AttributedStyle.DEFAULT.underline()); - normalizeColumn(schemaHeader, s, PrintUtils.MAX_COLUMN_WIDTH); - schemaHeader.style(AttributedStyle.DEFAULT); + normalizeColumn(schemaHeader, columnName, columnWidths[idx]); }); return Collections.singletonList(schemaHeader.toAttributedString()); diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java index 76af661..99838a5 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java @@ -140,10 +140,11 @@ public class CliTableauResultView implements AutoCloseable { Stream.of(PrintUtils.ROW_KIND_COLUMN), columns.stream().map(Column::getName)) .toArray(String[]::new); + final int[] colWidths = PrintUtils.columnWidthsByType( columns, - PrintUtils.MAX_COLUMN_WIDTH, + resultDescriptor.maxColumnWidth(), PrintUtils.NULL_COLUMN, PrintUtils.ROW_KIND_COLUMN); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SqlClientOptions.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SqlClientOptions.java index bafe2d3..25f1ca8 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SqlClientOptions.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SqlClientOptions.java @@ -26,6 +26,8 @@ import org.apache.flink.configuration.ConfigOptions; public class SqlClientOptions { private SqlClientOptions() {} + // Execution options + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) public static final ConfigOption<Integer> EXECUTION_MAX_TABLE_RESULT_ROWS = ConfigOptions.key("sql-client.execution.max-table-result.rows") @@ -53,4 +55,16 @@ public class SqlClientOptions { .defaultValue(false) .withDescription( "Determine whether to output the verbose output to the console. If set the option true, it will print the exception stack. 
Otherwise, it only output the cause."); + + // Display options + + @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING) + public static final ConfigOption<Integer> DISPLAY_MAX_COLUMN_WIDTH = + ConfigOptions.key("sql-client.display.max-column-width") + .intType() + .defaultValue(30) + .withDescription( + "When printing the query results, this parameter determines the number of characters shown on screen before truncating." + + "This only applies to columns with variable-length types (e.g. STRING) in streaming mode." + + "Fixed-length types and all types in batch mode are printed using a deterministic column width"); } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultDescriptor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultDescriptor.java index b6e3b04..c63b5af 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultDescriptor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultDescriptor.java @@ -18,7 +18,14 @@ package org.apache.flink.table.client.gateway; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.client.config.ResultMode; + +import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE; +import static org.apache.flink.table.client.config.SqlClientOptions.DISPLAY_MAX_COLUMN_WIDTH; +import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE; /** Describes a result to be expected from a table program. 
*/ public class ResultDescriptor { @@ -29,21 +36,17 @@ public class ResultDescriptor { private final boolean isMaterialized; - private final boolean isTableauMode; - - private final boolean isStreamingMode; + private final ReadableConfig config; public ResultDescriptor( String resultId, ResolvedSchema resultSchema, boolean isMaterialized, - boolean isTableauMode, - boolean isStreamingMode) { + ReadableConfig config) { this.resultId = resultId; this.resultSchema = resultSchema; this.isMaterialized = isMaterialized; - this.isTableauMode = isTableauMode; - this.isStreamingMode = isStreamingMode; + this.config = config; } public String getResultId() { @@ -59,10 +62,14 @@ public class ResultDescriptor { } public boolean isTableauMode() { - return isTableauMode; + return config.get(EXECUTION_RESULT_MODE).equals(ResultMode.TABLEAU); } public boolean isStreamingMode() { - return isStreamingMode; + return config.get(RUNTIME_MODE).equals(RuntimeExecutionMode.STREAMING); + } + + public int maxColumnWidth() { + return config.get(DISPLAY_MAX_COLUMN_WIDTH); } } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index f8a1026..2306488 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -19,12 +19,10 @@ package org.apache.flink.table.client.gateway.local; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.internal.TableEnvironmentInternal; -import 
org.apache.flink.table.client.config.ResultMode; import org.apache.flink.table.client.gateway.Executor; import org.apache.flink.table.client.gateway.ResultDescriptor; import org.apache.flink.table.client.gateway.SqlExecutionException; @@ -52,9 +50,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE; import static org.apache.flink.table.client.cli.CliStrings.MESSAGE_SQL_EXECUTION_ERROR; -import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE; import static org.apache.flink.util.Preconditions.checkArgument; /** @@ -241,11 +237,7 @@ public class LocalExecutor implements Executor { // store the result under the JobID resultStore.storeResult(jobId, result); return new ResultDescriptor( - jobId, - tableResult.getResolvedSchema(), - result.isMaterialized(), - config.get(EXECUTION_RESULT_MODE).equals(ResultMode.TABLEAU), - config.get(RUNTIME_MODE).equals(RuntimeExecutionMode.STREAMING)); + jobId, tableResult.getResolvedSchema(), result.isMaterialized(), config); } @Override diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java index eee91c8..02f78bd 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java @@ -51,7 +51,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.not; -import static org.junit.Assert.assertThat; +import static org.hamcrest.MatcherAssert.assertThat; /** Tests for {@link SqlClient}. 
*/ public class SqlClientTest { diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java index d04d153..cfa90e8 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java @@ -75,9 +75,9 @@ import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYN import static org.apache.flink.table.client.cli.CliClient.DEFAULT_TERMINAL_FACTORY; import static org.apache.flink.table.client.cli.CliStrings.MESSAGE_SQL_EXECUTION_ERROR; import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; /** Tests for the {@link CliClient}. 
*/
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
index 45e1b67..eaa520c 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.table.client.cli;
 
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.DataTypes;
@@ -24,6 +25,7 @@ import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.client.config.ResultMode;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.ResultDescriptor;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
@@ -48,6 +50,8 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE;
 import static org.junit.Assert.assertTrue;
 
 /** Contains basic tests for the {@link CliResultView}. */
@@ -90,14 +94,16 @@ public class CliResultViewTest {
                 new CountDownLatch(expectedCancellationCount);
 
         final MockExecutor executor = new MockExecutor(typedResult, cancellationCounterLatch);
+        final Configuration testConfig = new Configuration();
+        testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLE);
+        testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
         String sessionId = executor.openSession("test-session");
         final ResultDescriptor descriptor =
                 new ResultDescriptor(
                         "result-id",
                         ResolvedSchema.of(Column.physical("Null Field", DataTypes.STRING())),
                         false,
-                        false,
-                        true);
+                        testConfig);
 
         try (CliClient cli =
                 new TestingCliClient(
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
index 83d5302..9c69e48 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.table.client.cli;
 
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.client.config.ResultMode;
 import org.apache.flink.table.client.gateway.ResultDescriptor;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.TypedResult;
@@ -45,8 +48,10 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
 
 /** Tests for CliTableauResultView. */
 public class CliTableauResultViewTest {
@@ -156,7 +161,10 @@ public class CliTableauResultViewTest {
 
     @Test
     public void testBatchResult() {
-        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, true, false);
+        final Configuration testConfig = new Configuration();
+        testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+        testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, testConfig);
 
         TestingExecutor mockExecutor =
                 new TestingExecutorBuilder()
@@ -206,7 +214,10 @@ public class CliTableauResultViewTest {
 
     @Test
     public void testCancelBatchResult() throws Exception {
-        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, true, false);
+        final Configuration testConfig = new Configuration();
+        testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+        testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, testConfig);
 
         TestingExecutor mockExecutor =
                 new TestingExecutorBuilder()
@@ -246,7 +257,11 @@ public class CliTableauResultViewTest {
 
     @Test
     public void testEmptyBatchResult() {
-        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, true, false);
+        final Configuration testConfig = new Configuration();
+        testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+        testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, testConfig);
+
         TestingExecutor mockExecutor =
                 new TestingExecutorBuilder()
                         .setResultChangesSupplier(TypedResult::endOfStream)
@@ -268,7 +283,10 @@ public class CliTableauResultViewTest {
 
     @Test
     public void testFailedBatchResult() {
-        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, true, false);
+        final Configuration testConfig = new Configuration();
+        testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+        testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, testConfig);
 
         TestingExecutor mockExecutor =
                 new TestingExecutorBuilder()
@@ -295,7 +313,10 @@ public class CliTableauResultViewTest {
 
     @Test
     public void testStreamingResult() {
-        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, true, true);
+        final Configuration testConfig = new Configuration();
+        testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+        testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, testConfig);
 
         TestingExecutor mockExecutor =
                 new TestingExecutorBuilder()
@@ -354,7 +375,10 @@ public class CliTableauResultViewTest {
 
     @Test
     public void testEmptyStreamingResult() {
-        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, true, true);
+        final Configuration testConfig = new Configuration();
+        testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+        testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, testConfig);
 
         TestingExecutor mockExecutor =
                 new TestingExecutorBuilder()
@@ -382,7 +406,10 @@ public class CliTableauResultViewTest {
 
     @Test
     public void testCancelStreamingResult() throws Exception {
-        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, true, true);
+        final Configuration testConfig = new Configuration();
+        testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+        testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, testConfig);
 
         TestingExecutor mockExecutor =
                 new TestingExecutorBuilder()
@@ -435,7 +462,10 @@ public class CliTableauResultViewTest {
 
     @Test
     public void testFailedStreamingResult() {
-        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, true, true);
+        final Configuration testConfig = new Configuration();
+        testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+        testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, true, testConfig);
 
         TestingExecutor mockExecutor =
                 new TestingExecutorBuilder()
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
index 2567986..11507b5 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
@@ -45,10 +45,10 @@ import static org.apache.flink.configuration.PipelineOptions.NAME;
 import static org.apache.flink.configuration.PipelineOptions.OBJECT_REUSE;
 import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
 import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/select.q b/flink-table/flink-sql-client/src/test/resources/sql/select.q
index 9d875c3..55b722c 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/select.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/select.q
@@ -115,6 +115,74 @@ FROM (VALUES
 Received a total of 2 rows
 !ok
 
+# ==========================================================================
+# Testing behavior of sql-client.display.max-column-width
+# Only variable width columns are impacted at the moment => STRING, but not TIMESTAMP nor BOOLEAN
+# ==========================================================================
+
+CREATE TEMPORARY VIEW
+  testUserData(name, dob, isHappy)
+AS (VALUES
+  ('30b5c1bb-0ac0-43d3-b812-fcb649fd2b07', TIMESTAMP '2001-01-13 20:11:11.123', true),
+  ('91170c98-2cc5-4935-9ea6-12b72d32fb3c', TIMESTAMP '1994-02-14 21:12:11.123', true),
+  ('8b012d93-6ece-48ad-a2ea-aa75ef7b1d60', TIMESTAMP '1979-03-15 22:13:11.123', false),
+  ('09969d9e-d584-11eb-b8bc-0242ac130003', TIMESTAMP '1985-04-16 23:14:11.123', true)
+);
+[INFO] Execute statement succeed.
+!info
+
+SELECT * from testUserData;
++----+--------------------------------+-------------------------+---------+
+| op |                           name |                     dob | isHappy |
++----+--------------------------------+-------------------------+---------+
+| +I | 30b5c1bb-0ac0-43d3-b812-fcb... | 2001-01-13 20:11:11.123 |    true |
+| +I | 91170c98-2cc5-4935-9ea6-12b... | 1994-02-14 21:12:11.123 |    true |
+| +I | 8b012d93-6ece-48ad-a2ea-aa7... | 1979-03-15 22:13:11.123 |   false |
+| +I | 09969d9e-d584-11eb-b8bc-024... | 1985-04-16 23:14:11.123 |    true |
++----+--------------------------------+-------------------------+---------+
+Received a total of 4 rows
+!ok
+
+SET 'sql-client.display.max-column-width' = '10';
+[INFO] Session property has been set.
+!info
+
+SELECT * from testUserData;
++----+------------+-------------------------+---------+
+| op |       name |                     dob | isHappy |
++----+------------+-------------------------+---------+
+| +I | 30b5c1b... | 2001-01-13 20:11:11.123 |    true |
+| +I | 91170c9... | 1994-02-14 21:12:11.123 |    true |
+| +I | 8b012d9... | 1979-03-15 22:13:11.123 |   false |
+| +I | 09969d9... | 1985-04-16 23:14:11.123 |    true |
++----+------------+-------------------------+---------+
+Received a total of 4 rows
+!ok
+
+SET 'sql-client.display.max-column-width' = '40';
+[INFO] Session property has been set.
+!info
+
+SELECT * from testUserData;
++----+------------------------------------------+-------------------------+---------+
+| op |                                     name |                     dob | isHappy |
++----+------------------------------------------+-------------------------+---------+
+| +I |     30b5c1bb-0ac0-43d3-b812-fcb649fd2b07 | 2001-01-13 20:11:11.123 |    true |
+| +I |     91170c98-2cc5-4935-9ea6-12b72d32fb3c | 1994-02-14 21:12:11.123 |    true |
+| +I |     8b012d93-6ece-48ad-a2ea-aa75ef7b1d60 | 1979-03-15 22:13:11.123 |   false |
+| +I |     09969d9e-d584-11eb-b8bc-0242ac130003 | 1985-04-16 23:14:11.123 |    true |
++----+------------------------------------------+-------------------------+---------+
+Received a total of 4 rows
+!ok
+
+-- post-test cleanup + setting back default max width value
+DROP TEMPORARY VIEW testUserData;
+[INFO] Execute statement succeed.
+!info
+
+SET 'sql-client.display.max-column-width' = '30';
+[INFO] Session property has been set.
+!info
 
 # ==========================================================================
 # test batch query
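The expected output in the select.q hunk above suggests a simple rule for STRING columns: a value that fits within `sql-client.display.max-column-width` is printed as is, and a longer value is cut to the first `width - 3` characters followed by `...`. The sketch below only illustrates that observed rule. It is not the code from this commit: `ColumnWidthSketch` and `truncate` are hypothetical names, the handling of widths of 3 or less is an assumption, and it counts Java `char`s, whereas the real client may measure display width differently.

```java
/** Hypothetical helper, not part of Flink: mimics the truncation visible in select.q. */
public final class ColumnWidthSketch {

    private static final String ELLIPSIS = "...";

    private ColumnWidthSketch() {}

    /**
     * Returns the value unchanged when it fits into maxWidth characters; otherwise
     * keeps the first (maxWidth - 3) characters and appends "...".
     */
    public static String truncate(String value, int maxWidth) {
        if (value.length() <= maxWidth) {
            return value;
        }
        if (maxWidth <= ELLIPSIS.length()) {
            // Assumption for illustration only: too narrow for an ellipsis, so cut hard.
            return value.substring(0, Math.max(maxWidth, 0));
        }
        return value.substring(0, maxWidth - ELLIPSIS.length()) + ELLIPSIS;
    }

    public static void main(String[] args) {
        String name = "30b5c1bb-0ac0-43d3-b812-fcb649fd2b07";
        System.out.println(truncate(name, 30)); // 30b5c1bb-0ac0-43d3-b812-fcb...
        System.out.println(truncate(name, 10)); // 30b5c1b...
        System.out.println(truncate(name, 40)); // 30b5c1bb-0ac0-43d3-b812-fcb649fd2b07
    }
}
```

The three calls in `main` mirror the three `SELECT` results in the test file for widths 30, 10 and 40. Fixed-width columns such as TIMESTAMP and BOOLEAN are left out because the test comment states they are not affected at the moment.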