This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 84e1186  [FLINK-22940][sql-client] Make sql client column max width configurable
84e1186 is described below

commit 84e1186d24ebbe6b3a4629496274d6337b333af1
Author: Svend Vanderveken <1214071+sv3...@users.noreply.github.com>
AuthorDate: Sat Jun 26 13:19:39 2021 +0200

    [FLINK-22940][sql-client] Make sql client column max width configurable
    
    This closes #16245
---
 docs/content/docs/dev/table/sqlClient.md           | 205 +++++++++++++--------
 .../generated/sql_client_configuration.html        |   6 +
 .../table/client/cli/CliChangelogResultView.java   |  42 ++---
 .../flink/table/client/cli/CliResultView.java      |  12 +-
 .../flink/table/client/cli/CliTableResultView.java |  27 +--
 .../table/client/cli/CliTableauResultView.java     |   3 +-
 .../table/client/config/SqlClientOptions.java      |  14 ++
 .../table/client/gateway/ResultDescriptor.java     |  25 ++-
 .../table/client/gateway/local/LocalExecutor.java  |  10 +-
 .../apache/flink/table/client/SqlClientTest.java   |   2 +-
 .../flink/table/client/cli/CliClientTest.java      |   2 +-
 .../flink/table/client/cli/CliResultViewTest.java  |  10 +-
 .../table/client/cli/CliTableauResultViewTest.java |  48 ++++-
 .../client/gateway/context/SessionContextTest.java |   2 +-
 .../src/test/resources/sql/select.q                |  68 +++++++
 15 files changed, 319 insertions(+), 157 deletions(-)

diff --git a/docs/content/docs/dev/table/sqlClient.md b/docs/content/docs/dev/table/sqlClient.md
index 725ad15..375328d 100644
--- a/docs/content/docs/dev/table/sqlClient.md
+++ b/docs/content/docs/dev/table/sqlClient.md
@@ -34,7 +34,6 @@ The *SQL Client* aims to provide an easy way of writing, debugging, and submitti
 {{< img width="80%" src="/fig/sql_client_demo.gif" alt="Animated demo of the Flink SQL Client CLI running table programs on a cluster" >}}
 
 
-
 Getting Started
 ---------------
 
@@ -60,86 +59,26 @@ or explicitly use `embedded` mode:
 ./bin/sql-client.sh embedded
 ```
 
+See [SQL Client startup options](#sql-client-startup-options) below for more details.
+
 ### Running SQL Queries
 
-Once the CLI has been started, you can use the `HELP` command to list all available SQL statements.
-For validating your setup and cluster connection, you can enter your first SQL query and press the `Enter` key to execute it:
+For validating your setup and cluster connection, you can enter the simple query below and press `Enter` to execute it.
 
 ```sql
-SELECT 'Hello World';
-```
-
-This query requires no table source and produces a single row result. The CLI will retrieve results
-from the cluster and visualize them. You can close the result view by pressing the `Q` key.
-
-The CLI supports **three modes** for maintaining and visualizing results.
-
-The **table mode** materializes results in memory and visualizes them in a regular, paginated table representation.
-It can be enabled by executing the following command in the CLI:
-
-```text
-SET 'sql-client.execution.result-mode' = 'table';
-```
-
-The **changelog mode** does not materialize results and visualizes the result stream that is produced
-by a [continuous query]({{< ref "docs/dev/table/concepts/dynamic_tables" >}}#continuous-queries) consisting of insertions (`+`) and retractions (`-`).
-
-```text
-SET 'sql-client.execution.result-mode' = 'changelog';
-```
-
-The **tableau mode** is more like a traditional way which will display the results in the screen directly with a tableau format.
-The displaying content will be influenced by the query execution type(`execution.type`).
-
-```text
 SET 'sql-client.execution.result-mode' = 'tableau';
+SET 'execution.runtime-mode' = 'batch';
+
+SELECT 
+  name, 
+  COUNT(*) AS cnt 
+FROM 
+  (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) 
+GROUP BY name;
 ```
 
-Note that when you use this mode with streaming query, the result will be continuously printed on the console. If the input data of
-this query is bounded, the job will terminate after Flink processed all input data, and the printing will also be stopped automatically.
-Otherwise, if you want to terminate a running query, just type `CTRL-C` in this case, the job and the printing will be stopped.
-
-You can use the following query to see all the result modes in action:
-
-```sql
-SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
-```
-
-This query performs a bounded word count example.
-
-In *changelog mode*, the visualized changelog should be similar to:
-
-```text
-+ Bob, 1
-+ Alice, 1
-+ Greg, 1
-- Bob, 1
-+ Bob, 2
-```
-
-In *table mode*, the visualized result table is continuously updated until the table program ends with:
-
-```text
-Bob, 2
-Alice, 1
-Greg, 1
-```
-
-In *tableau mode*, if you ran the query in streaming mode, the displayed result would be:
-```text
-+-----+----------------------+----------------------+
-| +/- |                 name |                  cnt |
-+-----+----------------------+----------------------+
-|   + |                  Bob |                    1 |
-|   + |                Alice |                    1 |
-|   + |                 Greg |                    1 |
-|   - |                  Bob |                    1 |
-|   + |                  Bob |                    2 |
-+-----+----------------------+----------------------+
-Received a total of 5 rows
-```
+The SQL client will retrieve the results from the cluster and visualize them (you can close the result view by pressing the `Q` key):
 
-And if you ran the query in batch mode, the displayed result would be:
 ```text
 +-------+-----+
 |  name | cnt |
@@ -148,18 +87,18 @@ And if you ran the query in batch mode, the displayed result would be:
 |   Bob |   2 |
 |  Greg |   1 |
 +-------+-----+
-3 rows in set
 ```
 
-All these result modes can be useful during the prototyping of SQL queries. In all these modes,
-results are stored in the Java heap memory of the SQL Client. In order to keep the CLI interface responsive,
-the changelog mode only shows the latest 1000 changes. The table mode allows for navigating through
-bigger results that are only limited by the available main memory and the configured
-[maximum number of rows](#sql-client-execution-max-table-result-rows) (`sql-client.execution.max-table-result.rows`).
+The `SET` command allows you to tune the job execution and the SQL Client behavior. See [SQL Client Configuration](#sql-client-configuration) below for more details.
 
-<span class="label label-danger">Attention</span> Queries that are executed in a batch environment, can only be retrieved using the `table` or `tableau` result mode.
+After a query is defined, it can be submitted to the cluster as a long-running, detached Flink job.
+The [configuration section](#configuration) explains how to declare table sources for reading data, how to declare table sinks for writing data, and how to configure other table program properties.
 
-After a query is defined, it can be submitted to the cluster as a long-running, detached Flink job. The [configuration section](#configuration) explains how to declare table sources for reading data, how to declare table sinks for writing data, and how to configure other table program properties.
+### Getting help
+
+The documentation of the SQL client commands can be accessed by typing the `HELP` command.
+
+See also the general [SQL]({{< ref "docs/dev/table/sql/overview" >}}) documentation.
 
 {{< top >}}
 
@@ -297,8 +236,112 @@ Mode "embedded" (default) submits Flink jobs from the local machine.
 
 ### SQL Client Configuration
 
+You can configure the SQL client by setting the options below, or any valid [Flink configuration]({{< ref "docs/dev/table/config" >}}) entry:
+
+```sql
+SET 'key' = 'value';
+```
+
 {{< generated/sql_client_configuration >}}
 
+### SQL client result modes
+
+The CLI supports **three modes** for maintaining and visualizing results.
+
+The **table mode** materializes results in memory and visualizes them in a regular, paginated table representation.
+It can be enabled by executing the following command in the CLI:
+
+```text
+SET 'sql-client.execution.result-mode' = 'table';
+```
+
+The result of a query would then look like this. You can use the keys indicated at the bottom of the screen, as well
+as the arrow keys, to navigate and open the various records:
+
+```text
+
+                           name         age isHappy        dob                         height
+                          user1          20    true 1995-12-03                            1.7
+                          user2          30    true 1972-08-02                           1.89
+                          user3          40   false 1983-12-23                           1.63
+                          user4          41    true 1977-11-13                           1.72
+                          user5          22   false 1998-02-20                           1.61
+                          user6          12    true 1969-04-08                           1.58
+                          user7          38   false 1987-12-15                            1.6
+                          user8          62    true 1996-08-05                           1.82
+
+
+
+
+Q Quit                     + Inc Refresh              G Goto Page                N Next Page                O Open Row
+R Refresh                  - Dec Refresh              L Last Page                P Prev Page
+```
+
+The **changelog mode** does not materialize results and visualizes the result stream that is produced
+by a [continuous query]({{< ref "docs/dev/table/concepts/dynamic_tables" >}}#continuous-queries) consisting of insertions (`+`) and retractions (`-`).
+
+```text
+SET 'sql-client.execution.result-mode' = 'changelog';
+```
+
+The result of a query would then look like this: 
+
+```text
+ op                           name         age isHappy        dob                         height
+ +I                          user1          20    true 1995-12-03                            1.7
+ +I                          user2          30    true 1972-08-02                           1.89
+ +I                          user3          40   false 1983-12-23                           1.63
+ +I                          user4          41    true 1977-11-13                           1.72
+ +I                          user5          22   false 1998-02-20                           1.61
+ +I                          user6          12    true 1969-04-08                           1.58
+ +I                          user7          38   false 1987-12-15                            1.6
+ +I                          user8          62    true 1996-08-05                           1.82
+
+
+
+
+Q Quit                                       + Inc Refresh                                 O Open Row
+R Refresh                                    - Dec Refresh
+
+```
+
+The **tableau mode** is closer to a traditional display: it prints the results directly on the screen in a tableau format.
+The displayed content is influenced by the query execution type (`execution.type`).
+
+```text
+SET 'sql-client.execution.result-mode' = 'tableau';
+```
+
+The result of a query would then look like this: 
+
+```text
++----+--------------------------------+-------------+---------+------------+--------------------------------+
+| op |                           name |         age | isHappy |        dob |                         height |
++----+--------------------------------+-------------+---------+------------+--------------------------------+
+| +I |                          user1 |          20 |    true | 1995-12-03 |                            1.7 |
+| +I |                          user2 |          30 |    true | 1972-08-02 |                           1.89 |
+| +I |                          user3 |          40 |   false | 1983-12-23 |                           1.63 |
+| +I |                          user4 |          41 |    true | 1977-11-13 |                           1.72 |
+| +I |                          user5 |          22 |   false | 1998-02-20 |                           1.61 |
+| +I |                          user6 |          12 |    true | 1969-04-08 |                           1.58 |
+| +I |                          user7 |          38 |   false | 1987-12-15 |                            1.6 |
+| +I |                          user8 |          62 |    true | 1996-08-05 |                           1.82 |
++----+--------------------------------+-------------+---------+------------+--------------------------------+
+Received a total of 8 rows
+```
+
+Note that when you use this mode with a streaming query, the result is continuously printed on the console. If the input data of
+the query is bounded, the job terminates after Flink has processed all input data, and the printing stops automatically.
+Otherwise, if you want to terminate a running query, press `CTRL-C`: the job is cancelled and the printing stops.
+
+All these result modes can be useful during the prototyping of SQL queries. In all these modes,
+results are stored in the Java heap memory of the SQL Client. In order to keep the CLI interface responsive,
+the changelog mode only shows the latest 1000 changes. The table mode allows for navigating through
+bigger results that are only limited by the available main memory and the configured
+[maximum number of rows](#sql-client-execution-max-table-result-rows) (`sql-client.execution.max-table-result.rows`).
+
+<span class="label label-danger">Attention</span> Queries that are executed in a batch environment can only be retrieved using the `table` or `tableau` result mode.
+
 ### Initialize Session Using SQL Files
 
 A SQL query needs a configuration environment in which it is executed. SQL Client supports the `-i`
diff --git a/docs/layouts/shortcodes/generated/sql_client_configuration.html b/docs/layouts/shortcodes/generated/sql_client_configuration.html
index faf2a6a..f4a73ce 100644
--- a/docs/layouts/shortcodes/generated/sql_client_configuration.html
+++ b/docs/layouts/shortcodes/generated/sql_client_configuration.html
@@ -9,6 +9,12 @@
     </thead>
     <tbody>
         <tr>
+            <td><h5>sql-client.display.max-column-width</h5><br> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">30</td>
+            <td>Integer</td>
+            <td>When printing the query results, this parameter determines the number of characters shown on screen before truncating. This only applies to columns with variable-length types (e.g. STRING) in streaming mode. Fixed-length types and all types in batch mode are printed using a deterministic column width.</td>
+        </tr>
+        <tr>
             <td><h5>sql-client.execution.max-table-result.rows</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">1000000</td>
             <td>Integer</td>
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java
index 55aaba5..33f64fd 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java
@@ -38,6 +38,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.stream.IntStream;
 
 import static org.apache.flink.table.client.cli.CliUtils.TIME_FORMATTER;
 import static 
org.apache.flink.table.client.cli.CliUtils.formatTwoLineHelpOptions;
@@ -62,7 +63,14 @@ public class CliChangelogResultView
     private int scrolling;
 
     public CliChangelogResultView(CliClient client, ResultDescriptor 
resultDescriptor) {
-        super(client, resultDescriptor);
+        super(
+                client,
+                resultDescriptor,
+                PrintUtils.columnWidthsByType(
+                        resultDescriptor.getResultSchema().getColumns(),
+                        resultDescriptor.maxColumnWidth(),
+                        PrintUtils.NULL_COLUMN,
+                        PrintUtils.ROW_KIND_COLUMN));
 
         if (client.isPlainTerminal()) {
             refreshInterval = DEFAULT_REFRESH_INTERVAL_PLAIN;
@@ -86,16 +94,6 @@ public class CliChangelogResultView
     }
 
     @Override
-    protected int computeColumnWidth(int idx) {
-        // change column has a fixed length
-        if (idx == 0) {
-            return 3;
-        } else {
-            return PrintUtils.MAX_COLUMN_WIDTH;
-        }
-    }
-
-    @Override
     protected void display() {
         // scroll down before displaying
         if (scrolling > 0) {
@@ -281,23 +279,19 @@ public class CliChangelogResultView
 
     @Override
     protected List<AttributedString> computeMainHeaderLines() {
-        final AttributedStringBuilder schemaHeader = new 
AttributedStringBuilder();
-
         // add change column
-        schemaHeader.append(' ');
-        schemaHeader.style(AttributedStyle.DEFAULT.underline());
-        schemaHeader.append("op");
-        schemaHeader.style(AttributedStyle.DEFAULT);
-
-        resultDescriptor
-                .getResultSchema()
-                .getColumnNames()
+        List<String> columnNames = new ArrayList<>(columnWidths.length);
+        columnNames.add("op");
+        
columnNames.addAll(resultDescriptor.getResultSchema().getColumnNames());
+
+        final AttributedStringBuilder schemaHeader = new 
AttributedStringBuilder();
+        IntStream.range(0, columnNames.size())
                 .forEach(
-                        s -> {
+                        idx -> {
+                            schemaHeader.style(AttributedStyle.DEFAULT);
                             schemaHeader.append(' ');
                             
schemaHeader.style(AttributedStyle.DEFAULT.underline());
-                            normalizeColumn(schemaHeader, s, 
PrintUtils.MAX_COLUMN_WIDTH);
-                            schemaHeader.style(AttributedStyle.DEFAULT);
+                            normalizeColumn(schemaHeader, 
columnNames.get(idx), columnWidths[idx]);
                         });
 
         return Collections.singletonList(schemaHeader.toAttributedString());
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
index daf32ef..b1fe81f 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
@@ -54,6 +54,7 @@ public abstract class CliResultView<O extends Enum<O>> 
extends CliView<O, Void>
     private final RefreshThread refreshThread;
 
     protected final ResultDescriptor resultDescriptor;
+    protected final int[] columnWidths;
 
     protected int refreshInterval;
 
@@ -63,10 +64,10 @@ public abstract class CliResultView<O extends Enum<O>> 
extends CliView<O, Void>
 
     protected int selectedRow;
 
-    public CliResultView(CliClient client, ResultDescriptor resultDescriptor) {
+    public CliResultView(CliClient client, ResultDescriptor resultDescriptor, 
int[] columnWidths) {
         super(client);
         this.resultDescriptor = resultDescriptor;
-
+        this.columnWidths = columnWidths;
         refreshThread = new RefreshThread();
         selectedRow = NO_ROW_SELECTED;
     }
@@ -171,8 +172,6 @@ public abstract class CliResultView<O extends Enum<O>> 
extends CliView<O, Void>
 
     protected abstract void refresh();
 
-    protected abstract int computeColumnWidth(int idx);
-
     protected abstract String[] getRow(String[] resultRow);
 
     // 
--------------------------------------------------------------------------------------------
@@ -197,7 +196,6 @@ public abstract class CliResultView<O extends Enum<O>> 
extends CliView<O, Void>
 
             for (int colIdx = 0; colIdx < line.length; colIdx++) {
                 final String col = line[colIdx];
-                final int columnWidth = computeColumnWidth(colIdx);
 
                 row.append(' ');
                 // check if value was present before last update, if not, 
highlight it
@@ -209,10 +207,10 @@ public abstract class CliResultView<O extends Enum<O>> 
extends CliView<O, Void>
                         && (lineIdx >= previousResults.size()
                                 || 
!col.equals(previousResults.get(lineIdx)[colIdx]))) {
                     row.style(AttributedStyle.BOLD);
-                    normalizeColumn(row, col, columnWidth);
+                    normalizeColumn(row, col, columnWidths[colIdx]);
                     row.style(AttributedStyle.DEFAULT);
                 } else {
-                    normalizeColumn(row, col, columnWidth);
+                    normalizeColumn(row, col, columnWidths[colIdx]);
                 }
             }
             lines.add(row.toAttributedString());
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java
index b5c40fb..1fa3e94 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java
@@ -38,6 +38,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.flink.table.client.cli.CliUtils.TIME_FORMATTER;
 import static 
org.apache.flink.table.client.cli.CliUtils.formatTwoLineHelpOptions;
@@ -61,7 +62,14 @@ public class CliTableResultView extends 
CliResultView<CliTableResultView.ResultT
     private static final int LAST_PAGE = 0;
 
     public CliTableResultView(CliClient client, ResultDescriptor 
resultDescriptor) {
-        super(client, resultDescriptor);
+        super(
+                client,
+                resultDescriptor,
+                PrintUtils.columnWidthsByType(
+                        resultDescriptor.getResultSchema().getColumns(),
+                        resultDescriptor.maxColumnWidth(),
+                        PrintUtils.NULL_COLUMN,
+                        null));
 
         refreshInterval = DEFAULT_REFRESH_INTERVAL;
         pageCount = 1;
@@ -83,11 +91,6 @@ public class CliTableResultView extends 
CliResultView<CliTableResultView.ResultT
     }
 
     @Override
-    protected int computeColumnWidth(int idx) {
-        return PrintUtils.MAX_COLUMN_WIDTH;
-    }
-
-    @Override
     protected void refresh() {
         // take snapshot
         TypedResult<Integer> result;
@@ -269,15 +272,15 @@ public class CliTableResultView extends 
CliResultView<CliTableResultView.ResultT
     protected List<AttributedString> computeMainHeaderLines() {
         final AttributedStringBuilder schemaHeader = new 
AttributedStringBuilder();
 
-        resultDescriptor
-                .getResultSchema()
-                .getColumnNames()
+        IntStream.range(0, resultDescriptor.getResultSchema().getColumnCount())
                 .forEach(
-                        s -> {
+                        idx -> {
+                            schemaHeader.style(AttributedStyle.DEFAULT);
                             schemaHeader.append(' ');
+                            String columnName =
+                                    
resultDescriptor.getResultSchema().getColumnNames().get(idx);
                             
schemaHeader.style(AttributedStyle.DEFAULT.underline());
-                            normalizeColumn(schemaHeader, s, 
PrintUtils.MAX_COLUMN_WIDTH);
-                            schemaHeader.style(AttributedStyle.DEFAULT);
+                            normalizeColumn(schemaHeader, columnName, 
columnWidths[idx]);
                         });
 
         return Collections.singletonList(schemaHeader.toAttributedString());
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java
index 76af661..99838a5 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java
@@ -140,10 +140,11 @@ public class CliTableauResultView implements 
AutoCloseable {
                                 Stream.of(PrintUtils.ROW_KIND_COLUMN),
                                 columns.stream().map(Column::getName))
                         .toArray(String[]::new);
+
         final int[] colWidths =
                 PrintUtils.columnWidthsByType(
                         columns,
-                        PrintUtils.MAX_COLUMN_WIDTH,
+                        resultDescriptor.maxColumnWidth(),
                         PrintUtils.NULL_COLUMN,
                         PrintUtils.ROW_KIND_COLUMN);
 
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SqlClientOptions.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SqlClientOptions.java
index bafe2d3..25f1ca8 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SqlClientOptions.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SqlClientOptions.java
@@ -26,6 +26,8 @@ import org.apache.flink.configuration.ConfigOptions;
 public class SqlClientOptions {
     private SqlClientOptions() {}
 
+    // Execution options
+
     @Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
     public static final ConfigOption<Integer> EXECUTION_MAX_TABLE_RESULT_ROWS =
             ConfigOptions.key("sql-client.execution.max-table-result.rows")
@@ -53,4 +55,16 @@ public class SqlClientOptions {
                     .defaultValue(false)
                     .withDescription(
                             "Determine whether to output the verbose output to 
the console. If set the option true, it will print the exception stack. 
Otherwise, it only output the cause.");
+
+    // Display options
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Integer> DISPLAY_MAX_COLUMN_WIDTH =
+            ConfigOptions.key("sql-client.display.max-column-width")
+                    .intType()
+                    .defaultValue(30)
+                    .withDescription(
+                            "When printing the query results, this parameter determines the number of characters shown on screen before truncating. "
+                                    + "This only applies to columns with variable-length types (e.g. STRING) in streaming mode. "
+                                    + "Fixed-length types and all types in batch mode are printed using a deterministic column width.");
 }
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultDescriptor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultDescriptor.java
index b6e3b04..c63b5af 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultDescriptor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultDescriptor.java
@@ -18,7 +18,14 @@
 
 package org.apache.flink.table.client.gateway;
 
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.client.config.ResultMode;
+
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static 
org.apache.flink.table.client.config.SqlClientOptions.DISPLAY_MAX_COLUMN_WIDTH;
+import static 
org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE;
 
 /** Describes a result to be expected from a table program. */
 public class ResultDescriptor {
@@ -29,21 +36,17 @@ public class ResultDescriptor {
 
     private final boolean isMaterialized;
 
-    private final boolean isTableauMode;
-
-    private final boolean isStreamingMode;
+    private final ReadableConfig config;
 
     public ResultDescriptor(
             String resultId,
             ResolvedSchema resultSchema,
             boolean isMaterialized,
-            boolean isTableauMode,
-            boolean isStreamingMode) {
+            ReadableConfig config) {
         this.resultId = resultId;
         this.resultSchema = resultSchema;
         this.isMaterialized = isMaterialized;
-        this.isTableauMode = isTableauMode;
-        this.isStreamingMode = isStreamingMode;
+        this.config = config;
     }
 
     public String getResultId() {
@@ -59,10 +62,14 @@ public class ResultDescriptor {
     }
 
     public boolean isTableauMode() {
-        return isTableauMode;
+        return config.get(EXECUTION_RESULT_MODE).equals(ResultMode.TABLEAU);
     }
 
     public boolean isStreamingMode() {
-        return isStreamingMode;
+        return config.get(RUNTIME_MODE).equals(RuntimeExecutionMode.STREAMING);
+    }
+
+    public int maxColumnWidth() {
+        return config.get(DISPLAY_MAX_COLUMN_WIDTH);
     }
 }
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index f8a1026..2306488 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -19,12 +19,10 @@
 package org.apache.flink.table.client.gateway.local;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
-import org.apache.flink.table.client.config.ResultMode;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.ResultDescriptor;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
@@ -52,9 +50,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
 import static 
org.apache.flink.table.client.cli.CliStrings.MESSAGE_SQL_EXECUTION_ERROR;
-import static 
org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
@@ -241,11 +237,7 @@ public class LocalExecutor implements Executor {
         // store the result under the JobID
         resultStore.storeResult(jobId, result);
         return new ResultDescriptor(
-                jobId,
-                tableResult.getResolvedSchema(),
-                result.isMaterialized(),
-                config.get(EXECUTION_RESULT_MODE).equals(ResultMode.TABLEAU),
-                
config.get(RUNTIME_MODE).equals(RuntimeExecutionMode.STREAMING));
+                jobId, tableResult.getResolvedSchema(), 
result.isMaterialized(), config);
     }
 
     @Override
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java
index eee91c8..02f78bd 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java
@@ -51,7 +51,7 @@ import java.util.concurrent.TimeUnit;
 import static 
org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.not;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 /** Tests for {@link SqlClient}. */
 public class SqlClientTest {
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index d04d153..cfa90e8 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -75,9 +75,9 @@ import static 
org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYN
 import static 
org.apache.flink.table.client.cli.CliClient.DEFAULT_TERMINAL_FACTORY;
 import static 
org.apache.flink.table.client.cli.CliStrings.MESSAGE_SQL_EXECUTION_ERROR;
 import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /** Tests for the {@link CliClient}. */
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
index 45e1b67..eaa520c 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.table.client.cli;
 
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.DataTypes;
@@ -24,6 +25,7 @@ import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.client.config.ResultMode;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.ResultDescriptor;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
@@ -48,6 +50,8 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static 
org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE;
 import static org.junit.Assert.assertTrue;
 
 /** Contains basic tests for the {@link CliResultView}. */
@@ -90,14 +94,16 @@ public class CliResultViewTest {
                 new CountDownLatch(expectedCancellationCount);
 
         final MockExecutor executor = new MockExecutor(typedResult, 
cancellationCounterLatch);
+        final Configuration testConfig = new Configuration();
+        testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLE);
+        testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
         String sessionId = executor.openSession("test-session");
         final ResultDescriptor descriptor =
                 new ResultDescriptor(
                         "result-id",
                         ResolvedSchema.of(Column.physical("Null Field", 
DataTypes.STRING())),
                         false,
-                        false,
-                        true);
+                        testConfig);
 
         try (CliClient cli =
                 new TestingCliClient(
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
index 83d5302..9c69e48 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.table.client.cli;
 
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.client.config.ResultMode;
 import org.apache.flink.table.client.gateway.ResultDescriptor;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.TypedResult;
@@ -45,8 +48,10 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static 
org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
 
 /** Tests for CliTableauResultView. */
 public class CliTableauResultViewTest {
@@ -156,7 +161,10 @@ public class CliTableauResultViewTest {
 
     @Test
     public void testBatchResult() {
-        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, 
true, true, false);
+        final Configuration testConfig = new Configuration();
+        testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+        testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, 
true, testConfig);
 
         TestingExecutor mockExecutor =
                 new TestingExecutorBuilder()
@@ -206,7 +214,10 @@ public class CliTableauResultViewTest {
 
     @Test
     public void testCancelBatchResult() throws Exception {
-        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, 
true, true, false);
+        final Configuration testConfig = new Configuration();
+        testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+        testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, 
true, testConfig);
 
         TestingExecutor mockExecutor =
                 new TestingExecutorBuilder()
@@ -246,7 +257,11 @@ public class CliTableauResultViewTest {
 
     @Test
     public void testEmptyBatchResult() {
-        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, 
true, true, false);
+        final Configuration testConfig = new Configuration();
+        testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+        testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, 
true, testConfig);
+
         TestingExecutor mockExecutor =
                 new TestingExecutorBuilder()
                         .setResultChangesSupplier(TypedResult::endOfStream)
@@ -268,7 +283,10 @@ public class CliTableauResultViewTest {
 
     @Test
     public void testFailedBatchResult() {
-        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, 
true, true, false);
+        final Configuration testConfig = new Configuration();
+        testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+        testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, 
true, testConfig);
 
         TestingExecutor mockExecutor =
                 new TestingExecutorBuilder()
@@ -295,7 +313,10 @@ public class CliTableauResultViewTest {
 
     @Test
     public void testStreamingResult() {
-        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, 
true, true, true);
+        final Configuration testConfig = new Configuration();
+        testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+        testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, 
true, testConfig);
 
         TestingExecutor mockExecutor =
                 new TestingExecutorBuilder()
@@ -354,7 +375,10 @@ public class CliTableauResultViewTest {
 
     @Test
     public void testEmptyStreamingResult() {
-        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, 
true, true, true);
+        final Configuration testConfig = new Configuration();
+        testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+        testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, 
true, testConfig);
 
         TestingExecutor mockExecutor =
                 new TestingExecutorBuilder()
@@ -382,7 +406,10 @@ public class CliTableauResultViewTest {
 
     @Test
     public void testCancelStreamingResult() throws Exception {
-        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, 
true, true, true);
+        final Configuration testConfig = new Configuration();
+        testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+        testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, 
true, testConfig);
 
         TestingExecutor mockExecutor =
                 new TestingExecutorBuilder()
@@ -435,7 +462,10 @@ public class CliTableauResultViewTest {
 
     @Test
     public void testFailedStreamingResult() {
-        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, 
true, true, true);
+        final Configuration testConfig = new Configuration();
+        testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
+        testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+        ResultDescriptor resultDescriptor = new ResultDescriptor("", schema, 
true, testConfig);
 
         TestingExecutor mockExecutor =
                 new TestingExecutorBuilder()
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
index 2567986..11507b5 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
@@ -45,10 +45,10 @@ import static 
org.apache.flink.configuration.PipelineOptions.NAME;
 import static org.apache.flink.configuration.PipelineOptions.OBJECT_REUSE;
 import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
 import static 
org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/select.q b/flink-table/flink-sql-client/src/test/resources/sql/select.q
index 9d875c3..55b722c 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/select.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/select.q
@@ -115,6 +115,74 @@ FROM (VALUES
 Received a total of 2 rows
 !ok
 
+# ==========================================================================
+# Testing behavior of sql-client.display.max-column-width
+# Only variable width columns are impacted at the moment => STRING, but not TIMESTAMP nor BOOLEAN
+# ==========================================================================
+
+CREATE TEMPORARY VIEW
+  testUserData(name, dob, isHappy)
+AS (VALUES
+  ('30b5c1bb-0ac0-43d3-b812-fcb649fd2b07', TIMESTAMP '2001-01-13 20:11:11.123', true),
+  ('91170c98-2cc5-4935-9ea6-12b72d32fb3c', TIMESTAMP '1994-02-14 21:12:11.123', true),
+  ('8b012d93-6ece-48ad-a2ea-aa75ef7b1d60', TIMESTAMP '1979-03-15 22:13:11.123', false),
+  ('09969d9e-d584-11eb-b8bc-0242ac130003', TIMESTAMP '1985-04-16 23:14:11.123', true)
+);
+[INFO] Execute statement succeed.
+!info
+
+SELECT * from testUserData;
++----+--------------------------------+-------------------------+---------+
+| op |                           name |                     dob | isHappy |
++----+--------------------------------+-------------------------+---------+
+| +I | 30b5c1bb-0ac0-43d3-b812-fcb... | 2001-01-13 20:11:11.123 |    true |
+| +I | 91170c98-2cc5-4935-9ea6-12b... | 1994-02-14 21:12:11.123 |    true |
+| +I | 8b012d93-6ece-48ad-a2ea-aa7... | 1979-03-15 22:13:11.123 |   false |
+| +I | 09969d9e-d584-11eb-b8bc-024... | 1985-04-16 23:14:11.123 |    true |
++----+--------------------------------+-------------------------+---------+
+Received a total of 4 rows
+!ok
+
+SET 'sql-client.display.max-column-width' = '10';
+[INFO] Session property has been set.
+!info
+
+SELECT * from testUserData;
++----+------------+-------------------------+---------+
+| op |       name |                     dob | isHappy |
++----+------------+-------------------------+---------+
+| +I | 30b5c1b... | 2001-01-13 20:11:11.123 |    true |
+| +I | 91170c9... | 1994-02-14 21:12:11.123 |    true |
+| +I | 8b012d9... | 1979-03-15 22:13:11.123 |   false |
+| +I | 09969d9... | 1985-04-16 23:14:11.123 |    true |
++----+------------+-------------------------+---------+
+Received a total of 4 rows
+!ok
+
+SET 'sql-client.display.max-column-width' = '40';
+[INFO] Session property has been set.
+!info
+
+SELECT * from testUserData;
++----+------------------------------------------+-------------------------+---------+
+| op |                                     name |                     dob | isHappy |
++----+------------------------------------------+-------------------------+---------+
+| +I |     30b5c1bb-0ac0-43d3-b812-fcb649fd2b07 | 2001-01-13 20:11:11.123 |    true |
+| +I |     91170c98-2cc5-4935-9ea6-12b72d32fb3c | 1994-02-14 21:12:11.123 |    true |
+| +I |     8b012d93-6ece-48ad-a2ea-aa75ef7b1d60 | 1979-03-15 22:13:11.123 |   false |
+| +I |     09969d9e-d584-11eb-b8bc-0242ac130003 | 1985-04-16 23:14:11.123 |    true |
++----+------------------------------------------+-------------------------+---------+
+Received a total of 4 rows
+!ok
+
+-- post-test cleanup + setting back default max width value
+DROP TEMPORARY VIEW testUserData;
+[INFO] Execute statement succeed.
+!info
+
+SET 'sql-client.display.max-column-width' = '30';
+[INFO] Session property has been set.
+!info
 
 # ==========================================================================
 # test batch query
