JingsongLi commented on a change in pull request #11273: 
[FLINK-12814][sql-client] Support tableau result format
URL: https://github.com/apache/flink/pull/11273#discussion_r386107090
 
 

 ##########
 File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java
 ##########
 @@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.ResultDescriptor;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.lang3.StringUtils;
+import org.jline.terminal.Terminal;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.client.cli.CliUtils.rowToString;
+
+/**
+ * Print result in tableau mode.
+ */
+public class CliTableauResultView implements Closeable {
+
+       private static final int NULL_COLUMN_WIDTH = 
CliStrings.NULL_COLUMN.length();
+       private static final int MAX_COLUMN_WIDTH = 30;
+       private static final int DEFAULT_COLUMN_WIDTH = 20;
+       private static final String COLUMN_TRUNCATED_FLAG = "...";
+       private static final String CHANGEFLAG_COLUMN_NAME = "+/-";
+
+       private final Terminal terminal;
+       private final Executor sqlExecutor;
+       private final String sessionId;
+       private final ResultDescriptor resultDescriptor;
+       private final ExecutorService displayResultExecutorService;
+
+       private volatile boolean cleanUpQuery;
+
+       public CliTableauResultView(
+                       final Terminal terminal,
+                       final Executor sqlExecutor,
+                       final String sessionId,
+                       final ResultDescriptor resultDescriptor) {
+               this.terminal = terminal;
+               this.sqlExecutor = sqlExecutor;
+               this.sessionId = sessionId;
+               this.resultDescriptor = resultDescriptor;
+               this.displayResultExecutorService = 
Executors.newSingleThreadExecutor();
+       }
+
+       public void displayStreamResults() throws SqlExecutionException {
+               final AtomicInteger receivedRowCount = new AtomicInteger(0);
+               Future<?> resultFuture = displayResultExecutorService.submit(() 
-> {
+                       printStreamResults(receivedRowCount);
+               });
+
+               // capture CTRL-C
+               terminal.handle(Terminal.Signal.INT, signal -> {
+                       resultFuture.cancel(true);
+               });
+
+               cleanUpQuery = true;
+               try {
+                       resultFuture.get();
+                       cleanUpQuery = false; // job finished successfully
+               } catch (CancellationException e) {
+                       terminal.writer().println("Query terminated, received a 
total of " + receivedRowCount.get() + " rows");
+                       terminal.flush();
+               } catch (ExecutionException e) {
+                       if (e.getCause() instanceof SqlExecutionException) {
+                               throw (SqlExecutionException) e.getCause();
+                       }
+                       throw new SqlExecutionException("unknown exception", 
e.getCause());
+               } catch (InterruptedException e) {
+                       throw new SqlExecutionException("Query interrupted", e);
+               } finally {
+                       checkAndCleanUpQuery();
+               }
+       }
+
+       public void displayBatchResults() throws SqlExecutionException {
+               Future<?> resultFuture = displayResultExecutorService.submit(() 
-> {
+                       final List<Row> resultRows = waitBatchResults();
+                       printBatchResults(resultRows);
+               });
+
+               // capture CTRL-C
+               terminal.handle(Terminal.Signal.INT, signal -> {
+                       resultFuture.cancel(true);
+               });
+
+               cleanUpQuery = true;
+               try {
+                       resultFuture.get();
+                       cleanUpQuery = false; // job finished successfully
+               } catch (CancellationException e) {
+                       terminal.writer().println("Query terminated");
+                       terminal.flush();
+               } catch (ExecutionException e) {
+                       if (e.getCause() instanceof SqlExecutionException) {
+                               throw (SqlExecutionException) e.getCause();
+                       }
+                       throw new SqlExecutionException("unknown exception", 
e.getCause());
+               } catch (InterruptedException e) {
+                       throw new SqlExecutionException("Query interrupted", e);
+               } finally {
+                       checkAndCleanUpQuery();
+               }
+       }
+
+       @Override
+       public void close() {
+               this.displayResultExecutorService.shutdown();
+       }
+
+       private void checkAndCleanUpQuery() {
+               if (cleanUpQuery) {
+                       try {
+                               sqlExecutor.cancelQuery(sessionId, 
resultDescriptor.getResultId());
+                       } catch (SqlExecutionException e) {
+                               // ignore further exceptions
+                       }
+               }
+       }
+
+       private List<Row> waitBatchResults() {
+               List<Row> resultRows;
+               // take snapshot and make all results in one page
+               do {
+                       try {
+                               Thread.sleep(50);
+                       } catch (InterruptedException e) {
+                               Thread.currentThread().interrupt();
+                       }
+                       TypedResult<Integer> result = 
sqlExecutor.snapshotResult(
+                                       sessionId,
+                                       resultDescriptor.getResultId(),
+                                       Integer.MAX_VALUE);
+
+                       if (result.getType() == TypedResult.ResultType.EOS) {
+                               resultRows = Collections.emptyList();
+                               break;
+                       } else if (result.getType() == 
TypedResult.ResultType.PAYLOAD) {
+                               resultRows = 
sqlExecutor.retrieveResultPage(resultDescriptor.getResultId(), 1);
+                               break;
+                       } else {
+                               // result not retrieved yet
+                       }
+               } while (true);
+
+               return resultRows;
+       }
+
+       private void printStreamResults(AtomicInteger receivedRowCount) {
+               List<TableColumn> columns = 
resultDescriptor.getResultSchema().getTableColumns();
+               String[] fieldNames =
+                               Stream.concat(
+                                               Stream.of("+/-"),
+                                               
columns.stream().map(TableColumn::getName)
+                               ).toArray(String[]::new);
+
+               int[] colWidths = columnWidthsByType(columns, true);
+               String borderline = genBorderLine(colWidths);
+
+               // print filed names
+               terminal.writer().println(borderline);
+               printSingleRow(colWidths, fieldNames);
+               terminal.writer().println(borderline);
+               terminal.flush();
+
+               while (true) {
+                       final TypedResult<List<Tuple2<Boolean, Row>>> result =
+                                       
sqlExecutor.retrieveResultChanges(sessionId, resultDescriptor.getResultId());
+
+                       switch (result.getType()) {
+                               case EMPTY:
+                                       // do nothing
+                                       break;
+                               case EOS:
+                                       if (receivedRowCount.get() > 0) {
+                                               
terminal.writer().println(borderline);
+                                       }
+                                       terminal.writer().println("Received a 
total of " + receivedRowCount.get() + " rows");
+                                       terminal.flush();
+                                       return;
+                               default:
+                                       List<Tuple2<Boolean, Row>> changes = 
result.getPayload();
+                                       for (Tuple2<Boolean, Row> change : 
changes) {
+                                               final String[] cols = 
rowToString(change.f1);
+                                               String[] row = new 
String[cols.length + 1];
+                                               row[0] = change.f0 ? "+" : "-";
+                                               System.arraycopy(cols, 0, row, 
1, cols.length);
+                                               printSingleRow(colWidths, row);
+                                               receivedRowCount.addAndGet(1);
+                                       }
+                       }
+               }
+       }
+
+       private void printBatchResults(List<Row> resultRows) {
+               List<String[]> rows = new ArrayList<>(resultRows.size() + 1);
+
+               // fill field names first
+               List<TableColumn> columns = 
resultDescriptor.getResultSchema().getTableColumns();
+               
rows.add(columns.stream().map(TableColumn::getName).toArray(String[]::new));
+               resultRows.forEach(row -> rows.add(rowToString(row)));
+
+               int[] colWidths = columnWidthsByContent(columns, rows);
+               String borderline = genBorderLine(colWidths);
+
+               // print field names
+               terminal.writer().println(borderline);
+               printSingleRow(colWidths, rows.get(0));
+               terminal.writer().println(borderline);
+
+               // print content
+               rows.subList(1, rows.size()).forEach(row -> 
printSingleRow(colWidths, row));
+               if (!resultRows.isEmpty()) {
+                       terminal.writer().println(borderline);
+               }
+
+               // print footer
+               terminal.writer().println(resultRows.size() + " row in set");
+               terminal.flush();
+       }
+
+       private String genBorderLine(int[] colWidths) {
+               StringBuilder sb = new StringBuilder();
+               sb.append("+");
+               for (int width : colWidths) {
+                       sb.append(StringUtils.repeat("-", width + 1));
+                       sb.append("-+");
+               }
+               return sb.toString();
+       }
+
+       private void printSingleRow(int[] colWidths, String[] cols) {
+               StringBuilder sb = new StringBuilder();
+               sb.append("|");
+               int idx = 0;
+               for (String col : cols) {
+                       sb.append(" ");
+                       if (col.length() <= colWidths[idx]) {
+                               sb.append(StringUtils.repeat(" ", 
colWidths[idx] - col.length()));
+                               sb.append(col);
+                       } else {
+                               sb.append(col, 0, colWidths[idx] - 
COLUMN_TRUNCATED_FLAG.length());
+                               sb.append(COLUMN_TRUNCATED_FLAG);
+                       }
+                       sb.append(" |");
+                       idx++;
+               }
+               terminal.writer().println(sb.toString());
+               terminal.flush();
+       }
+
+       /**
+        * Try to infer column width based on column types. In streaming case, 
we will have an
+        * endless result set, thus couldn't determine column widths based on 
column values.
+        */
+       private int[] columnWidthsByType(List<TableColumn> columns, boolean 
includeChangeflag) {
+               // fill width with field names first
+               int[] colWidths  = columns.stream()
+                               .mapToInt(col -> col.getName().length())
+                               .toArray();
+
+               // determine proper column width based on types
+               for (int i = 0; i < columns.size(); ++i) {
+                       LogicalType type = 
columns.get(i).getType().getLogicalType();
+                       int len;
+                       switch (type.getTypeRoot()) {
+                               case TINYINT:
+                                       len = TinyIntType.PRECISION + 1; // 
extra for negative value
+                                       break;
+                               case SMALLINT:
+                                       len = SmallIntType.PRECISION + 1; // 
extra for negative value
+                                       break;
+                               case INTEGER:
+                                       len = IntType.PRECISION + 1; // extra 
for negative value
+                                       break;
+                               case BIGINT:
+                                       len = BigIntType.PRECISION + 1; // 
extra for negative value
+                                       break;
+                               case DECIMAL:
+                                       len = ((DecimalType) 
type).getPrecision() + 1; // extra for negative value
 
 Review comment:
   2? There is a decimal point.
   like `-123.567`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to