twalthr commented on a change in pull request #12728:
URL: https://github.com/apache/flink/pull/12728#discussion_r444016557



##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
##########
@@ -66,70 +77,102 @@ public static void printAsTableauForm(
                        TableSchema tableSchema,
                        Iterator<Row> it,
                        PrintWriter printWriter) {
-               printAsTableauForm(tableSchema, it, printWriter, 
MAX_COLUMN_WIDTH, NULL_COLUMN, false);
+               printAsTableauForm(tableSchema, it, printWriter, 
MAX_COLUMN_WIDTH, NULL_COLUMN, false, false);
        }
 
        /**
         * Displays the result in a tableau form.
         *
+        * <p><b>NOTE:</b> please make sure the data to print is small enough 
to be stored in java heap memory
+        * if the column width is derived from content 
(`deriveColumnWidthByType` is false).
+        *
         * <p>For example: (printRowKind is true)
-        * +-------------+-------------+---------+-------------+
+        * <pre>
+        * +----------+-------------+---------+-------------+
         * | row_kind | boolean_col | int_col | varchar_col |
-        * +-------------+-------------+---------+-------------+
+        * +----------+-------------+---------+-------------+
         * |       +I |        true |       1 |         abc |
         * |       -U |       false |       2 |         def |
         * |       +U |       false |       3 |         def |
         * |       -D |      (NULL) |  (NULL) |      (NULL) |
-        * +-------------+-------------+---------+-------------+
+        * +----------+-------------+---------+-------------+
         * 4 rows in result
+        * </pre>
+        *
+        * @param tableSchema The schema of the data to print
+        * @param it The iterator for the data to print
+        * @param printWriter The writer to write to
+        * @param maxColumnWidth The max width of a column
+        * @param nullColumn The string representation of a null value
+        * @param deriveColumnWidthByType A flag to indicate whether the column 
width
+        *        is derived from type (true) or content (false).
+        * @param printRowKind A flag to indicate whether print row kind info
         */
        public static void printAsTableauForm(
                        TableSchema tableSchema,
                        Iterator<Row> it,
                        PrintWriter printWriter,
                        int maxColumnWidth,
                        String nullColumn,
+                       boolean deriveColumnWidthByType,
                        boolean printRowKind) {
-               List<String[]> rows = new ArrayList<>();
-
-               // fill field names first
-               final List<TableColumn> columns;
+               final List<TableColumn> columns = tableSchema.getTableColumns();
+               String[] columnNames = 
columns.stream().map(TableColumn::getName).toArray(String[]::new);
                if (printRowKind) {
-                       columns = Stream.concat(
-                                       Stream.of(TableColumn.of("row_kind", 
DataTypes.STRING())),
-                                       tableSchema.getTableColumns().stream()
-                       ).collect(Collectors.toList());
-               } else {
-                       columns = tableSchema.getTableColumns();
+                       columnNames = Stream.concat(Stream.of(ROW_KIND_COLUMN), 
Arrays.stream(columnNames)).toArray(String[]::new);
                }
 
-               
rows.add(columns.stream().map(TableColumn::getName).toArray(String[]::new));
-               while (it.hasNext()) {
-                       rows.add(rowToString(it.next(), nullColumn, 
printRowKind));
+               final int[] colWidths;
+               if (deriveColumnWidthByType) {
+                       colWidths = columnWidthsByType(
+                                       columns,
+                                       maxColumnWidth,
+                                       nullColumn,
+                                       printRowKind ? ROW_KIND_COLUMN : null);
+               } else {
+                       //

Review comment:
       Empty comment (`//`); please remove it or fill it in.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java
##########
@@ -131,14 +131,17 @@
         *      it... // collect same data
         *  }
         * }</pre>
+        *
+        * <p>Only this method or {@link #print()} method can be called for a 
TableResult instance,
+        * because the result can only be accessed once.
         */
        CloseableIterator<Row> collect();
 
        /**
         * Print the result contents as tableau form to client console.
         *
-        * <p><strong>NOTE:</strong> please make sure the result data to print 
should be small.
-        * Because all data will be collected to local first, and then print 
them to console.
+        * <p>only this method or {@link #collect()} ()} method can be called 
for a TableResult instance,

Review comment:
       Invalid parentheses in the JavaDoc: `{@link #collect()} ()}` contains a stray ` ()}`.

##########
File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/PrintUtilsTest.java
##########
@@ -145,28 +168,55 @@ public void testPrintWithMultipleRowsAndRowKind() {
                                new PrintWriter(outContent),
                                PrintUtils.MAX_COLUMN_WIDTH,
                                "",
+                               true, // derive column width by type
                                true);
 
                // note: the expected result may look irregular because every 
CJK(Chinese/Japanese/Korean) character's
                // width < 2 in IDE by default, every CJK character usually's 
width is 2, you can open this source file
                // by vim or just cat the file to check the regular result.
+               // The last row of `varchar` value will pad with two ' ' before 
the column.
+               // Because the length of `これは日本語をテストするた` plus the length of 
`...` is 29,
+               // no more Japanese character can be added to the line.
                assertEquals(
-                               
"+----------+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+\n"
 +
-                               "| row_kind | boolean |         int |           
    bigint |                        varchar | decimal(10, 5) |                  
timestamp |\n" +
-                               
"+----------+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+\n"
 +
-                               "|       +I |         |           1 |           
         2 |                            abc |           1.23 |      2020-03-01 
18:39:14.0 |\n" +
-                               "|       +I |   false |             |           
         0 |                                |              1 |      2020-03-01 
18:39:14.1 |\n" +
-                               "|       -D |    true |  2147483647 |           
           |                        abcdefg |     1234567890 |     2020-03-01 
18:39:14.12 |\n" +
-                               "|       +I |   false | -2147483648 |  
9223372036854775807 |                                |    12345.06789 |    
2020-03-01 18:39:14.123 |\n" +
-                               "|       +I |    true |         100 | 
-9223372036854775808 |                     abcdefg111 |                | 
2020-03-01 18:39:14.123456 |\n" +
-                               "|       -U |         |          -1 |           
        -1 |     abcdefghijklmnopqrstuvwxyz |   -12345.06789 |                  
          |\n" +
-                               "|       +U |         |          -1 |           
        -1 |                   这是一段中文 |   -12345.06789 |      2020-03-04 
18:39:14.0 |\n" +
-                               "|       -D |         |          -1 |           
        -1 |  これは日本語をテストするた... |   -12345.06789 |      2020-03-04 18:39:14.0 
|\n" +
-                               
"+----------+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+\n"
 +
+                               
"+----------+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+"
 + System.lineSeparator() +
+                               "| row_kind | boolean |         int |           
    bigint |                        varchar | decimal(10, 5) |                  
timestamp |" + System.lineSeparator() +
+                               
"+----------+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+"
 + System.lineSeparator() +
+                               "|       +I |         |           1 |           
         2 |                            abc |           1.23 |      2020-03-01 
18:39:14.0 |" + System.lineSeparator() +
+                               "|       +I |   false |             |           
         0 |                                |              1 |      2020-03-01 
18:39:14.1 |" + System.lineSeparator() +
+                               "|       -D |    true |  2147483647 |           
           |                        abcdefg |     1234567890 |     2020-03-01 
18:39:14.12 |" + System.lineSeparator() +
+                               "|       +I |   false | -2147483648 |  
9223372036854775807 |                                |    12345.06789 |    
2020-03-01 18:39:14.123 |" + System.lineSeparator() +
+                               "|       +I |    true |         100 | 
-9223372036854775808 |                     abcdefg111 |                | 
2020-03-01 18:39:14.123456 |" + System.lineSeparator() +
+                               "|       -U |         |          -1 |           
        -1 | abcdefghijklmnopqrstuvwxyza... |   -12345.06789 |                  
          |" + System.lineSeparator() +
+                               "|       +U |         |          -1 |           
        -1 |                   这是一段中文 |   -12345.06789 |      2020-03-04 
18:39:14.0 |" + System.lineSeparator() +
+                               "|       -D |         |          -1 |           
        -1 |  これは日本語をテストするた... |   -12345.06789 |      2020-03-04 18:39:14.0 |" 
+ System.lineSeparator() +
+                               
"+----------+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+"
 + System.lineSeparator() +
                                "8 rows in set" + System.lineSeparator(),
                                outContent.toString());
        }
 
+       @Test
+       public void testPrintWithMultipleRowsAndDriverColumnWidthByContent() {

Review comment:
       nit: `Driver`? Presumably `Derive` was meant.

##########
File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/PrintUtilsTest.java
##########
@@ -145,28 +168,55 @@ public void testPrintWithMultipleRowsAndRowKind() {
                                new PrintWriter(outContent),
                                PrintUtils.MAX_COLUMN_WIDTH,
                                "",
+                               true, // derive column width by type
                                true);
 
                // note: the expected result may look irregular because every 
CJK(Chinese/Japanese/Korean) character's
                // width < 2 in IDE by default, every CJK character usually's 
width is 2, you can open this source file
                // by vim or just cat the file to check the regular result.
+               // The last row of `varchar` value will pad with two ' ' before 
the column.
+               // Because the length of `これは日本語をテストするた` plus the length of 
`...` is 29,
+               // no more Japanese character can be added to the line.
                assertEquals(
-                               
"+----------+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+\n"
 +
-                               "| row_kind | boolean |         int |           
    bigint |                        varchar | decimal(10, 5) |                  
timestamp |\n" +
-                               
"+----------+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+\n"
 +
-                               "|       +I |         |           1 |           
         2 |                            abc |           1.23 |      2020-03-01 
18:39:14.0 |\n" +
-                               "|       +I |   false |             |           
         0 |                                |              1 |      2020-03-01 
18:39:14.1 |\n" +
-                               "|       -D |    true |  2147483647 |           
           |                        abcdefg |     1234567890 |     2020-03-01 
18:39:14.12 |\n" +
-                               "|       +I |   false | -2147483648 |  
9223372036854775807 |                                |    12345.06789 |    
2020-03-01 18:39:14.123 |\n" +
-                               "|       +I |    true |         100 | 
-9223372036854775808 |                     abcdefg111 |                | 
2020-03-01 18:39:14.123456 |\n" +
-                               "|       -U |         |          -1 |           
        -1 |     abcdefghijklmnopqrstuvwxyz |   -12345.06789 |                  
          |\n" +
-                               "|       +U |         |          -1 |           
        -1 |                   这是一段中文 |   -12345.06789 |      2020-03-04 
18:39:14.0 |\n" +
-                               "|       -D |         |          -1 |           
        -1 |  これは日本語をテストするた... |   -12345.06789 |      2020-03-04 18:39:14.0 
|\n" +
-                               
"+----------+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+\n"
 +
+                               
"+----------+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+"
 + System.lineSeparator() +
+                               "| row_kind | boolean |         int |           
    bigint |                        varchar | decimal(10, 5) |                  
timestamp |" + System.lineSeparator() +
+                               
"+----------+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+"
 + System.lineSeparator() +
+                               "|       +I |         |           1 |           
         2 |                            abc |           1.23 |      2020-03-01 
18:39:14.0 |" + System.lineSeparator() +
+                               "|       +I |   false |             |           
         0 |                                |              1 |      2020-03-01 
18:39:14.1 |" + System.lineSeparator() +
+                               "|       -D |    true |  2147483647 |           
           |                        abcdefg |     1234567890 |     2020-03-01 
18:39:14.12 |" + System.lineSeparator() +
+                               "|       +I |   false | -2147483648 |  
9223372036854775807 |                                |    12345.06789 |    
2020-03-01 18:39:14.123 |" + System.lineSeparator() +
+                               "|       +I |    true |         100 | 
-9223372036854775808 |                     abcdefg111 |                | 
2020-03-01 18:39:14.123456 |" + System.lineSeparator() +
+                               "|       -U |         |          -1 |           
        -1 | abcdefghijklmnopqrstuvwxyza... |   -12345.06789 |                  
          |" + System.lineSeparator() +
+                               "|       +U |         |          -1 |           
        -1 |                   这是一段中文 |   -12345.06789 |      2020-03-04 
18:39:14.0 |" + System.lineSeparator() +
+                               "|       -D |         |          -1 |           
        -1 |  これは日本語をテストするた... |   -12345.06789 |      2020-03-04 18:39:14.0 |" 
+ System.lineSeparator() +
+                               
"+----------+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+"
 + System.lineSeparator() +
                                "8 rows in set" + System.lineSeparator(),
                                outContent.toString());
        }
 
+       @Test
+       public void testPrintWithMultipleRowsAndDriverColumnWidthByContent() {
+               PrintUtils.printAsTableauForm(
+                               getSchema(),
+                               getData().subList(0, 3).iterator(),
+                               new PrintWriter(outContent),
+                               PrintUtils.MAX_COLUMN_WIDTH,
+                               "",
+                               false, // derive column width by content
+                               true);
+
+               assertEquals(
+                               
"+----------+---------+------------+--------+---------+----------------+------------------------+"
 + System.lineSeparator() +
+                               "| row_kind | boolean |        int | bigint | 
varchar | decimal(10, 5) |              timestamp |" + System.lineSeparator() +

Review comment:
       How about we make this column shorter to leave more room for the remaining columns, e.g. just `kind`?
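
   To illustrate the effect on the layout, here is a minimal sketch. It mirrors the border-line logic visible in this diff (`width + 2` dashes per column), but the class name `RowKindHeaderSketch` is hypothetical and a plain loop stands in for `EncodingUtils.repeat`. It is not the actual `PrintUtils` code.

```java
// Hypothetical sketch: compares the border width for a "row_kind" header vs. a "kind" header.
public final class RowKindHeaderSketch {

    /** Builds a border line: each column contributes (width + 2) dashes followed by '+'. */
    public static String genBorderLine(final int[] colWidths) {
        final StringBuilder sb = new StringBuilder("+");
        for (final int width : colWidths) {
            // one padding space on each side of the content, rendered as dashes
            for (int i = 0; i < width + 2; i++) {
                sb.append('-');
            }
            sb.append('+');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // The row kind values ("+I", "-U", "+U", "-D") are 2 characters wide,
        // so the header name determines the width of this column.
        System.out.println(genBorderLine(new int[] {"row_kind".length()})); // +----------+
        System.out.println(genBorderLine(new int[] {"kind".length()}));     // +------+
    }
}
```

   With `kind` as the header, every printed line becomes four characters shorter.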

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java
##########
@@ -131,14 +131,27 @@
         *      it... // collect same data
         *  }
         * }</pre>
+        *
+        * <p>This method guarantees end-to-end exactly-once record delivery
+        * which requires the checkpointing mechanism to be enabled.
+        * By default, checkpointing is disabled. To enable checkpointing,

Review comment:
       Remove
   ```
   By default, checkpointing is disabled. To enable checkpointing, call 
`StreamExecutionEnvironment#enableCheckpointing()` method.
   ```
   because `StreamExecutionEnvironment` is not available for pure Table API 
users.

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
##########
@@ -66,70 +77,102 @@ public static void printAsTableauForm(
                        TableSchema tableSchema,
                        Iterator<Row> it,
                        PrintWriter printWriter) {
-               printAsTableauForm(tableSchema, it, printWriter, 
MAX_COLUMN_WIDTH, NULL_COLUMN, false);
+               printAsTableauForm(tableSchema, it, printWriter, 
MAX_COLUMN_WIDTH, NULL_COLUMN, false, false);
        }
 
        /**
         * Displays the result in a tableau form.
         *
+        * <p><b>NOTE:</b> please make sure the data to print is small enough 
to be stored in java heap memory
+        * if the column width is derived from content 
(`deriveColumnWidthByType` is false).
+        *
         * <p>For example: (printRowKind is true)
-        * +-------------+-------------+---------+-------------+
+        * <pre>
+        * +----------+-------------+---------+-------------+
         * | row_kind | boolean_col | int_col | varchar_col |
-        * +-------------+-------------+---------+-------------+
+        * +----------+-------------+---------+-------------+
         * |       +I |        true |       1 |         abc |
         * |       -U |       false |       2 |         def |
         * |       +U |       false |       3 |         def |
         * |       -D |      (NULL) |  (NULL) |      (NULL) |
-        * +-------------+-------------+---------+-------------+
+        * +----------+-------------+---------+-------------+
         * 4 rows in result
+        * </pre>
+        *
+        * @param tableSchema The schema of the data to print
+        * @param it The iterator for the data to print
+        * @param printWriter The writer to write to
+        * @param maxColumnWidth The max width of a column
+        * @param nullColumn The string representation of a null value
+        * @param deriveColumnWidthByType A flag to indicate whether the column 
width
+        *        is derived from type (true) or content (false).
+        * @param printRowKind A flag to indicate whether print row kind info
         */
        public static void printAsTableauForm(
                        TableSchema tableSchema,
                        Iterator<Row> it,
                        PrintWriter printWriter,
                        int maxColumnWidth,
                        String nullColumn,
+                       boolean deriveColumnWidthByType,
                        boolean printRowKind) {
-               List<String[]> rows = new ArrayList<>();
-
-               // fill field names first
-               final List<TableColumn> columns;
+               final List<TableColumn> columns = tableSchema.getTableColumns();
+               String[] columnNames = 
columns.stream().map(TableColumn::getName).toArray(String[]::new);
                if (printRowKind) {
-                       columns = Stream.concat(
-                                       Stream.of(TableColumn.of("row_kind", 
DataTypes.STRING())),
-                                       tableSchema.getTableColumns().stream()
-                       ).collect(Collectors.toList());
-               } else {
-                       columns = tableSchema.getTableColumns();
+                       columnNames = Stream.concat(Stream.of(ROW_KIND_COLUMN), 
Arrays.stream(columnNames)).toArray(String[]::new);
                }
 
-               
rows.add(columns.stream().map(TableColumn::getName).toArray(String[]::new));
-               while (it.hasNext()) {
-                       rows.add(rowToString(it.next(), nullColumn, 
printRowKind));
+               final int[] colWidths;
+               if (deriveColumnWidthByType) {
+                       colWidths = columnWidthsByType(
+                                       columns,
+                                       maxColumnWidth,
+                                       nullColumn,
+                                       printRowKind ? ROW_KIND_COLUMN : null);
+               } else {
+                       //
+                       List<Row> rows = new ArrayList<>();
+                       List<String[]> content = new ArrayList<>();
+                       content.add(columnNames);
+                       while (it.hasNext()) {
+                               Row row = it.next();
+                               rows.add(row);
+                               content.add(rowToString(row, nullColumn, 
printRowKind));
+                       }
+                       colWidths = columnWidthsByContent(columnNames, content, 
maxColumnWidth);
+                       it = rows.iterator();
                }
 
-               int[] colWidths = columnWidthsByContent(columns, rows, 
maxColumnWidth);
-               String borderline = genBorderLine(colWidths);
-
-               // print field names
+               final String borderline = PrintUtils.genBorderLine(colWidths);
+               // print border line
                printWriter.println(borderline);
-               printSingleRow(colWidths, rows.get(0), printWriter);
+               // print filed names

Review comment:
       nit: typo, `filed` should be `field`

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
##########
@@ -66,70 +77,102 @@ public static void printAsTableauForm(
                        TableSchema tableSchema,
                        Iterator<Row> it,
                        PrintWriter printWriter) {
-               printAsTableauForm(tableSchema, it, printWriter, 
MAX_COLUMN_WIDTH, NULL_COLUMN, false);
+               printAsTableauForm(tableSchema, it, printWriter, 
MAX_COLUMN_WIDTH, NULL_COLUMN, false, false);
        }
 
        /**
         * Displays the result in a tableau form.
         *
+        * <p><b>NOTE:</b> please make sure the data to print is small enough 
to be stored in java heap memory
+        * if the column width is derived from content 
(`deriveColumnWidthByType` is false).
+        *
         * <p>For example: (printRowKind is true)
-        * +-------------+-------------+---------+-------------+
+        * <pre>
+        * +----------+-------------+---------+-------------+
         * | row_kind | boolean_col | int_col | varchar_col |
-        * +-------------+-------------+---------+-------------+
+        * +----------+-------------+---------+-------------+
         * |       +I |        true |       1 |         abc |
         * |       -U |       false |       2 |         def |
         * |       +U |       false |       3 |         def |
         * |       -D |      (NULL) |  (NULL) |      (NULL) |
-        * +-------------+-------------+---------+-------------+
+        * +----------+-------------+---------+-------------+
         * 4 rows in result
+        * </pre>
+        *
+        * @param tableSchema The schema of the data to print
+        * @param it The iterator for the data to print
+        * @param printWriter The writer to write to
+        * @param maxColumnWidth The max width of a column
+        * @param nullColumn The string representation of a null value
+        * @param deriveColumnWidthByType A flag to indicate whether the column 
width
+        *        is derived from type (true) or content (false).
+        * @param printRowKind A flag to indicate whether print row kind info
         */
        public static void printAsTableauForm(
                        TableSchema tableSchema,
                        Iterator<Row> it,
                        PrintWriter printWriter,
                        int maxColumnWidth,
                        String nullColumn,
+                       boolean deriveColumnWidthByType,
                        boolean printRowKind) {
-               List<String[]> rows = new ArrayList<>();
-
-               // fill field names first
-               final List<TableColumn> columns;
+               final List<TableColumn> columns = tableSchema.getTableColumns();
+               String[] columnNames = 
columns.stream().map(TableColumn::getName).toArray(String[]::new);
                if (printRowKind) {
-                       columns = Stream.concat(
-                                       Stream.of(TableColumn.of("row_kind", 
DataTypes.STRING())),
-                                       tableSchema.getTableColumns().stream()
-                       ).collect(Collectors.toList());
-               } else {
-                       columns = tableSchema.getTableColumns();
+                       columnNames = Stream.concat(Stream.of(ROW_KIND_COLUMN), 
Arrays.stream(columnNames)).toArray(String[]::new);
                }
 
-               
rows.add(columns.stream().map(TableColumn::getName).toArray(String[]::new));
-               while (it.hasNext()) {
-                       rows.add(rowToString(it.next(), nullColumn, 
printRowKind));
+               final int[] colWidths;
+               if (deriveColumnWidthByType) {
+                       colWidths = columnWidthsByType(
+                                       columns,
+                                       maxColumnWidth,
+                                       nullColumn,
+                                       printRowKind ? ROW_KIND_COLUMN : null);
+               } else {
+                       //
+                       List<Row> rows = new ArrayList<>();

Review comment:
       You use `final` above, but not here or below. A reader may now assume that these variables are modified, which is not the case.
   It would be great to use `final` consistently.
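
   A minimal sketch of what consistent use could look like. This is a simplified stand-in, not the actual `PrintUtils` code: the class name `FinalLocalsSketch` is hypothetical, rows are plain `String[]` instead of `Row`, and the width is measured with `String#length()` rather than the display width.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified version of the "derive width from content" branch.
public final class FinalLocalsSketch {

    /** Buffers all rows and returns the widest cell per column, capped at maxColumnWidth. */
    public static int[] columnWidthsByContent(
            final String[] columnNames,
            final Iterator<String[]> it,
            final int maxColumnWidth) {
        // Assigned once and never rebound, so it is declared final like the variables above it.
        final List<String[]> content = new ArrayList<>();
        content.add(columnNames);
        while (it.hasNext()) {
            content.add(it.next());
        }

        final int[] colWidths = new int[columnNames.length];
        for (final String[] row : content) {
            for (int i = 0; i < colWidths.length; i++) {
                colWidths[i] = Math.min(maxColumnWidth, Math.max(colWidths[i], row[i].length()));
            }
        }
        return colWidths;
    }

    public static void main(String[] args) {
        final List<String[]> rows = Arrays.asList(
                new String[] {"true", "1"},
                new String[] {"false", "12345"});
        final int[] widths = columnWidthsByContent(
                new String[] {"boolean", "int"}, rows.iterator(), 30);
        System.out.println(Arrays.toString(widths)); // [7, 5]
    }
}
```

   Only variables that are really reassigned would then stand out as non-final.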

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
##########
@@ -176,14 +229,101 @@ public static void printAsTableauForm(
                return colWidths;
        }
 
-       public static String genBorderLine(int[] colWidths) {
-               StringBuilder sb = new StringBuilder();
-               sb.append("+");
-               for (int width : colWidths) {
-                       sb.append(EncodingUtils.repeat('-', width + 1));
-                       sb.append("-+");
+       /**
+        * Try to infer column width based on column types. In streaming case, 
we will have an

Review comment:
       We try to avoid the terms `streaming` and `batch` in the code base; instead, we say `unbounded` and `bounded`.




