Daniel Verite wrote:

> > * In the "if (cont.cells[idx] != NULL && cont.cells[idx][0] != '\0')"
> > block (line 497 in the attached), can't we do the same thing by using
> > psprintf?
>
> In that block, we can't pass a cell contents as a valist and be done with
> that cell, because duplicates of (col value,row value) may happen
> at any iteration of the upper loop over PQntuples(results). Any cell really
> may need reallocation unpredictably until that loop is done, whereas
> psprintf starts by allocating a new buffer unconditionally, so it doesn't
> look to me like it could help to simplify that block.

I messed with that code some more, as it looked unnecessarily complicated;
please see attached and verify that it still behaves sanely.  This needs
those regression tests you promised.  I tested a few cases and it seems
good to me.

-- 
Álvaro Herrera                http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

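For readers following the thread, the cell accumulation under discussion can
be sketched standalone as below.  This is only an illustration under stated
assumptions, not code from the attached patch: cell_append() and build_cell()
are hypothetical names, and plain malloc()/snprintf()/free() stand in for
psql's psprintf()/pg_free().  It shows why a cell has to stay owned and
freeable: any later source row may land in the same cell and force it to grow.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical helper (not part of the patch): append "value" to the text
 * in *cell.  If the cell already holds a string, "sep" is placed between
 * the old contents and the new value.  A fresh buffer is allocated on
 * every call and the previous one is released.
 */
static void
cell_append(char **cell, const char *sep, const char *value)
{
	size_t		newlen;
	char	   *buf;

	assert(cell != NULL && sep != NULL && value != NULL);

	if (*cell == NULL)
		newlen = strlen(value);
	else
		newlen = strlen(*cell) + strlen(sep) + strlen(value);

	buf = malloc(newlen + 1);
	if (buf == NULL)
	{
		fprintf(stderr, "out of memory\n");
		exit(EXIT_FAILURE);
	}

	if (*cell == NULL)
		snprintf(buf, newlen + 1, "%s", value);
	else
		snprintf(buf, newlen + 1, "%s%s%s", *cell, sep, value);

	free(*cell);				/* free(NULL) is a no-op */
	*cell = buf;
}

/*
 * Hypothetical driver: build one cell from nrows source rows of ncols
 * values each (values[] is row-major).  Columns of the same row are
 * separated by a space, successive rows by a newline.  Returns a malloc'd
 * string, or NULL when there are no values.
 */
static char *
build_cell(const char *values[], int nrows, int ncols)
{
	char	   *cell = NULL;
	int			r,
				c;

	for (r = 0; r < nrows; r++)
		for (c = 0; c < ncols; c++)
			cell_append(&cell, c == 0 ? "\n" : " ", values[r * ncols + c]);

	return cell;
}
```

Calling build_cell() on two rows of two values each yields a single string
with the rows stacked on separate lines; the caller owns the result and must
free() it.
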
diff --git a/doc/src/sgml/ref/psql-ref.sgml b/doc/src/sgml/ref/psql-ref.sgml index d8b9a03..9c5a915 100644 --- a/doc/src/sgml/ref/psql-ref.sgml +++ b/doc/src/sgml/ref/psql-ref.sgml @@ -990,6 +990,113 @@ testdb=> </varlistentry> <varlistentry> + <term><literal>\crosstabview [ + <replaceable class="parameter">colV</replaceable> + <replaceable class="parameter">colH</replaceable> + [:<replaceable class="parameter">scolH</replaceable>] + [<replaceable class="parameter">colG1[,colG2...]</replaceable>] + ] </literal></term> + <listitem> + <para> + Executes the current query buffer (like <literal>\g</literal>) and shows + the results inside a crosstab grid. + The output column <replaceable class="parameter">colV</replaceable> + becomes a vertical header + and the output column <replaceable class="parameter">colH</replaceable> + becomes a horizontal header, optionally sorted by ranking data obtained + from <replaceable class="parameter">scolH</replaceable>. + + <replaceable class="parameter">colG1[,colG2...]</replaceable> + is the list of output columns to project into the grid. + By default, all output columns of the query except + <replaceable class="parameter">colV</replaceable> and + <replaceable class="parameter">colH</replaceable> + are included in this list. + </para> + + <para> + All columns can be referred to by their position (starting at 1), or by + their name. Normal case folding and quoting rules apply on column + names. By default, + <replaceable class="parameter">colV</replaceable> corresponds to column 1 + and <replaceable class="parameter">colH</replaceable> to column 2. + A query having only one output column cannot be viewed in crosstab, and + <replaceable class="parameter">colH</replaceable> must differ from + <replaceable class="parameter">colV</replaceable>. 
+ </para> + + <para> + The vertical header, displayed as the leftmost column, + contains the deduplicated values found in + column <replaceable class="parameter">colV</replaceable>, in the same + order as in the query results. + </para> + <para> + The horizontal header, displayed as the first row, + contains the deduplicated values found in + column <replaceable class="parameter">colH</replaceable>, in + the order of appearance in the query results. + If specified, the optional <replaceable class="parameter">scolH</replaceable> + argument refers to a column whose values should be integer numbers + by which <replaceable class="parameter">colH</replaceable> will be sorted + to be positioned in the horizontal header. + </para> + + <para> + Inside the crosstab grid, + given a query output with <literal>N</literal> columns + (including <replaceable class="parameter">colV</replaceable> and + <replaceable class="parameter">colH</replaceable>), + for each distinct value <literal>x</literal> of + <replaceable class="parameter">colH</replaceable> + and each distinct value <literal>y</literal> of + <replaceable class="parameter">colV</replaceable>, + the contents of a cell located at the intersection + <literal>(x,y)</literal> is determined by these rules: + <itemizedlist> + <listitem> + <para> + if there is no corresponding row in the query results such that the + value for <replaceable class="parameter">colH</replaceable> + is <literal>x</literal> and the value + for <replaceable class="parameter">colV</replaceable> + is <literal>y</literal>, the cell is empty. 
+ </para> + </listitem> + + <listitem> + <para> + if there is exactly one row such that the value + for <replaceable class="parameter">colH</replaceable> + is <literal>x</literal> and the value + for <replaceable class="parameter">colV</replaceable> + is <literal>y</literal>, then the <literal>N-2</literal> other + columns or the columns listed in + <replaceable class="parameter">colG1[,colG2...]</replaceable> + are displayed in the cell, separated from each other by + a space character if needed. + + If <literal>N=2</literal>, the letter <literal>X</literal> is displayed + in the cell as if a virtual third column contained that character. + </para> + </listitem> + + <listitem> + <para> + if there are several corresponding rows, the behavior is identical to + the case of one row except that the values coming from different rows + are stacked vertically, the different source rows being separated by + newline characters inside the cell. + </para> + </listitem> + + </itemizedlist> + </para> + + </listitem> + </varlistentry> + + <varlistentry> <term><literal>\d[S+] [ <link linkend="APP-PSQL-patterns"><replaceable class="parameter">pattern</replaceable></link> ]</literal></term> <listitem> @@ -4066,6 +4173,47 @@ first | 4 second | four </programlisting></para> +<para> + When suitable, query results can be shown in a crosstab representation + with the \crosstabview command: +<programlisting> +testdb=> <userinput>SELECT first, second, first > 2 AS gt2 FROM my_table;</userinput> + first | second | gt2 +-------+--------+----- + 1 | one | f + 2 | two | f + 3 | three | t + 4 | four | t +(4 rows) + +testdb=> <userinput>\crosstabview first second</userinput> + first | one | two | three | four +-------+-----+-----+-------+------ + 1 | f | | | + 2 | | f | | + 3 | | | t | + 4 | | | | t +(4 rows) +</programlisting> + +This second example shows a multiplication table with rows sorted in reverse +numerical order and columns with an independent, ascending numerical order. 
+<programlisting> +testdb=> <userinput>SELECT t1.first as "A", t2.first+100 AS "B", t1.first*(t2.first+100) as "AxB",</userinput> +testdb(> <userinput>row_number() over(order by t2.first) AS ord</userinput> +testdb(> <userinput>FROM my_table t1 CROSS JOIN my_table t2 ORDER BY 1 DESC</userinput> +testdb(> <userinput>\crosstabview A B:ord AxB</userinput> + A | 101 | 102 | 103 | 104 +---+-----+-----+-----+----- + 4 | 404 | 408 | 412 | 416 + 3 | 303 | 306 | 309 | 312 + 2 | 202 | 204 | 206 | 208 + 1 | 101 | 102 | 103 | 104 +(4 rows) +</programlisting> + +</para> + </refsect1> </refentry> diff --git a/src/bin/psql/Makefile b/src/bin/psql/Makefile index d1c3b77..1f6a289 100644 --- a/src/bin/psql/Makefile +++ b/src/bin/psql/Makefile @@ -23,7 +23,7 @@ LDFLAGS += -L$(top_builddir)/src/fe_utils -lpgfeutils -lpq OBJS= command.o common.o help.o input.o stringutils.o mainloop.o copy.o \ startup.o prompt.o variables.o large_obj.o describe.o \ - tab-complete.o \ + crosstabview.o tab-complete.o \ sql_help.o psqlscanslash.o \ $(WIN32RES) diff --git a/src/bin/psql/command.c b/src/bin/psql/command.c index 1d326a8..aef6f23 100644 --- a/src/bin/psql/command.c +++ b/src/bin/psql/command.c @@ -39,6 +39,7 @@ #include "common.h" #include "copy.h" +#include "crosstabview.h" #include "describe.h" #include "help.h" #include "input.h" @@ -364,6 +365,20 @@ exec_command(const char *cmd, else if (strcmp(cmd, "copyright") == 0) print_copyright(); + /* \crosstabview -- execute a query and display results in crosstab */ + else if (strcmp(cmd, "crosstabview") == 0) + { + pset.crosstabview_col_V = psql_scan_slash_option(scan_state, + OT_NORMAL, NULL, false); + pset.crosstabview_col_H = psql_scan_slash_option(scan_state, + OT_NORMAL, NULL, false); + pset.crosstabview_cols_grid = psql_scan_slash_option(scan_state, + OT_NORMAL, NULL, false); + + pset.crosstabview_output = true; + status = PSQL_CMD_SEND; + } + /* \d* commands */ else if (cmd[0] == 'd') { diff --git a/src/bin/psql/common.c 
b/src/bin/psql/common.c index df3441c..5840331 100644 --- a/src/bin/psql/common.c +++ b/src/bin/psql/common.c @@ -23,6 +23,7 @@ #include "settings.h" #include "command.h" #include "copy.h" +#include "crosstabview.h" #include "fe_utils/mbprint.h" @@ -1064,6 +1065,8 @@ PrintQueryResults(PGresult *results) success = StoreQueryTuple(results); else if (pset.gexec_flag) success = ExecQueryTuples(results); + else if (pset.crosstabview_output) + success = PrintResultsInCrossTab(results); else success = PrintQueryTuples(results); /* if it's INSERT/UPDATE/DELETE RETURNING, also print status */ @@ -1356,6 +1359,24 @@ sendquery_cleanup: /* reset \gexec trigger */ pset.gexec_flag = false; + /* reset \crosstabview trigger */ + pset.crosstabview_output = false; + if (pset.crosstabview_col_V) + { + free(pset.crosstabview_col_V); + pset.crosstabview_col_V = NULL; + } + if (pset.crosstabview_col_H) + { + free(pset.crosstabview_col_H); + pset.crosstabview_col_H = NULL; + } + if (pset.crosstabview_cols_grid) + { + free(pset.crosstabview_cols_grid); + pset.crosstabview_cols_grid = NULL; + } + return OK; } @@ -1520,7 +1541,25 @@ ExecQueryUsingCursor(const char *query, double *elapsed_msec) is_pager = true; } - printQuery(results, &my_popt, fout, is_pager, pset.logfile); + if (pset.crosstabview_output) + { + if (ntuples < fetch_count) + PrintResultsInCrossTab(results); + else + { + /* + * crosstabview is denied if the whole set of rows is not + * guaranteed to be fetched in the first iteration, because + * it's expected in memory as a single PGresult structure. 
+ */ + psql_error("\\crosstabview must be used with less than FETCH_COUNT (%d) rows\n", + fetch_count); + PQclear(results); + break; + } + } + else + printQuery(results, &my_popt, fout, is_pager, pset.logfile); ClearOrSaveResult(results); @@ -1599,6 +1638,23 @@ cleanup: *elapsed_msec += INSTR_TIME_GET_MILLISEC(after); } + /* reset \crosstabview settings */ + pset.crosstabview_output = false; + if (pset.crosstabview_col_V) + { + free(pset.crosstabview_col_V); + pset.crosstabview_col_V = NULL; + } + if (pset.crosstabview_col_H) + { + free(pset.crosstabview_col_H); + pset.crosstabview_col_H = NULL; + } + if (pset.crosstabview_cols_grid) + { + free(pset.crosstabview_cols_grid); + pset.crosstabview_cols_grid = NULL; + } return OK; } diff --git a/src/bin/psql/crosstabview.c b/src/bin/psql/crosstabview.c new file mode 100644 index 0000000..0d70e47 --- /dev/null +++ b/src/bin/psql/crosstabview.c @@ -0,0 +1,943 @@ +/* + * psql - the PostgreSQL interactive terminal + * + * Copyright (c) 2000-2016, PostgreSQL Global Development Group + * + * src/bin/psql/crosstabview.c + */ +#include "postgres_fe.h" + +#include <string.h> + +#include "common.h" +#include "crosstabview.h" +#include "pqexpbuffer.h" +#include "settings.h" + + +/* + * Value/position from the resultset that goes into the horizontal or vertical + * crosstabview header. + */ +typedef struct _pivot_field +{ + /* + * Pointer obtained from PQgetvalue() for colV or colH. Each distinct + * value becomes an entry in the vertical header (colV), or horizontal + * header (colH). A Null value is represented by a NULL pointer. + */ + char *name; + + /* + * When a sort is requested on an alternative column, this holds + * PQgetvalue() for the sort column corresponding to <name>. If <name> + * appear multiple times, it's the first value in the order of the results + * that is kept. A Null value is represented by a NULL pointer. + */ + char *sort_value; + + /* + * Rank of this value, starting at 0. 
Initially, it's the relative + * position of the first appearance of <name> in the resultset. For + * example, if successive rows contain B,A,C,A,D then it's B:0,A:1,C:2,D:3 + * When a sort column is specified, ranks get updated in a final pass to + * reflect the desired order. + */ + int rank; +} pivot_field; + +/* Node in avl_tree */ +typedef struct _avl_node +{ + /* Node contents */ + pivot_field field; + + /* + * Height of this node in the tree (number of nodes on the longest path to + * a leaf). + */ + int height; + + /* + * Child nodes. [0] points to left subtree, [1] to right subtree. Never + * NULL, points to the empty node avl_tree.end when no left or right + * value. + */ + struct _avl_node *childs[2]; +} avl_node; + +/* + * Control structure for the AVL tree (binary search tree kept + * balanced with the AVL algorithm) + */ +typedef struct _avl_tree +{ + int count; /* Total number of nodes */ + avl_node *root; /* root of the tree */ + avl_node *end; /* Immutable dereferenceable empty tree */ +} avl_tree; + + +static void printCrosstab(const PGresult *results, + int num_columns, pivot_field *piv_columns, int field_for_columns, + int num_rows, pivot_field *piv_rows, int field_for_rows, + int num_colsG, int *colsG); +static int parseColumnRefs(char *arg, PGresult *res, int **col_numbers, + int max_columns, char separator); +static void avlInit(avl_tree *tree); +static void avlMergeValue(avl_tree *tree, char *name, char *sort_value); +static int avlCollectFields(avl_tree *tree, avl_node *node, + pivot_field *fields, int idx); +static void avlFree(avl_tree *tree, avl_node *node); +static void rankSort(int num_columns, pivot_field *piv_columns); +static int indexOfColumn(const char *arg, PGresult *res); +static int pivotFieldCompare(const void *a, const void *b); +static int rankCompare(const void *a, const void *b); + + +/* + * Main entry point to this module. 
+ * + * Process the data from *res according to the display options in pset (global), + * to generate the horizontal and vertical headers contents, + * then call printCrosstab() for the actual output. + */ +bool +PrintResultsInCrossTab(PGresult *res) +{ + /* COLV or null */ + char *opt_field_for_rows = pset.crosstabview_col_V; + + /* COLH[:SCOLH] or null */ + char *opt_field_for_columns = pset.crosstabview_col_H; + int rn; + avl_tree piv_columns; + avl_tree piv_rows; + pivot_field *array_columns = NULL; + pivot_field *array_rows = NULL; + int num_columns = 0; + int num_rows = 0; + bool retval = false; + + /* + * column definitions involved in the vertical header, horizontal header, + * and grid + */ + int *colsV = NULL, + *colsH = NULL, + *colsG = NULL; + int num_colsG; + int nn; + + /* + * 0-based index of the field whose distinct values will become COLUMN + * headers + */ + int field_for_columns = -1; + int sort_field_for_columns = -1; + + /* + * 0-based index of the field whose distinct values will become ROW + * headers + */ + int field_for_rows = -1; + + avlInit(&piv_rows); + avlInit(&piv_columns); + + if (res == NULL) + { + psql_error(_("No result\n")); + goto error_return; + } + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + psql_error(_("The query must return results to be shown in crosstab\n")); + goto error_return; + } + + if (opt_field_for_rows && !opt_field_for_columns) + { + psql_error(_("A second column must be specified for the horizontal header\n")); + goto error_return; + } + + if (PQnfields(res) < 2) + { + psql_error(_("The query must return at least two columns to be shown in crosstab\n")); + goto error_return; + } + + /* + * Arguments processing for the vertical header (1st arg) displayed in the + * left-most column. Only a reference to a field is accepted (no sort + * column). 
+ */ + + if (opt_field_for_rows == NULL) + { + field_for_rows = 0; + } + else + { + nn = parseColumnRefs(opt_field_for_rows, res, &colsV, 1, ':'); + if (nn != 1) + goto error_return; + field_for_rows = colsV[0]; + } + + if (field_for_rows < 0) + goto error_return; + + /*---------- + * Arguments processing for the horizontal header (2nd arg) + * (pivoted column that gets displayed as the first row). + * Determine: + * - the sort direction if any + * - the field number of that column in the PGresult + * - the field number of the associated sort column if any + */ + + if (opt_field_for_columns == NULL) + field_for_columns = 1; + else + { + nn = parseColumnRefs(opt_field_for_columns, res, &colsH, 2, ':'); + if (nn <= 0) + goto error_return; + if (nn == 1) + field_for_columns = colsH[0]; + else + { + field_for_columns = colsH[0]; + sort_field_for_columns = colsH[1]; + } + + if (field_for_columns < 0) + goto error_return; + } + + if (field_for_columns == field_for_rows) + { + psql_error(_("The same column cannot be used for both vertical and horizontal headers\n")); + goto error_return; + } + + /* + * Arguments processing for the columns aside from headers (3rd arg) + * Determine the columns to display in the grid and their order. + */ + if (pset.crosstabview_cols_grid == NULL) + { + /* + * By default, all the fields from PGresult get displayed into the + * grid, except the two fields that go into the vertical and + * horizontal headers. + */ + if (PQnfields(res) > 2) + { + int i, + j = 0; + + colsG = (int *) pg_malloc(sizeof(int) * (PQnfields(res) - 2)); + for (i = 0; i < PQnfields(res); i++) + { + if (i != field_for_rows && i != field_for_columns) + colsG[j++] = i; + } + num_colsG = PQnfields(res) - 2; + } + else + { + colsG = NULL; + num_colsG = 0; + } + } + else + { + /* + * Non-default case: a list of fields is given. Parse that list to + * determine the fields to display into the grid, and in what order. 
+ * The list format is colA[,colB[,colC...]] + */ + num_colsG = parseColumnRefs(pset.crosstabview_cols_grid, + res, &colsG, PQnfields(res), ','); + if (num_colsG <= 0) + goto error_return; + } + + /* + * First part: accumulate the names that go into the vertical and + * horizontal headers, each into an AVL binary tree to build the set of + * DISTINCT values. + */ + + for (rn = 0; rn < PQntuples(res); rn++) + { + /* horizontal */ + char *val; + char *val1; + + val = PQgetisnull(res, rn, field_for_columns) ? NULL : + PQgetvalue(res, rn, field_for_columns); + val1 = NULL; + + if (sort_field_for_columns >= 0 && + !PQgetisnull(res, rn, sort_field_for_columns)) + val1 = PQgetvalue(res, rn, sort_field_for_columns); + + avlMergeValue(&piv_columns, val, val1); + + if (piv_columns.count > CROSSTABVIEW_MAX_COLUMNS) + { + psql_error(_("Maximum number of columns (%d) exceeded\n"), + CROSSTABVIEW_MAX_COLUMNS); + goto error_return; + } + + /* vertical */ + val = PQgetisnull(res, rn, field_for_rows) ? NULL : + PQgetvalue(res, rn, field_for_rows); + + avlMergeValue(&piv_rows, val, NULL); + } + + /* + * Second part: Generate sorted arrays from the AVL trees. + */ + + num_columns = piv_columns.count; + num_rows = piv_rows.count; + + array_columns = (pivot_field *) + pg_malloc(sizeof(pivot_field) * num_columns); + + array_rows = (pivot_field *) + pg_malloc(sizeof(pivot_field) * num_rows); + + avlCollectFields(&piv_columns, piv_columns.root, array_columns, 0); + avlCollectFields(&piv_rows, piv_rows.root, array_rows, 0); + + /* + * Third part: optionally, process the ranking data for the horizontal + * header + */ + if (sort_field_for_columns >= 0) + rankSort(num_columns, array_columns); + + /* + * Fourth part: print the crosstab'ed results. 
+ */ + printCrosstab(res, + num_columns, array_columns, field_for_columns, + num_rows, array_rows, field_for_rows, + num_colsG, colsG); + + retval = true; + +error_return: + avlFree(&piv_columns, piv_columns.root); + avlFree(&piv_rows, piv_rows.root); + pg_free(array_columns); + pg_free(array_rows); + pg_free(colsV); + pg_free(colsH); + pg_free(colsG); + + return retval; +} + +/* + * Output the pivoted resultset with the printTable* functions + */ +static void +printCrosstab(const PGresult *results, + int num_columns, pivot_field *piv_columns, int field_for_columns, + int num_rows, pivot_field *piv_rows, int field_for_rows, + int num_colsG, int *colsG) +{ + printQueryOpt popt = pset.popt; + printTableContent cont; + int i, + j, + rn; + char col_align; + int *horiz_map; + char **allocated_cells; + + printTableInit(&cont, &popt.topt, popt.title, num_columns + 1, num_rows); + + /* Step 1: set target column names (horizontal header) */ + + /* The name of the first column is kept unchanged by the pivoting */ + printTableAddHeader(&cont, + PQfname(results, field_for_rows), + false, + column_type_alignment(PQftype(results, + field_for_rows))); + + /* + * To iterate over piv_columns[] by piv_columns[].rank, create a reverse + * map associating each piv_columns[].rank to its index in piv_columns. + * This avoids an O(N^2) loop later. + */ + horiz_map = (int *) pg_malloc(sizeof(int) * num_columns); + for (i = 0; i < num_columns; i++) + horiz_map[piv_columns[i].rank] = i; + + /* + * In the common case of only one field projected into the cells, the + * display alignment depends on its PQftype(). Otherwise the contents are + * made-up strings, so use left alignment. + */ + col_align = num_colsG == 1 ? + column_type_alignment(PQftype(results, colsG[0])) : 'l'; + + for (i = 0; i < num_columns; i++) + { + char *colname; + + colname = piv_columns[horiz_map[i]].name ? + piv_columns[horiz_map[i]].name : + (popt.nullPrint ? 
popt.nullPrint : ""); + + printTableAddHeader(&cont, colname, false, col_align); + } + pg_free(horiz_map); + + /* Step 2: set row names in the first output column (vertical header) */ + for (i = 0; i < num_rows; i++) + { + int k = piv_rows[i].rank; + + cont.cells[k * (num_columns + 1)] = piv_rows[i].name ? + piv_rows[i].name : + (popt.nullPrint ? popt.nullPrint : ""); + /* Initialize all cells inside the grid to an empty value */ + for (j = 0; j < num_columns; j++) + cont.cells[k * (num_columns + 1) + j + 1] = ""; + } + cont.cellsadded = num_rows * (num_columns + 1); + + /* + * Step 3: fill in the content cells. + * + * By the time this loop is done, each of the cells in cont.cells is either + * a pointer into the PGresult which must not be freed (this happens if + * there's a single value for that cell), or an allocated string where the + * multiple values have been concatenated together. In the latter case, + * allocated_cells also contains the pointer, so that it can be freed after + * we're done. + */ + allocated_cells = (char **) + pg_malloc0((num_rows + 1) * (num_columns + 1) * sizeof(char *)); + for (rn = 0; rn < PQntuples(results); rn++) + { + int row_number; + int col_number; + pivot_field *p; + pivot_field elt; + + /* Find target row */ + if (!PQgetisnull(results, rn, field_for_rows)) + elt.name = PQgetvalue(results, rn, field_for_rows); + else + elt.name = NULL; + p = (pivot_field *) bsearch(&elt, + piv_rows, + num_rows, + sizeof(pivot_field), + pivotFieldCompare); + + row_number = p ? p->rank : -1; + + /* Find target column */ + if (!PQgetisnull(results, rn, field_for_columns)) + elt.name = PQgetvalue(results, rn, field_for_columns); + else + elt.name = NULL; + + p = (pivot_field *) bsearch(&elt, + piv_columns, + num_columns, + sizeof(pivot_field), + pivotFieldCompare); + col_number = p ? 
p->rank : -1; + + /* Place value into cell */ + if (col_number >= 0 && row_number >= 0) + { + int idx; + + /* index into the cont.cells and allocated_cells arrays */ + idx = 1 + col_number + row_number * (num_columns + 1); + + /* + * special case: when the source has only 2 columns, use an X + * (cross/checkmark) for the cell content. + */ + if (PQnfields(results) == 2) + { + cont.cells[idx] = "X"; + } + else + { + for (i = 0; i < num_colsG; i++) + { + char *content; + + content = !PQgetisnull(results, rn, colsG[i]) ? + PQgetvalue(results, rn, colsG[i]) : + (popt.nullPrint ? popt.nullPrint : ""); + + /* + * If the cell already contains a value, concatenate the new + * contents together with the previous value now. + */ + if (cont.cells[idx] != NULL) + { + char *new_content; + + /* + * Form the new contents by concatenating the value of the + * current cell with the preexisting contents. Separate + * multiple columns in the same row with a space; for the + * first column of each row, separate with a newline + * instead. + */ + if (allocated_cells[idx] != NULL) + new_content = psprintf("%s%s%s", + allocated_cells[idx], + i == 0 ? "\n" : " ", + content); + else + new_content = psprintf("%s", content); + + cont.cells[idx] = new_content; + if (allocated_cells[idx] != NULL) + pg_free(allocated_cells[idx]); + allocated_cells[idx] = new_content; + } + else + { + cont.cells[idx] = content; + } + } + } + } + } + + printTable(&cont, pset.queryFout, false, pset.logfile); + printTableCleanup(&cont); + + for (i = 0; i < num_rows * (num_columns + 1); i++) + { + if (allocated_cells[i] != NULL) + pg_free(allocated_cells[i]); + } + + pg_free(allocated_cells); +} + +/* + * Parse col1[<sep>col2][<sep>col3]... + * where colN can be: + * - a number from 1 to PQnfields(res) + * - an unquoted column name matching (case insensitively) one of PQfname(res,...) + * - a quoted column name matching (case sensitively) one of PQfname(res,...) 
+ * max_columns: 0 if no maximum + */ +static int +parseColumnRefs(char *arg, + PGresult *res, + int **col_numbers, + int max_columns, + char separator) +{ + char *p = arg; + char c; + int col_num = -1; + int nb_cols = 0; + char *field_start = NULL; + + *col_numbers = NULL; + while ((c = *p) != '\0') + { + bool quoted_field = false; + + field_start = p; + + /* first char */ + if (c == '"') + { + quoted_field = true; + p++; + } + + while ((c = *p) != '\0') + { + if (c == separator && !quoted_field) + break; + if (c == '"') /* end of field or embedded double quote */ + { + p++; + if (*p == '"') + { + if (quoted_field) + { + p++; + continue; + } + } + else if (quoted_field && *p == separator) + break; + } + if (*p) + p += PQmblen(p, pset.encoding); + } + + if (p != field_start) + { + /* look up the column and add its index into *col_numbers */ + if (max_columns != 0 && nb_cols == max_columns) + { + psql_error(_("No more than %d column references expected\n"), max_columns); + goto errfail; + } + c = *p; + *p = '\0'; + col_num = indexOfColumn(field_start, res); + *p = c; + if (col_num < 0) + goto errfail; + *col_numbers = (int *) pg_realloc(*col_numbers, (1 + nb_cols) * sizeof(int)); + (*col_numbers)[nb_cols++] = col_num; + } + else + { + psql_error(_("Empty column reference\n")); + goto errfail; + } + + if (*p) + p += PQmblen(p, pset.encoding); + } + return nb_cols; + +errfail: + pg_free(*col_numbers); + *col_numbers = NULL; + return -1; +} + +/* + * The avl* functions below provide a minimalistic implementation of AVL binary + * trees, to efficiently collect the distinct values that will form the horizontal + * and vertical headers. It only supports adding new values, no removal or even + * search. 
+ */ +static void +avlInit(avl_tree *tree) +{ + tree->end = (avl_node *) pg_malloc0(sizeof(avl_node)); + tree->end->childs[0] = tree->end->childs[1] = tree->end; + tree->count = 0; + tree->root = tree->end; +} + +/* Deallocate recursively an AVL tree, starting from node */ +static void +avlFree(avl_tree *tree, avl_node *node) +{ + if (node->childs[0] != tree->end) + { + avlFree(tree, node->childs[0]); + pg_free(node->childs[0]); + } + if (node->childs[1] != tree->end) + { + avlFree(tree, node->childs[1]); + pg_free(node->childs[1]); + } + if (node == tree->root) + { + /* free the root separately as it's not child of anything */ + if (node != tree->end) + pg_free(node); + /* free the tree->end struct only once and when all else is freed */ + pg_free(tree->end); + } +} + +/* Set the height to 1 plus the greatest of left and right heights */ +static void +avlUpdateHeight(avl_node *n) +{ + n->height = 1 + (n->childs[0]->height > n->childs[1]->height ? + n->childs[0]->height : + n->childs[1]->height); +} + +/* Rotate a subtree left (dir=0) or right (dir=1). Not recursive */ +static avl_node * +avlRotate(avl_node **current, int dir) +{ + avl_node *before = *current; + avl_node *after = (*current)->childs[dir]; + + *current = after; + before->childs[dir] = after->childs[!dir]; + avlUpdateHeight(before); + after->childs[!dir] = before; + + return after; +} + +static int +avlBalance(avl_node *n) +{ + return n->childs[0]->height - n->childs[1]->height; +} + +/* + * After an insertion, possibly rebalance the tree so that the left and right + * node heights don't differ by more than 1. + * May update *node. 
+ */ +static void +avlAdjustBalance(avl_tree *tree, avl_node **node) +{ + avl_node *current = *node; + int b = avlBalance(current) / 2; + + if (b != 0) + { + int dir = (1 - b) / 2; + + if (avlBalance(current->childs[dir]) == -b) + avlRotate(&current->childs[dir], !dir); + current = avlRotate(node, dir); + } + if (current != tree->end) + avlUpdateHeight(current); +} + +/* + * Insert a new value/field, starting from *node, reaching the correct position + * in the tree by recursion. Possibly rebalance the tree and possibly update + * *node. Do nothing if the value is already present in the tree. + */ +static void +avlInsertNode(avl_tree *tree, avl_node **node, pivot_field field) +{ + avl_node *current = *node; + + if (current == tree->end) + { + avl_node *new_node = (avl_node *) + pg_malloc(sizeof(avl_node)); + + new_node->height = 1; + new_node->field = field; + new_node->childs[0] = new_node->childs[1] = tree->end; + tree->count++; + *node = new_node; + } + else + { + int cmp = pivotFieldCompare(&field, &current->field); + + if (cmp != 0) + { + avlInsertNode(tree, + cmp > 0 ? &current->childs[1] : &current->childs[0], + field); + avlAdjustBalance(tree, node); + } + } +} + +/* Insert the value into the AVL tree, if it does not preexist */ +static void +avlMergeValue(avl_tree *tree, char *name, char *sort_value) +{ + pivot_field field; + + field.name = name; + field.rank = tree->count; + field.sort_value = sort_value; + avlInsertNode(tree, &tree->root, field); +} + +/* + * Recursively extract node values into the names array, in sorted order with a + * left-to-right tree traversal. + * Return the next candidate offset to write into the names array. 
+ * fields[] must be preallocated to hold tree->count entries + */ +static int +avlCollectFields(avl_tree *tree, avl_node *node, pivot_field *fields, int idx) +{ + if (node == tree->end) + return idx; + + idx = avlCollectFields(tree, node->childs[0], fields, idx); + fields[idx] = node->field; + return avlCollectFields(tree, node->childs[1], fields, idx + 1); +} + +static void +rankSort(int num_columns, pivot_field *piv_columns) +{ + int *hmap; /* [[offset in piv_columns, rank], ...for + * every header entry] */ + int i; + + hmap = (int *) pg_malloc(sizeof(int) * num_columns * 2); + for (i = 0; i < num_columns; i++) + { + char *val = piv_columns[i].sort_value; + + /* ranking information is valid if non null and matches /^-?\d+$/ */ + if (val && + ((*val == '-' && + strspn(val + 1, "0123456789") == strlen(val + 1)) || + strspn(val, "0123456789") == strlen(val))) + { + hmap[i * 2] = atoi(val); + hmap[i * 2 + 1] = i; + } + else + { + /* invalid rank information ignored (equivalent to rank 0) */ + hmap[i * 2] = 0; + hmap[i * 2 + 1] = i; + } + } + + qsort(hmap, num_columns, sizeof(int) * 2, rankCompare); + + for (i = 0; i < num_columns; i++) + { + piv_columns[hmap[i * 2 + 1]].rank = i; + } + + pg_free(hmap); +} + +/* + * Compare a user-supplied argument against a field name obtained by PQfname(), + * which is already case-folded. 
+ * If arg is not enclosed in double quotes, pg_strcasecmp applies, otherwise + * do a case-sensitive comparison with these rules: + * - double quotes enclosing 'arg' are filtered out + * - double quotes inside 'arg' are expected to be doubled + */ +static bool +fieldNameEquals(const char *arg, const char *fieldname) +{ + const char *p = arg; + const char *f = fieldname; + char c; + + if (*p++ != '"') + return !pg_strcasecmp(arg, fieldname); + + while ((c = *p++)) + { + if (c == '"') + { + if (*p == '"') + p++; /* skip second quote and continue */ + else if (*p == '\0') + return (*f == '\0'); /* p is shorter than f, or is + * identical */ + } + if (*f == '\0') + return false; /* f is shorter than p */ + if (c != *f) /* found one byte that differs */ + return false; + f++; + } + return (*f == '\0'); +} + +/* + * arg can be a number or a column name, possibly quoted (like in an ORDER BY clause) + * Returns: + * on success, the 0-based index of the column + * or -1 if the column number or name is not found in the result's structure, + * or if it's ambiguous (arg corresponding to several columns) + */ +static int +indexOfColumn(const char *arg, PGresult *res) +{ + int idx; + + if (strspn(arg, "0123456789") == strlen(arg)) + { + /* if arg contains only digits, it's a column number */ + idx = atoi(arg) - 1; + if (idx < 0 || idx >= PQnfields(res)) + { + psql_error(_("Invalid column number: %s\n"), arg); + return -1; + } + } + else + { + int i; + + idx = -1; + for (i = 0; i < PQnfields(res); i++) + { + if (fieldNameEquals(arg, PQfname(res, i))) + { + if (idx >= 0) + { + /* if another idx was already found for the same name */ + psql_error(_("Ambiguous column name: %s\n"), arg); + return -1; + } + idx = i; + } + } + if (idx == -1) + { + psql_error(_("Invalid column name: %s\n"), arg); + return -1; + } + } + return idx; +} + +/* + * Value comparator for vertical and horizontal headers + * used for deduplication only. 
+ * - null values are considered equal
+ * - non-null < null
+ * - non-null values are compared with strcmp()
+ */
+static int
+pivotFieldCompare(const void *a, const void *b)
+{
+	pivot_field *pa = (pivot_field *) a;
+	pivot_field *pb = (pivot_field *) b;
+
+	/* test null values */
+	if (!pb->name)
+		return pa->name ? -1 : 0;
+	else if (!pa->name)
+		return 1;
+
+	/* non-null values */
+	return strcmp(((pivot_field *) a)->name,
+				  ((pivot_field *) b)->name);
+}
+
+static int
+rankCompare(const void *a, const void *b)
+{
+	return *((int *) a) - *((int *) b);
+}
diff --git a/src/bin/psql/crosstabview.h b/src/bin/psql/crosstabview.h
new file mode 100644
index 0000000..4eb52a7
--- /dev/null
+++ b/src/bin/psql/crosstabview.h
@@ -0,0 +1,26 @@
+/*
+ * psql - the PostgreSQL interactive terminal
+ *
+ * Copyright (c) 2000-2016, PostgreSQL Global Development Group
+ *
+ * src/bin/psql/crosstabview.h
+ */
+
+#ifndef CROSSTABVIEW_H
+#define CROSSTABVIEW_H
+
+/*
+ * Limit the number of output columns generated in memory by the crosstabview
+ * algorithm. A new output column is added for each distinct value found in the
+ * column that pivots (to form the horizontal header).
+ * The purpose of this limit is to fail early instead of over-allocating or spending
+ * too much time if the crosstab to generate happens to be unreasonably large
+ * (worst case: a NxN cartesian product with N=number of tuples).
+ * The value of 1600 corresponds to the maximum columns per table in storage,
+ * but it could be as much as INT_MAX theoretically.
+ */
+#define CROSSTABVIEW_MAX_COLUMNS 1600
+
+/* prototypes */
+extern bool PrintResultsInCrossTab(PGresult *res);
+#endif /* CROSSTABVIEW_H */
diff --git a/src/bin/psql/help.c b/src/bin/psql/help.c
index 7549451..96e5628 100644
--- a/src/bin/psql/help.c
+++ b/src/bin/psql/help.c
@@ -177,6 +177,7 @@ slashUsage(unsigned short int pager)
 	fprintf(output, _(" \\gexec execute query, then execute each value in its result\n"));
 	fprintf(output, _(" \\gset [PREFIX] execute query and store results in psql variables\n"));
 	fprintf(output, _(" \\q quit psql\n"));
+	fprintf(output, _(" \\crosstabview [COLUMNS] execute query and display results in crosstab\n"));
 	fprintf(output, _(" \\watch [SEC] execute query every SEC seconds\n"));
 	fprintf(output, "\n");

diff --git a/src/bin/psql/settings.h b/src/bin/psql/settings.h
index c69f6ba..9340ef2 100644
--- a/src/bin/psql/settings.h
+++ b/src/bin/psql/settings.h
@@ -93,6 +93,11 @@ typedef struct _psqlSettings
 	char	   *gfname;			/* one-shot file output argument for \g */
 	char	   *gset_prefix;	/* one-shot prefix argument for \gset */
 	bool		gexec_flag;		/* one-shot flag to execute query's results */
+	bool		crosstabview_output;	/* one-shot request to print results
+										 * in crosstab */
+	char	   *crosstabview_col_V;		/* one-shot \crosstabview 1st argument */
+	char	   *crosstabview_col_H;		/* one-shot \crosstabview 2nd argument */
+	char	   *crosstabview_cols_grid; /* one-shot \crosstabview 3rd argument */
 	bool		notty;			/* stdin or stdout is not a tty (as determined
 								 * on startup) */

diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index cb8a06d..5c10005 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1274,7 +1274,8 @@ psql_completion(const char *text, int start, int end)

 	/* psql's backslash commands. */
 	static const char *const backslash_commands[] = {
-		"\\a", "\\connect", "\\conninfo", "\\C", "\\cd", "\\copy", "\\copyright",
+		"\\a", "\\connect", "\\conninfo", "\\C", "\\cd", "\\copy",
+		"\\copyright", "\\crosstabview",
 		"\\d", "\\da", "\\db", "\\dc", "\\dC", "\\dd", "\\ddp", "\\dD",
 		"\\des", "\\det", "\\deu", "\\dew", "\\dE", "\\df",
 		"\\dF", "\\dFd", "\\dFp", "\\dFt", "\\dg", "\\di", "\\dl", "\\dL",
diff --git a/src/fe_utils/print.c b/src/fe_utils/print.c
index 30efd3f..1ec74f1 100644
--- a/src/fe_utils/print.c
+++ b/src/fe_utils/print.c
@@ -3295,30 +3295,9 @@ printQuery(const PGresult *result, const printQueryOpt *opt,

 	for (i = 0; i < cont.ncolumns; i++)
 	{
-		char		align;
-		Oid			ftype = PQftype(result, i);
-
-		switch (ftype)
-		{
-			case INT2OID:
-			case INT4OID:
-			case INT8OID:
-			case FLOAT4OID:
-			case FLOAT8OID:
-			case NUMERICOID:
-			case OIDOID:
-			case XIDOID:
-			case CIDOID:
-			case CASHOID:
-				align = 'r';
-				break;
-			default:
-				align = 'l';
-				break;
-		}
-
 		printTableAddHeader(&cont, PQfname(result, i),
-							opt->translate_header, align);
+							opt->translate_header,
+							column_type_alignment(PQftype(result, i)));
 	}

 	/* set cells */
@@ -3360,6 +3339,31 @@ printQuery(const PGresult *result, const printQueryOpt *opt,
 	printTableCleanup(&cont);
 }

+char
+column_type_alignment(Oid ftype)
+{
+	char		align;
+
+	switch (ftype)
+	{
+		case INT2OID:
+		case INT4OID:
+		case INT8OID:
+		case FLOAT4OID:
+		case FLOAT8OID:
+		case NUMERICOID:
+		case OIDOID:
+		case XIDOID:
+		case CIDOID:
+		case CASHOID:
+			align = 'r';
+			break;
+		default:
+			align = 'l';
+			break;
+	}
+	return align;
+}

 void
 setDecimalLocale(void)
diff --git a/src/include/fe_utils/print.h b/src/include/fe_utils/print.h
index ff90237..18aee93 100644
--- a/src/include/fe_utils/print.h
+++ b/src/include/fe_utils/print.h
@@ -206,6 +206,8 @@ extern void printTable(const printTableContent *cont,
 extern void printQuery(const PGresult *result, const printQueryOpt *opt,
 		   FILE *fout, bool is_pager, FILE *flog);

+extern char column_type_alignment(Oid);
+
 extern void setDecimalLocale(void);
 extern const printTextFormat *get_line_style(const printTableOpt *opt);
 extern void refresh_utf8format(const printTableOpt *opt);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e293fc0..de903a0 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1,3 +1,7 @@
+EditableObjectType
+pivot_field
+avl_tree
+avl_node
 ABITVEC
 ACCESS_ALLOWED_ACE
 ACL
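
A note on rankSort() and rankCompare() in the patch above: the comparator returns the difference of two ints, which can overflow when the ranking values are far apart, and atoi() is not well defined for out-of-range input. The standalone sketch below is not part of the patch. It illustrates the same technique (validate against /^-?\d+$/, sort (rank, offset) pairs, assign display positions) under a few assumptions: the names rank_entry, is_valid_rank, parse_rank, rank_entry_cmp and assign_ranks are hypothetical, clamping out-of-range values with strtol() is one possible policy, and the tie-break on offset is added only to make the order deterministic, since qsort() is not guaranteed to be stable.

```c
#include <assert.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical pair type; the patch stores the pairs in a flat int array. */
typedef struct
{
	int			rank;		/* parsed ranking value, 0 if invalid */
	int			offset;		/* position in the original header array */
} rank_entry;

/* Returns 1 if val is non-NULL and matches /^-?\d+$/, else 0. */
static int
is_valid_rank(const char *val)
{
	if (val == NULL)
		return 0;
	if (*val == '-')
		val++;
	return *val != '\0' && strspn(val, "0123456789") == strlen(val);
}

/* Parses a rank, clamping to the int range; invalid input counts as 0. */
static int
parse_rank(const char *val)
{
	long		v;

	if (!is_valid_rank(val))
		return 0;
	v = strtol(val, NULL, 10);	/* saturates at LONG_MIN/LONG_MAX */
	if (v > INT_MAX)
		return INT_MAX;
	if (v < INT_MIN)
		return INT_MIN;
	return (int) v;
}

/* Overflow-free comparator: by rank, then by original offset. */
static int
rank_entry_cmp(const void *a, const void *b)
{
	const rank_entry *ea = (const rank_entry *) a;
	const rank_entry *eb = (const rank_entry *) b;

	if (ea->rank != eb->rank)
		return (ea->rank > eb->rank) - (ea->rank < eb->rank);
	return (ea->offset > eb->offset) - (ea->offset < eb->offset);
}

/* Sets out_rank[i] to the display position of header i (0-based). */
static void
assign_ranks(const char *const *sort_values, int n, int *out_rank)
{
	rank_entry *entries;
	int			i;

	assert(sort_values != NULL && out_rank != NULL);
	if (n <= 0)
		return;
	entries = malloc(sizeof(rank_entry) * (size_t) n);
	if (entries == NULL)
		abort();

	for (i = 0; i < n; i++)
	{
		entries[i].rank = parse_rank(sort_values[i]);
		entries[i].offset = i;
	}

	qsort(entries, (size_t) n, sizeof(rank_entry), rank_entry_cmp);

	for (i = 0; i < n; i++)
		out_rank[entries[i].offset] = i;

	free(entries);
}
```

For the sort values {"10", "-2", "abc", NULL}, assign_ranks() yields the positions 3, 0, 1, 2: the two invalid entries are treated as rank 0 and keep their relative order because of the offset tie-break.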