This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 326a564 [FLINK-22169][sql-client] Improve CliTableauResultView when printing batch results (#15603) 326a564 is described below commit 326a564ebc0e397654ccd4e5a83a77ef811c6a76 Author: Shengkai <33114724+fsk...@users.noreply.github.com> AuthorDate: Thu Apr 15 13:51:23 2021 +0800 [FLINK-22169][sql-client] Improve CliTableauResultView when printing batch results (#15603) --- .../table/client/cli/CliTableauResultView.java | 80 +++++++++++++++------- .../table/client/cli/CliTableauResultViewTest.java | 30 +------- .../src/test/resources/sql/catalog_database.q | 5 +- .../src/test/resources/sql/insert.q | 24 +++---- .../src/test/resources/sql/select.q | 14 ++-- .../src/test/resources/sql/statement_set.q | 24 +++---- 6 files changed, 89 insertions(+), 88 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java index aae6afc..df84801 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java @@ -29,6 +29,7 @@ import org.apache.flink.types.Row; import org.jline.terminal.Terminal; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; @@ -66,7 +67,11 @@ public class CliTableauResultView implements AutoCloseable { Future<?> resultFuture = displayResultExecutorService.submit( () -> { - printResults(receivedRowCount, resultDescriptor.isStreamingMode()); + if (resultDescriptor.isStreamingMode()) { + printStreamingResults(receivedRowCount); + } else { + printBatchResults(receivedRowCount); + } }); // capture CTRL-C @@ -85,7 +90,8 @@ public class CliTableauResultView implements AutoCloseable { .println( "Query terminated, received a total of " + receivedRowCount.get() - + " rows"); + + " " + + getRowTerm(receivedRowCount)); terminal.flush(); } catch (ExecutionException e) { if (e.getCause() instanceof SqlExecutionException) { @@ -114,28 +120,26 @@ public class CliTableauResultView implements AutoCloseable { } } - private void printResults(AtomicInteger receivedRowCount, boolean isStreamingMode) { + private void printBatchResults(AtomicInteger receivedRowCount) { + final List<Row> resultRows = waitBatchResults(); + receivedRowCount.addAndGet(resultRows.size()); + PrintUtils.printAsTableauForm( + resultDescriptor.getResultSchema(), resultRows.iterator(), terminal.writer()); + } + + private void printStreamingResults(AtomicInteger receivedRowCount) { List<Column> columns = resultDescriptor.getResultSchema().getColumns(); - final String[] fieldNames; - final int[] colWidths; - if (isStreamingMode) { - fieldNames = - Stream.concat( - Stream.of(PrintUtils.ROW_KIND_COLUMN), - columns.stream().map(Column::getName)) - .toArray(String[]::new); - colWidths = - PrintUtils.columnWidthsByType( - columns, - PrintUtils.MAX_COLUMN_WIDTH, - PrintUtils.NULL_COLUMN, - PrintUtils.ROW_KIND_COLUMN); - } else { - fieldNames = columns.stream().map(Column::getName).toArray(String[]::new); - colWidths = - PrintUtils.columnWidthsByType( - columns, PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN, null); - } + final String[] fieldNames = + Stream.concat( + Stream.of(PrintUtils.ROW_KIND_COLUMN), + columns.stream().map(Column::getName)) + .toArray(String[]::new); + final int[] colWidths = + PrintUtils.columnWidthsByType( + columns, + PrintUtils.MAX_COLUMN_WIDTH, + PrintUtils.NULL_COLUMN, + PrintUtils.ROW_KIND_COLUMN); String borderline = PrintUtils.genBorderLine(colWidths); @@ -162,7 +166,7 @@ public class CliTableauResultView implements AutoCloseable { if (receivedRowCount.get() > 0) { terminal.writer().println(borderline); } - String rowTerm = receivedRowCount.get() > 1 ? "rows" : "row"; + String rowTerm = getRowTerm(receivedRowCount); terminal.writer() .println( "Received a total of " @@ -175,8 +179,7 @@ public class CliTableauResultView implements AutoCloseable { List<Row> changes = result.getPayload(); for (Row change : changes) { final String[] row = - PrintUtils.rowToString( - change, PrintUtils.NULL_COLUMN, isStreamingMode); + PrintUtils.rowToString(change, PrintUtils.NULL_COLUMN, true); PrintUtils.printSingleRow(colWidths, row, terminal.writer()); receivedRowCount.incrementAndGet(); } @@ -186,4 +189,29 @@ public class CliTableauResultView implements AutoCloseable { } } } + + private List<Row> waitBatchResults() { + List<Row> resultRows = new ArrayList<>(); + do { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + TypedResult<List<Row>> result = + sqlExecutor.retrieveResultChanges(sessionId, resultDescriptor.getResultId()); + + if (result.getType() == TypedResult.ResultType.EOS) { + break; + } else if (result.getType() == TypedResult.ResultType.PAYLOAD) { + resultRows.addAll(result.getPayload()); + } + } while (true); + + return resultRows; + } + + private String getRowTerm(AtomicInteger receivedRowCount) { + return receivedRowCount.get() > 1 ? "rows" : "row"; + } } diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java index d882211..2f06b07 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java @@ -198,7 +198,7 @@ public class CliTableauResultViewTest { + System.lineSeparator() + "+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+" + System.lineSeparator() - + "Received a total of 8 rows" + + "8 rows in set" + System.lineSeparator(), terminalOutput.toString()); assertThat(mockExecutor.getNumCancelCalls(), is(0)); @@ -233,22 +233,7 @@ public class CliTableauResultViewTest { furture.get(5, TimeUnit.SECONDS); Assert.assertEquals( - "+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+" - + System.lineSeparator() - + "| boolean | int | bigint | varchar | decimal(10, 5) | timestamp |" - + System.lineSeparator() - + "+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+" - + System.lineSeparator() - + "| (NULL) | 1 | 2 | abc | 1.23 | 2020-03-01 18:39:14.0 |" - + System.lineSeparator() - + "| false | (NULL) | 0 | | 1 | 2020-03-01 18:39:14.1 |" - + System.lineSeparator() - + "| true | 2147483647 | (NULL) | abcdefg | 1234567890 | 2020-03-01 18:39:14.12 |" - + System.lineSeparator() - + "| false | -2147483648 | 9223372036854775807 | (NULL) | 12345.06789 | 2020-03-01 18:39:14.123 |" - + System.lineSeparator() - + "Query terminated, received a total of 4 rows" - + System.lineSeparator(), + "Query terminated, received a total of 0 row" + System.lineSeparator(), terminalOutput.toString()); // didn't have a chance to read page @@ -277,16 +262,7 @@ public class CliTableauResultViewTest { view.displayResults(); view.close(); - Assert.assertEquals( - "+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+" - + System.lineSeparator() - + "| boolean | int | bigint | varchar | decimal(10, 5) | timestamp |" - + System.lineSeparator() - + "+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+" - + System.lineSeparator() - + "Received a total of 0 row" - + System.lineSeparator(), - terminalOutput.toString()); + Assert.assertEquals("Empty set" + System.lineSeparator(), terminalOutput.toString()); assertThat(mockExecutor.getNumCancelCalls(), is(0)); } diff --git a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q index f15ead4..2503113 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q @@ -276,10 +276,7 @@ SET sql-client.execution.result-mode = tableau; # test the SELECT query can run successfully, even result is empty select * from hivecatalog.`default`.param_types_table; -+--------------+--------------------------------+--------------------------------+ -| dec | ch | vch | -+--------------+--------------------------------+--------------------------------+ -Received a total of 0 row +Empty set !ok # ========================================================================== diff --git a/flink-table/flink-sql-client/src/test/resources/sql/insert.q b/flink-table/flink-sql-client/src/test/resources/sql/insert.q index 3fde8f1..39cef38 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/insert.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/insert.q @@ -89,16 +89,16 @@ INSERT INTO BatchTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, !info SELECT * FROM BatchTable; -+-------------+--------------------------------+ -| id | str | -+-------------+--------------------------------+ -| 1 | Hello World | -| 2 | Hi | -| 2 | Hi | -| 3 | Hello | -| 3 | World | -| 4 | ADD | -| 5 | LINE | -+-------------+--------------------------------+ -Received a total of 7 rows ++----+-------------+ +| id | str | ++----+-------------+ +| 1 | Hello World | +| 2 | Hi | +| 2 | Hi | +| 3 | Hello | +| 3 | World | +| 4 | ADD | +| 5 | LINE | ++----+-------------+ +7 rows in set !ok diff --git a/flink-table/flink-sql-client/src/test/resources/sql/select.q b/flink-table/flink-sql-client/src/test/resources/sql/select.q index d98da35..3cf5dc6 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/select.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/select.q @@ -78,11 +78,11 @@ SET execution.runtime-mode = batch; SELECT id, COUNT(*) as cnt, COUNT(DISTINCT str) as uv FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi')) as T(id, str) GROUP BY id; -+-------------+----------------------+----------------------+ -| id | cnt | uv | -+-------------+----------------------+----------------------+ -| 1 | 1 | 1 | -| 2 | 2 | 1 | -+-------------+----------------------+----------------------+ -Received a total of 2 rows ++----+-----+----+ +| id | cnt | uv | ++----+-----+----+ +| 1 | 1 | 1 | +| 2 | 2 | 1 | ++----+-----+----+ +2 rows in set !ok diff --git a/flink-table/flink-sql-client/src/test/resources/sql/statement_set.q b/flink-table/flink-sql-client/src/test/resources/sql/statement_set.q index 8c27dd0..f8b3e6f 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/statement_set.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/statement_set.q @@ -144,18 +144,18 @@ END; !info SELECT * FROM BatchTable; -+-------------+--------------------------------+ -| id | str | -+-------------+--------------------------------+ -| 1 | Hello World | -| 2 | Hi | -| 2 | Hi | -| 3 | Hello | -| 3 | World | -| 4 | ADD | -| 5 | LINE | -+-------------+--------------------------------+ -Received a total of 7 rows ++----+-------------+ +| id | str | ++----+-------------+ +| 1 | Hello World | +| 2 | Hi | +| 2 | Hi | +| 3 | Hello | +| 3 | World | +| 4 | ADD | +| 5 | LINE | ++----+-------------+ +7 rows in set !ok BEGIN STATEMENT SET;