wuchong commented on a change in pull request #12728:
URL: https://github.com/apache/flink/pull/12728#discussion_r444005803
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java
##########
@@ -131,14 +131,27 @@
* it... // collect same data
* }
* }</pre>
+ *
+ * <p>This method guarantees end-to-end exactly-once record delivery
+ * which requires the checkpointing mechanism to be enabled.
+ * By default, checkpointing is disabled. To enable checkpointing,
+ * call `StreamExecutionEnvironment#enableCheckpointing()` method.
+ *
+ * <p>Only this method or {@link #print()} method can be called for a TableResult instance,
+ * because the result can only be accessed once.
Review comment:
NOTE: To fetch the result, you can call either {@link #print()} or {@link #collect()},
but they can't both be called on the same {@link TableResult} instance,
because the result can only be accessed once.
What do you think?
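For illustration, the "result can only be accessed once" contract under discussion could be sketched in plain Java without any Flink dependencies (the `OnceResult` class below is hypothetical, not part of the PR):

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of a result that may be consumed exactly once,
// mirroring the collect()/print() exclusivity described in the javadoc.
class OnceResult {
    private final List<String> rows;
    private boolean accessed = false;

    OnceResult(List<String> rows) {
        this.rows = rows;
    }

    // Either collect() or print() may be called, but not both.
    Iterator<String> collect() {
        markAccessed();
        return rows.iterator();
    }

    void print() {
        markAccessed();
        rows.forEach(System.out::println);
    }

    private void markAccessed() {
        if (accessed) {
            throw new IllegalStateException("The result can only be accessed once.");
        }
        accessed = true;
    }
}
```

A second call on the same instance fails fast instead of silently returning an empty or partial result.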
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java
##########
@@ -131,14 +131,27 @@
* it... // collect same data
* }
* }</pre>
+ *
+ * <p>This method guarantees end-to-end exactly-once record delivery
+ * which requires the checkpointing mechanism to be enabled.
+ * By default, checkpointing is disabled. To enable checkpointing,
+ * call `StreamExecutionEnvironment#enableCheckpointing()` method.
+ *
+ * <p>Only this method or {@link #print()} method can be called for a TableResult instance,
+ * because the result can only be accessed once.
*/
CloseableIterator<Row> collect();
/**
* Print the result contents as tableau form to client console.
*
- * <p><strong>NOTE:</strong> please make sure the result data to print should be small.
- * Because all data will be collected to local first, and then print them to console.
+ * <p>This method guarantees end-to-end exactly-once record delivery
+ * which requires the checkpointing mechanism to be enabled.
+ * By default, checkpointing is disabled. To enable checkpointing,
+ * call `StreamExecutionEnvironment#enableCheckpointing()` method.
Review comment:
What if this is called in batch mode? IIUC, checkpointing is only required in
streaming mode?
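For reference, enabling checkpointing on the underlying streaming environment before running the query would look roughly like this (a configuration sketch; the interval value is arbitrary and needs a Flink runtime on the classpath):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Checkpointing is only relevant in streaming mode; batch queries
// produce bounded results and do not need it for exactly-once delivery.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000L); // checkpoint every 10 seconds
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
```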
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
##########
@@ -66,70 +77,102 @@ public static void printAsTableauForm(
TableSchema tableSchema,
Iterator<Row> it,
PrintWriter printWriter) {
- printAsTableauForm(tableSchema, it, printWriter, MAX_COLUMN_WIDTH, NULL_COLUMN, false);
+ printAsTableauForm(tableSchema, it, printWriter, MAX_COLUMN_WIDTH, NULL_COLUMN, false, false);
}
/**
* Displays the result in a tableau form.
*
+ * <p><b>NOTE:</b> please make sure the data to print is small enough to be stored in java heap memory
+ * if the column width is derived from content (`deriveColumnWidthByType` is false).
+ *
* <p>For example: (printRowKind is true)
- * +-------------+-------------+---------+-------------+
+ * <pre>
+ * +----------+-------------+---------+-------------+
* | row_kind | boolean_col | int_col | varchar_col |
- * +-------------+-------------+---------+-------------+
+ * +----------+-------------+---------+-------------+
* | +I | true | 1 | abc |
* | -U | false | 2 | def |
* | +U | false | 3 | def |
* | -D | (NULL) | (NULL) | (NULL) |
- * +-------------+-------------+---------+-------------+
+ * +----------+-------------+---------+-------------+
* 4 rows in result
+ * </pre>
+ *
+ * @param tableSchema The schema of the data to print
+ * @param it The iterator for the data to print
+ * @param printWriter The writer to write to
+ * @param maxColumnWidth The max width of a column
+ * @param nullColumn The string representation of a null value
+ * @param deriveColumnWidthByType A flag to indicate whether the column width
+ * is derived from type (true) or content (false).
+ * @param printRowKind A flag to indicate whether print row kind info
*/
public static void printAsTableauForm(
TableSchema tableSchema,
Iterator<Row> it,
PrintWriter printWriter,
int maxColumnWidth,
String nullColumn,
+ boolean deriveColumnWidthByType,
boolean printRowKind) {
- List<String[]> rows = new ArrayList<>();
-
- // fill field names first
- final List<TableColumn> columns;
+ final List<TableColumn> columns = tableSchema.getTableColumns();
+ String[] columnNames = columns.stream().map(TableColumn::getName).toArray(String[]::new);
if (printRowKind) {
- columns = Stream.concat(
- Stream.of(TableColumn.of("row_kind", DataTypes.STRING())),
- tableSchema.getTableColumns().stream()
- ).collect(Collectors.toList());
- } else {
- columns = tableSchema.getTableColumns();
+ columnNames = Stream.concat(Stream.of(ROW_KIND_COLUMN), Arrays.stream(columnNames)).toArray(String[]::new);
}
- rows.add(columns.stream().map(TableColumn::getName).toArray(String[]::new));
- while (it.hasNext()) {
- rows.add(rowToString(it.next(), nullColumn, printRowKind));
+ final int[] colWidths;
+ if (deriveColumnWidthByType) {
+ colWidths = columnWidthsByType(
+ columns,
+ maxColumnWidth,
+ nullColumn,
+ printRowKind ? ROW_KIND_COLUMN : null);
+ } else {
+ //
Review comment:
remove?
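To illustrate the trade-off the `deriveColumnWidthByType` flag controls, here is a standalone approximation (not the actual `PrintUtils` code; the per-type widths are illustrative, not Flink's real values): deriving widths from content requires scanning, and therefore buffering, every row first, while a fixed per-type width does not.

```java
import java.util.List;

// Standalone sketch: two ways to size a printed column.
class ColumnWidths {
    // Content-based: must scan (and therefore buffer) every row first.
    static int widthByContent(String header, List<String> values, int maxWidth) {
        int w = header.length();
        for (String v : values) {
            w = Math.max(w, v.length());
        }
        return Math.min(w, maxWidth);
    }

    // Type-based: a fixed width per column type, no buffering needed,
    // which suits unbounded streaming results.
    static int widthByType(String header, String typeName) {
        int typeWidth;
        switch (typeName) {
            case "BOOLEAN": typeWidth = 5;  break; // "false"
            case "INT":     typeWidth = 11; break; // "-2147483648"
            default:        typeWidth = 20; break;
        }
        return Math.max(header.length(), typeWidth);
    }
}
```

This also motivates the NOTE added to the javadoc: the content-based path is only safe when the whole result fits in heap memory.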
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -705,7 +705,7 @@ public TableResult executeInternal(QueryOperation
operation) {
.tableSchema(operation.getTableSchema())
.data(resultProvider.getResultIterator())
.setPrintStyle(TableResultImpl.PrintStyle.tableau(
- PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN, isStreamingMode))
+ PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN, true, isStreamingMode))
Review comment:
Can we use `isStreamingMode` for `deriveColumnWidthByType` instead of
hard-coding it to `true`?
In my understanding, we can derive the width from content if it is batch mode.
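Concretely, the suggestion would change the call site to something like the fragment below (assuming the third argument of `tableau(...)` is `deriveColumnWidthByType` and the fourth is `printRowKind`):

```java
.setPrintStyle(TableResultImpl.PrintStyle.tableau(
        PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN, isStreamingMode, isStreamingMode))
```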
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]