Robert Haas wrote:

> But worse than either of those things, there is no real
> agreement on what the overall design of this feature
> should be.

The part of the design that raised concerns upthread is
essentially how header sorting is exposed to the user and
implemented.

As suggested in [1], I've made some drastic changes in the
attached patch to take the comments (from Dean R., Tom L.)
into account.
The idea is to keep psql's involvement in sorting to the
bare minimum:

- the +/- syntax goes away

- the possibility of post-sorting the values through a backdoor
  query goes away too, for both headers.

- the vertical order of the crosstab view is now driven solely by the
  order in the query

- the order of the horizontal header can optionally be specified
  by a column expected to contain an integer, with the syntax
    \crosstabview colv colh:scolh [other cols]
  which means "colh" will be sorted by "scolh".
  It still defaults to whatever order "colh" comes in from the results.

  Concerning the optional "scolh", there are cases where it might pre-exist
  naturally, such as a month number paired with a month name.
  In other cases, a user may add it as a kind of "synthetic column"
  by way of a window function, for example:
    SELECT ...other columns...,
      row_number() over(order by something [order options]) as scolh
      FROM...
  Only the relative order of scolh values is taken into account; the value
  itself has no meaning for crosstabview.

- also NULLs are no longer excluded from headers, per Peter E.'s
  comment in [2].

[1]
http://www.postgresql.org/message-id/3d513263-104b-41e3-b1c7-4ad4bd99c491@mm

[2] http://www.postgresql.org/message-id/56c4e344.6070...@gmx.net

Best regards,
-- 
Daniel Vérité
PostgreSQL-powered mailer: http://www.manitou-mail.org
Twitter: @DanielVerite

diff --git a/doc/src/sgml/ref/psql-ref.sgml b/doc/src/sgml/ref/psql-ref.sgml index 8a85804..da0621b 100644 --- a/doc/src/sgml/ref/psql-ref.sgml +++ b/doc/src/sgml/ref/psql-ref.sgml @@ -990,6 +990,113 @@ testdb=> </varlistentry> <varlistentry> + <term><literal>\crosstabview [ + <replaceable class="parameter">colV</replaceable> + <replaceable class="parameter">colH</replaceable> + [:<replaceable class="parameter">scolH</replaceable>] + [<replaceable class="parameter">colG1[,colG2...]</replaceable>] + ] </literal></term> + <listitem> + <para> + Executes the current query buffer (like <literal>\g</literal>) and shows + the results inside a crosstab grid. + The output column <replaceable class="parameter">colV</replaceable> + becomes a vertical header + and the output column <replaceable class="parameter">colH</replaceable> + becomes a horizontal header, optionally sorted by ranking data obtained + from <replaceable class="parameter">scolH</replaceable>. + + <replaceable class="parameter">colG1[,colG2...]</replaceable> + is the list of output columns to project into the grid. + By default, all output columns of the query except + <replaceable class="parameter">colV</replaceable> and + <replaceable class="parameter">colH</replaceable> + are included in this list. + </para> + + <para> + All columns can be referred to by their position (starting at 1), or by + their name. Normal case folding and quoting rules apply to column + names. By default, + <replaceable class="parameter">colV</replaceable> corresponds to column 1 + and <replaceable class="parameter">colH</replaceable> to column 2. + A query having only one output column cannot be viewed in crosstab, and + <replaceable class="parameter">colH</replaceable> must differ from + <replaceable class="parameter">colV</replaceable>. 
+ </para> + + <para> + The vertical header, displayed as the leftmost column, + contains the deduplicated values found in + column <replaceable class="parameter">colV</replaceable>, in the same + order as in the query results. + </para> + <para> + The horizontal header, displayed as the first row, + contains the deduplicated values found in + column <replaceable class="parameter">colH</replaceable>, in + the order of appearance in the query results. + If specified, the optional <replaceable class="parameter">scolH</replaceable> + argument refers to a column whose values should be integer numbers + by which <replaceable class="parameter">colH</replaceable> will be sorted + to be positioned in the horizontal header. + </para> + + <para> + Inside the crosstab grid, + given a query output with <literal>N</literal> columns + (including <replaceable class="parameter">colV</replaceable> and + <replaceable class="parameter">colH</replaceable>), + for each distinct value <literal>x</literal> of + <replaceable class="parameter">colH</replaceable> + and each distinct value <literal>y</literal> of + <replaceable class="parameter">colV</replaceable>, + the contents of a cell located at the intersection + <literal>(x,y)</literal> is determined by these rules: + <itemizedlist> + <listitem> + <para> + if there is no corresponding row in the query results such that the + value for <replaceable class="parameter">colH</replaceable> + is <literal>x</literal> and the value + for <replaceable class="parameter">colV</replaceable> + is <literal>y</literal>, the cell is empty. 
+ </para> + </listitem> + + <listitem> + <para> + if there is exactly one row such that the value + for <replaceable class="parameter">colH</replaceable> + is <literal>x</literal> and the value + for <replaceable class="parameter">colV</replaceable> + is <literal>y</literal>, then the <literal>N-2</literal> other + columns or the columns listed in + <replaceable class="parameter">colG1[,colG2...]</replaceable> + are displayed in the cell, separated between each other by + a space character if needed. + + If <literal>N=2</literal>, the letter <literal>X</literal> is displayed + in the cell as if a virtual third column contained that character. + </para> + </listitem> + + <listitem> + <para> + if there are several corresponding rows, the behavior is identical to + the case of one row except that the values coming from different rows + are stacked vertically, the different source rows being separated by + newline characters inside the cell. + </para> + </listitem> + + </itemizedlist> + </para> + + </listitem> + </varlistentry> + + <varlistentry> <term><literal>\d[S+] [ <link linkend="APP-PSQL-patterns"><replaceable class="parameter">pattern</replaceable></link> ]</literal></term> <listitem> diff --git a/src/bin/psql/Makefile b/src/bin/psql/Makefile index 66e14fb..9d29fe1 100644 --- a/src/bin/psql/Makefile +++ b/src/bin/psql/Makefile @@ -23,7 +23,7 @@ override CPPFLAGS := -I. 
-I$(srcdir) -I$(libpq_srcdir) -I$(top_srcdir)/src/bin/p OBJS= command.o common.o help.o input.o stringutils.o mainloop.o copy.o \ startup.o prompt.o variables.o large_obj.o print.o describe.o \ tab-complete.o mbprint.o dumputils.o keywords.o kwlookup.o \ - sql_help.o \ + sql_help.o crosstabview.o \ $(WIN32RES) diff --git a/src/bin/psql/command.c b/src/bin/psql/command.c index 9750a5b..e4db76e 100644 --- a/src/bin/psql/command.c +++ b/src/bin/psql/command.c @@ -39,6 +39,7 @@ #include "common.h" #include "copy.h" +#include "crosstabview.h" #include "describe.h" #include "help.h" #include "input.h" @@ -364,6 +365,39 @@ exec_command(const char *cmd, else if (strcmp(cmd, "copyright") == 0) print_copyright(); + /* \crosstabview -- execute a query and display results in crosstab */ + else if (strcmp(cmd, "crosstabview") == 0) + { + char *opt1, + *opt2, + *opt3; + + opt1 = psql_scan_slash_option(scan_state, + OT_NORMAL, NULL, false); + opt2 = psql_scan_slash_option(scan_state, + OT_NORMAL, NULL, false); + opt3 = psql_scan_slash_option(scan_state, + OT_NORMAL, NULL, false); + + if (opt1 && !opt2) + { + psql_error(_("\\%s: missing second argument\n"), cmd); + success = false; + } + else + { + pset.crosstabview_col_V = opt1 ? pg_strdup(opt1): NULL; + pset.crosstabview_col_H = opt2 ? pg_strdup(opt2): NULL; + pset.crosstabview_cols_grid = opt3 ? pg_strdup(opt3): NULL; + pset.crosstabview_output = true; + status = PSQL_CMD_SEND; + } + + free(opt1); + free(opt2); + free(opt3); + } + /* \d* commands */ else if (cmd[0] == 'd') { diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c index 2cb2e9b..b368883 100644 --- a/src/bin/psql/common.c +++ b/src/bin/psql/common.c @@ -24,6 +24,7 @@ #include "command.h" #include "copy.h" #include "mbprint.h" +#include "crosstabview.h" static bool ExecQueryUsingCursor(const char *query, double *elapsed_msec); @@ -906,6 +907,8 @@ PrintQueryResults(PGresult *results) /* store or print the data ... 
*/ if (pset.gset_prefix) success = StoreQueryTuple(results); + else if (pset.crosstabview_output) + success = PrintResultsInCrossTab(results); else success = PrintQueryTuples(results); /* if it's INSERT/UPDATE/DELETE RETURNING, also print status */ @@ -1192,6 +1195,23 @@ sendquery_cleanup: pset.gset_prefix = NULL; } + /* reset \crosstabview settings */ + pset.crosstabview_output = false; + if (pset.crosstabview_col_V) + { + free(pset.crosstabview_col_V); + pset.crosstabview_col_V = NULL; + } + if (pset.crosstabview_col_H) + { + free(pset.crosstabview_col_H); + pset.crosstabview_col_H = NULL; + } + if (pset.crosstabview_cols_grid) + { + free(pset.crosstabview_cols_grid); + pset.crosstabview_cols_grid = NULL; + } return OK; } @@ -1354,7 +1374,25 @@ ExecQueryUsingCursor(const char *query, double *elapsed_msec) is_pager = true; } - printQuery(results, &my_popt, fout, is_pager, pset.logfile); + if (pset.crosstabview_output) + { + if (ntuples < fetch_count) + PrintResultsInCrossTab(results); + else + { + /* + crosstabview is denied if the whole set of rows is not + guaranteed to be fetched in the first iteration, because + it's expected in memory as a single PGresult structure. + */ + psql_error("\\crosstabview must be used with less than FETCH_COUNT (%d) rows\n", + fetch_count); + PQclear(results); + break; + } + } + else + printQuery(results, &my_popt, fout, is_pager, pset.logfile); PQclear(results); diff --git a/src/bin/psql/crosstabview.c b/src/bin/psql/crosstabview.c new file mode 100644 index 0000000..4e21162 --- /dev/null +++ b/src/bin/psql/crosstabview.c @@ -0,0 +1,938 @@ +/* + * psql - the PostgreSQL interactive terminal + * + * Copyright (c) 2000-2016, PostgreSQL Global Development Group + * + * src/bin/psql/crosstabview.c + */ + +#include "common.h" +#include "crosstabview.h" +#include "pqexpbuffer.h" +#include "settings.h" +#include <string.h> + +/* + * Value/position from the resultset that goes into the horizontal or vertical + * crosstabview header. 
+ */ +struct pivot_field +{ + /* + * Pointer obtained from PQgetvalue() for colV or colH. Each distinct + * value becomes an entry in the vertical header (colV), or horizontal + * header (colH). + * A Null value is represented by a NULL pointer. + */ + char *name; + + /* + * When a sort is requested on an alternative column, this holds + * PQgetvalue() for the sort column corresponding to <name>. If + * <name> appear multiple times, it's the first value in the + * order of the results that is kept. + * A Null value is represented by a NULL pointer. + */ + char *sort_value; + + /* + * Rank of this value, starting at 0. Initially, it's the relative position + * of the first appearance of <name> in the resultset. + * For example, if successive rows contain B,A,C,A,D then it's B:0,A:1,C:2,D:3 + * When a sort column is specified, ranks get updated in a final pass to reflect + * the desired order. + */ + int rank; +}; + +/* Node in avl_tree */ +struct avl_node +{ + /* Node contents */ + struct pivot_field field; + + /* + * Height of this node in the tree (number of nodes on the longest + * path to a leaf). + */ + int height; + + /* + * Child nodes. [0] points to left subtree, [1] to right subtree. + * Never NULL, points to the empty node avl_tree.end when no left + * or right value. + */ + struct avl_node *childs[2]; +}; + +/* + * Control structure for the AVL tree (binary search tree kept + * balanced with the AVL algorithm) + */ +struct avl_tree +{ + int count; /* Total number of nodes */ + struct avl_node *root; /* root of the tree */ + struct avl_node *end; /* Immutable dereferenceable empty tree */ +}; + +/* + * Value comparator for vertical and horizontal headers + * used for deduplication only. 
+ * - null values are considered equal + * - non-null < null + * - non-null values are compared with strcmp() + */ +static int +pivotFieldCompare(const void *a, const void *b) +{ + struct pivot_field* pa = (struct pivot_field*) a; + struct pivot_field* pb = (struct pivot_field*) b; + + /* test null values */ + if (!pb->name) + return pa->name ? -1 : 0; + else if (!pa->name) + return 1; + /* non-null values */ + return strcmp( ((struct pivot_field*)a)->name, + ((struct pivot_field*)b)->name); +} + +static int +rankCompare(const void* a, const void* b) +{ + return *((int*)a) - *((int*)b); +} + +/* + * The avl* functions below provide a minimalistic implementation of AVL binary + * trees, to efficiently collect the distinct values that will form the horizontal + * and vertical headers. It only supports adding new values, no removal or even + * search. + */ +static void +avlInit(struct avl_tree *tree) +{ + tree->end = (struct avl_node*) pg_malloc0(sizeof(struct avl_node)); + tree->end->childs[0] = tree->end->childs[1] = tree->end; + tree->count = 0; + tree->root = tree->end; +} + +/* Deallocate recursively an AVL tree, starting from node */ +static void +avlFree(struct avl_tree* tree, struct avl_node* node) +{ + if (node->childs[0] != tree->end) + { + avlFree(tree, node->childs[0]); + pg_free(node->childs[0]); + } + if (node->childs[1] != tree->end) + { + avlFree(tree, node->childs[1]); + pg_free(node->childs[1]); + } + if (node == tree->root) { + /* free the root separately as it's not child of anything */ + if (node != tree->end) + pg_free(node); + /* free the tree->end struct only once and when all else is freed */ + pg_free(tree->end); + } +} + +/* Set the height to 1 plus the greatest of left and right heights */ +static void +avlUpdateHeight(struct avl_node *n) +{ + n->height = 1 + (n->childs[0]->height > n->childs[1]->height ? + n->childs[0]->height: + n->childs[1]->height); +} + +/* Rotate a subtree left (dir=0) or right (dir=1). 
Not recursive */ +static struct avl_node* +avlRotate(struct avl_node **current, int dir) +{ + struct avl_node *before = *current; + struct avl_node *after = (*current)->childs[dir]; + + *current = after; + before->childs[dir] = after->childs[!dir]; + avlUpdateHeight(before); + after->childs[!dir] = before; + + return after; +} + +static int +avlBalance(struct avl_node *n) +{ + return n->childs[0]->height - n->childs[1]->height; +} + +/* + * After an insertion, possibly rebalance the tree so that the left and right + * node heights don't differ by more than 1. + * May update *node. + */ +static void +avlAdjustBalance(struct avl_tree *tree, struct avl_node **node) +{ + struct avl_node *current = *node; + int b = avlBalance(current)/2; + if (b != 0) + { + int dir = (1 - b)/2; + if (avlBalance(current->childs[dir]) == -b) + avlRotate(&current->childs[dir], !dir); + current = avlRotate(node, dir); + } + if (current != tree->end) + avlUpdateHeight(current); +} + +/* + * Insert a new value/field, starting from *node, reaching the + * correct position in the tree by recursion. + * Possibly rebalance the tree and possibly update *node. + * Do nothing if the value is already present in the tree. + */ +static void +avlInsertNode(struct avl_tree* tree, + struct avl_node **node, + struct pivot_field field) +{ + struct avl_node *current = *node; + + if (current == tree->end) + { + struct avl_node * new_node = (struct avl_node*) + pg_malloc(sizeof(struct avl_node)); + new_node->height = 1; + new_node->field = field; + new_node->childs[0] = new_node->childs[1] = tree->end; + tree->count++; + *node = new_node; + } + else + { + int cmp = pivotFieldCompare(&field, &current->field); + if (cmp != 0) + { + avlInsertNode(tree, + cmp > 0 ? 
&current->childs[1] : &current->childs[0], + field); + avlAdjustBalance(tree, node); + } + } +} + +/* Insert the value into the AVL tree, if it does not preexist */ +static void +avlMergeValue(struct avl_tree* tree, char* name, char* sort_value) +{ + struct pivot_field field; + field.name = name; + field.rank = tree->count; + field.sort_value = sort_value; + avlInsertNode(tree, &tree->root, field); +} + +/* + * Recursively extract node values into the names array, in sorted order with a + * left-to-right tree traversal. + * Return the next candidate offset to write into the names array. + * fields[] must be preallocated to hold tree->count entries + */ +static int +avlCollectFields(struct avl_tree* tree, + struct avl_node* node, + struct pivot_field* fields, + int idx) +{ + if (node == tree->end) + return idx; + idx = avlCollectFields(tree, node->childs[0], fields, idx); + fields[idx] = node->field; + return avlCollectFields(tree, node->childs[1], fields, idx+1); +} + + +static void +rankSort(int num_columns, struct pivot_field* piv_columns) +{ + int* hmap; /* [[offset in piv_columns, rank], ...for every header entry] */ + int i; + + hmap = (int*) pg_malloc(sizeof(int) * num_columns * 2); + for (i = 0; i < num_columns; i++) + { + char *val = piv_columns[i].sort_value; + /* ranking information is valid if non null and matches /^-?\d+$/ */ + if (val && ((*val == '-' && strspn(val+1, "0123456789") == strlen(val+1) ) + || strspn(val, "0123456789") == strlen(val))) + { + hmap[i*2] = atoi(val); + hmap[i*2+1] = i; + } + else + { + /* invalid rank information ignored (equivalent to rank 0) */ + hmap[i*2] = 0; + hmap[i*2+1] = i; + } + } + + qsort(hmap, num_columns, sizeof(int)*2, rankCompare); + + for (i=0; i < num_columns; i++) + { + piv_columns[hmap[i*2+1]].rank = i; + } + + pg_free(hmap); +} + + +/* + * Output the pivoted resultset with the printTable* functions + */ +static void +printCrosstab(const PGresult *results, + int num_columns, + struct pivot_field *piv_columns, + int 
field_for_columns, + int num_rows, + struct pivot_field *piv_rows, + int field_for_rows, + int *colsG, + int colsG_num) +{ + printQueryOpt popt = pset.popt; + printTableContent cont; + int i, j, rn; + char col_align = 'l'; /* alignment for values inside the grid */ + int* horiz_map; /* map indices from sorted horizontal headers to piv_columns */ + char** allocated_cells; /* Pointers for cell contents that are allocated + * in this function, when cells cannot simply point to + * PQgetvalue(results, ...) */ + + printTableInit(&cont, &popt.topt, popt.title, num_columns+1, num_rows); + + /* Step 1: set target column names (horizontal header) */ + + /* The name of the first column is kept unchanged by the pivoting */ + printTableAddHeader(&cont, + PQfname(results, field_for_rows), + false, + column_type_alignment(PQftype(results, field_for_rows))); + + /* + * To iterate over piv_columns[] by piv_columns[].rank, create a reverse map + * associating each piv_columns[].rank to its index in piv_columns. + * This avoids an O(N^2) loop later + */ + horiz_map = (int*) pg_malloc(sizeof(int) * num_columns); + for (i = 0; i < num_columns; i++) + { + horiz_map[piv_columns[i].rank] = i; + } + + /* + * In the common case of only one field projected into the cells, the + * display alignment depends on its PQftype(). Otherwise the contents are + * made-up strings, so the alignment is 'l' + */ + if (colsG_num == 1) + col_align = column_type_alignment(PQftype(results, colsG[0])); + else + col_align = 'l'; + + for (i = 0; i < num_columns; i++) + { + char *colname = piv_columns[horiz_map[i]].name ? + piv_columns[horiz_map[i]].name : + (popt.nullPrint ? popt.nullPrint : ""); + + printTableAddHeader(&cont, + colname, + false, + col_align); + } + pg_free(horiz_map); + + /* Step 2: set row names in the first output column (vertical header) */ + for (i = 0; i < num_rows; i++) + { + int k = piv_rows[i].rank; + cont.cells[k*(num_columns+1)] = piv_rows[i].name ? 
+ piv_rows[i].name : + (popt.nullPrint ? popt.nullPrint : ""); + /* Initialize all cells inside the grid to an empty value */ + for (j = 0; j < num_columns; j++) + cont.cells[k*(num_columns+1)+j+1] = ""; + } + cont.cellsadded = num_rows * (num_columns+1); + + allocated_cells = (char**) pg_malloc0(num_rows * num_columns * sizeof(char*)); + + /* Step 3: set all the cells "inside the grid" */ + for (rn = 0; rn < PQntuples(results); rn++) + { + int row_number; + int col_number; + struct pivot_field *p; + + /* Find target row */ + struct pivot_field elt; + if (!PQgetisnull(results, rn, field_for_rows)) + elt.name = PQgetvalue(results, rn, field_for_rows); + else + elt.name = NULL; + p = (struct pivot_field*) bsearch(&elt, + piv_rows, + num_rows, + sizeof(struct pivot_field), + pivotFieldCompare); + + row_number = p ? p->rank : -1; + + /* Find target column */ + if (!PQgetisnull(results, rn, field_for_columns)) + elt.name = PQgetvalue(results, rn, field_for_columns); + else + elt.name = NULL; + + p = (struct pivot_field*) bsearch(&elt, + piv_columns, + num_columns, + sizeof(struct pivot_field), + pivotFieldCompare); + col_number = p? p->rank : -1; + + /* Place value into cell */ + if (col_number>=0 && row_number>=0) + { + int idx = 1 + col_number + row_number*(num_columns+1); + int src_col = 0; /* column number in source result */ + + /* + * special case: when the source has only 2 columns, use a + * X (cross/checkmark) for the cell content, and set + * src_col to a virtual additional column. + */ + if (PQnfields(results) == 2) + src_col = -1; + + for (i=0; i<colsG_num || src_col==-1; i++) + { + char *content; + + if (src_col == -1) + { + content = "X"; + } + else + { + src_col = colsG[i]; + + content = (!PQgetisnull(results, rn, src_col)) ? + PQgetvalue(results, rn, src_col) : + (popt.nullPrint ? 
popt.nullPrint : ""); + } + + if (cont.cells[idx] != NULL && cont.cells[idx][0] != '\0') + { + /* + * Multiple values for the same (row,col) are projected + * into the same cell. When this happens, separate the + * previous content of the cell from the new value by a + * newline. + */ + int content_size = + strlen(cont.cells[idx]) + + 2 /* room for [CR],LF or space */ + + strlen(content) + + 1; /* '\0' */ + char *new_content; + + /* + * idx2 is an index into allocated_cells. It differs from + * idx (index into cont.cells), because vertical and + * horizontal headers are included in `cont.cells` but + * excluded from allocated_cells. + */ + int idx2 = (row_number * num_columns) + col_number; + + if (allocated_cells[idx2] != NULL) + { + new_content = pg_realloc(allocated_cells[idx2], content_size); + } + else + { + /* + * At this point, cont.cells[idx] still contains a + * PQgetvalue() pointer. Just after, it will contain + * a new pointer maintained in allocated_cells[], and + * freed at the end of this function. + */ + new_content = pg_malloc(content_size); + strcpy(new_content, cont.cells[idx]); + } + cont.cells[idx] = new_content; + allocated_cells[idx2] = new_content; + + /* + * Contents that are on adjacent columns in the source results get + * separated by one space in the target. + * Contents that are on different rows in the source get + * separated by newlines in the target. 
+ */ + if (i==0) + strcat(new_content, "\n"); + else + strcat(new_content, " "); + strcat(new_content, content); + } + else + { + cont.cells[idx] = content; + } + + /* special case of the "virtual column" for checkmark */ + if (src_col == -1) + break; + } + } + } + + printTable(&cont, pset.queryFout, false, pset.logfile); + printTableCleanup(&cont); + + + for (i=0; i < num_rows * num_columns; i++) + { + if (allocated_cells[i] != NULL) + pg_free(allocated_cells[i]); + } + + pg_free(allocated_cells); +} + + +/* + * Compare a user-supplied argument against a field name obtained by PQfname(), + * which is already case-folded. + * If arg is not enclosed in double quotes, pg_strcasecmp applies, otherwise + * do a case-sensitive comparison with these rules: + * - double quotes enclosing 'arg' are filtered out + * - double quotes inside 'arg' are expected to be doubled + */ +static bool +fieldNameEquals(const char *arg, const char *fieldname) +{ + const char* p = arg; + const char* f = fieldname; + char c; + + if (*p++ != '"') + return !pg_strcasecmp(arg, fieldname); + + while ((c = *p++)) + { + if (c == '"') + { + if (*p == '"') + p++; /* skip second quote and continue */ + else if (*p == '\0') + return (*f == '\0'); /* p is shorter than f, or is identical */ + } + if (*f == '\0') + return false; /* f is shorter than p */ + if (c != *f) /* found one byte that differs */ + return false; + f++; + } + return (*f=='\0'); +} + +/* + * arg can be a number or a column name, possibly quoted (like in an ORDER BY clause) + * Returns: + * on success, the 0-based index of the column + * or -1 if the column number or name is not found in the result's structure, + * or if it's ambiguous (arg corresponding to several columns) + */ +static int +indexOfColumn(const char *arg, PGresult *res) +{ + int idx; + + if (strspn(arg, "0123456789") == strlen(arg)) + { + /* if arg contains only digits, it's a column number */ + idx = atoi(arg) - 1; + if (idx < 0 || idx >= PQnfields(res)) + { + 
psql_error(_("Invalid column number: %s\n"), arg); + return -1; + } + } + else + { + int i; + idx = -1; + for (i=0; i < PQnfields(res); i++) + { + if (fieldNameEquals(arg, PQfname(res, i))) + { + if (idx>=0) + { + /* if another idx was already found for the same name */ + psql_error(_("Ambiguous column name: %s\n"), arg); + return -1; + } + idx = i; + } + } + if (idx == -1) + { + psql_error(_("Invalid column name: %s\n"), arg); + return -1; + } + } + return idx; +} + +/* + * Parse col1[<sep>col2][<sep>col3]... + * where colN can be: + * - a number from 1 to PQnfields(res) + * - an unquoted column name matching (case insensitively) one of PQfname(res,...) + * - a quoted column name matching (case sensitively) one of PQfname(res,...) + * max_columns: 0 if no maximum + */ +static int +parseColumnRefs(char* arg, + PGresult *res, + int **col_numbers, + int max_columns, + char separator) +{ + char *p = arg; + char c; + int col_num = -1; + int nb_cols = 0; + char* field_start = NULL; + *col_numbers = NULL; + while ((c = *p) != '\0') + { + bool quoted_field = false; + field_start = p; + + /* first char */ + if (c == '"') + { + quoted_field = true; + p++; + } + + while ((c = *p) != '\0') + { + if (c == separator && !quoted_field) + break; + if (c == '"') /* end of field or embedded double quote */ + { + p++; + if (*p == '"') + { + if (quoted_field) + { + p++; + continue; + } + } + else if (quoted_field && *p == separator) + break; + } + p += PQmblen(p, pset.encoding); + } + + if (p != field_start) + { + /* look up the column and add its index into *col_numbers */ + if (max_columns != 0 && nb_cols == max_columns) + { + psql_error(_("No more than %d column references expected\n"), max_columns); + goto errfail; + } + c = *p; + *p = '\0'; + col_num = indexOfColumn(field_start, res); + *p = c; + if (col_num < 0) + goto errfail; + *col_numbers = (int*)pg_realloc(*col_numbers, (1+nb_cols)*sizeof(int)); + (*col_numbers)[nb_cols++] = col_num; + } + else + { + psql_error(_("Empty 
column reference\n")); + goto errfail; + } + + if (*p) + p += PQmblen(p, pset.encoding); + } + return nb_cols; + +errfail: + pg_free(*col_numbers); + *col_numbers = NULL; + return -1; +} + + +/* + * Main function. + * Process the data from *res according the display options in pset (global), + * to generate the horizontal and vertical headers contents, + * then call printCrosstab() for the actual output. + */ +bool +PrintResultsInCrossTab(PGresult* res) +{ + /* COLV or null */ + char* opt_field_for_rows = pset.crosstabview_col_V; + /* COLH[:SCOLH] or null */ + char* opt_field_for_columns = pset.crosstabview_col_H; + int rn; + struct avl_tree piv_columns; + struct avl_tree piv_rows; + struct pivot_field* array_columns = NULL; + struct pivot_field* array_rows = NULL; + int num_columns = 0; + int num_rows = 0; + bool retval = false; + /* + * column definitions involved in the vertical header, horizontal header, + * and grid + */ + int *colsV = NULL, *colsH = NULL, *colsG = NULL; + int colsG_num; + int nn; + + /* 0-based index of the field whose distinct values will become COLUMN headers */ + int field_for_columns = -1; + int sort_field_for_columns = -1; + + /* 0-based index of the field whose distinct values will become ROW headers */ + int field_for_rows = -1; + + avlInit(&piv_rows); + avlInit(&piv_columns); + + if (res == NULL) + { + psql_error(_("No result\n")); + goto error_return; + } + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + psql_error(_("The query must return results to be shown in crosstab\n")); + goto error_return; + } + + if (PQnfields(res) < 2) + { + psql_error(_("The query must return at least two columns to be shown in crosstab\n")); + goto error_return; + } + + /* + * Arguments processing for the vertical header (1st arg) + * displayed in the left-most column. Only a reference to a field + * is accepted (no sort column). 
+ */ + + if (opt_field_for_rows == NULL) + { + field_for_rows = 0; + } + else + { + nn = parseColumnRefs(opt_field_for_rows, res, &colsV, 1, ':'); + if (nn != 1) + goto error_return; + field_for_rows = colsV[0]; + } + + if (field_for_rows < 0) + goto error_return; + + /* + * Arguments processing for the horizontal header (2nd arg) + * (pivoted column that gets displayed as the first row). + * Determine: + * - the sort direction if any + * - the field number of that column in the PGresult + * - the field number of the associated sort column if any + */ + + if (opt_field_for_columns == NULL) + field_for_columns = 1; + else + { + nn = parseColumnRefs(opt_field_for_columns, res, &colsH, 2, ':'); + if (nn <= 0) + goto error_return; + if (nn==1) + field_for_columns = colsH[0]; + else + { + field_for_columns = colsH[0]; + sort_field_for_columns = colsH[1]; + } + + if (field_for_columns < 0) + goto error_return; + } + + if (field_for_columns == field_for_rows) + { + psql_error(_("The same column cannot be used for both vertical and horizontal headers\n")); + goto error_return; + } + + /* + * Arguments processing for the columns aside from headers (3rd arg) + * Determine the columns to display in the grid and their order. + */ + if (pset.crosstabview_cols_grid == NULL) + { + /* + * By default, all the fields from PGresult get displayed into the grid, + * except the two fields that go into the vertical and horizontal + * headers. + */ + if (PQnfields(res) > 2) + { + int i, j=0; + colsG = (int*)pg_malloc(sizeof(int) * (PQnfields(res)-2)); + for (i=0; i<PQnfields(res); i++) + { + if (i!=field_for_rows && i!=field_for_columns) + colsG[j++] = i; + } + colsG_num = PQnfields(res)-2; + } + else + { + colsG = NULL; + colsG_num = 0; + } + } + else + { + /* + * Non-default case: a list of fields is given. + * Parse that list to determine the fields to display into the grid, + * and in what order. 
+ * The list format is colA[,colB[,colC...]] + */ + colsG_num = parseColumnRefs(pset.crosstabview_cols_grid, + res, &colsG, PQnfields(res), ','); + if (colsG_num <= 0) + goto error_return; + } + + /* + * First part: accumulate the names that go into the vertical and + * horizontal headers, each into an AVL binary tree to build the set of + * DISTINCT values. + */ + + for (rn = 0; rn < PQntuples(res); rn++) + { + /* horizontal */ + char* val = PQgetisnull(res, rn, field_for_columns) ? NULL: + PQgetvalue(res, rn, field_for_columns); + + if (sort_field_for_columns >= 0) + { + char* val1 = PQgetisnull(res, rn, sort_field_for_columns) ? NULL: + PQgetvalue(res, rn, sort_field_for_columns); + + avlMergeValue(&piv_columns, val, val1); + } + else + { + avlMergeValue(&piv_columns, val, NULL); + } + + if (piv_columns.count > 1600) + { + psql_error(_("Maximum number of columns (1600) exceeded\n")); + goto error_return; + } + + /* vertical */ + val = PQgetisnull(res, rn, field_for_rows) ? NULL: + PQgetvalue(res, rn, field_for_rows); + + avlMergeValue(&piv_rows, val, NULL); + } + + /* + * Second part: Generate sorted arrays from the AVL trees. + */ + + num_columns = piv_columns.count; + num_rows = piv_rows.count; + + array_columns = (struct pivot_field*) + pg_malloc(sizeof(struct pivot_field) * num_columns); + + array_rows = (struct pivot_field*) + pg_malloc(sizeof(struct pivot_field) * num_rows); + + avlCollectFields(&piv_columns, piv_columns.root, array_columns, 0); + avlCollectFields(&piv_rows, piv_rows.root, array_rows, 0); + + /* + * Third part: optionally, process the ranking data for the horizontal + * header + */ + if (sort_field_for_columns >= 0) + rankSort(num_columns, array_columns); + + /* + * Fourth part: print the crosstab'ed results. 
+	 */
+	printCrosstab(res,
+				  num_columns,
+				  array_columns,
+				  field_for_columns,
+				  num_rows,
+				  array_rows,
+				  field_for_rows,
+				  colsG,
+				  colsG_num);
+
+	retval = true;
+
+error_return:
+	avlFree(&piv_columns, piv_columns.root);
+	avlFree(&piv_rows, piv_rows.root);
+	pg_free(array_columns);
+	pg_free(array_rows);
+	pg_free(colsV);
+	pg_free(colsH);
+	pg_free(colsG);
+
+	return retval;
+}
diff --git a/src/bin/psql/crosstabview.h b/src/bin/psql/crosstabview.h
new file mode 100644
index 0000000..d374cfe
--- /dev/null
+++ b/src/bin/psql/crosstabview.h
@@ -0,0 +1,14 @@
+/*
+ * psql - the PostgreSQL interactive terminal
+ *
+ * Copyright (c) 2000-2016, PostgreSQL Global Development Group
+ *
+ * src/bin/psql/crosstabview.h
+ */
+
+#ifndef CROSSTABVIEW_H
+#define CROSSTABVIEW_H
+
+/* prototypes */
+extern bool PrintResultsInCrossTab(PGresult *res);
+#endif   /* CROSSTABVIEW_H */
diff --git a/src/bin/psql/help.c b/src/bin/psql/help.c
index 59f6f25..f5411ac 100644
--- a/src/bin/psql/help.c
+++ b/src/bin/psql/help.c
@@ -175,6 +175,7 @@ slashUsage(unsigned short int pager)
 	fprintf(output, _(" \\g [FILE] or ; execute query (and send results to file or |pipe)\n"));
 	fprintf(output, _(" \\gset [PREFIX] execute query and store results in psql variables\n"));
 	fprintf(output, _(" \\q quit psql\n"));
+	fprintf(output, _(" \\crosstabview [COLUMNS] execute query and display results in crosstab\n"));
 	fprintf(output, _(" \\watch [SEC] execute query every SEC seconds\n"));
 	fprintf(output, "\n");
 
diff --git a/src/bin/psql/print.c b/src/bin/psql/print.c
index 85dbd30..32360a6 100644
--- a/src/bin/psql/print.c
+++ b/src/bin/psql/print.c
@@ -3293,30 +3293,9 @@ printQuery(const PGresult *result, const printQueryOpt *opt,
 
 	for (i = 0; i < cont.ncolumns; i++)
 	{
-		char		align;
-		Oid			ftype = PQftype(result, i);
-
-		switch (ftype)
-		{
-			case INT2OID:
-			case INT4OID:
-			case INT8OID:
-			case FLOAT4OID:
-			case FLOAT8OID:
-			case NUMERICOID:
-			case OIDOID:
-			case XIDOID:
-			case CIDOID:
-			case CASHOID:
-				align = 'r';
-				break;
-			default:
-				align = 'l';
-				break;
-		}
-
 		printTableAddHeader(&cont, PQfname(result, i),
-							opt->translate_header, align);
+							opt->translate_header,
+							column_type_alignment(PQftype(result, i)));
 	}
 
 	/* set cells */
@@ -3358,6 +3337,31 @@ printQuery(const PGresult *result, const printQueryOpt *opt,
 
 	printTableCleanup(&cont);
 }
+char
+column_type_alignment(Oid ftype)
+{
+	char		align;
+
+	switch (ftype)
+	{
+		case INT2OID:
+		case INT4OID:
+		case INT8OID:
+		case FLOAT4OID:
+		case FLOAT8OID:
+		case NUMERICOID:
+		case OIDOID:
+		case XIDOID:
+		case CIDOID:
+		case CASHOID:
+			align = 'r';
+			break;
+		default:
+			align = 'l';
+			break;
+	}
+	return align;
+}
 
 void
 setDecimalLocale(void)
diff --git a/src/bin/psql/print.h b/src/bin/psql/print.h
index 005ba15..d5d62b2 100644
--- a/src/bin/psql/print.h
+++ b/src/bin/psql/print.h
@@ -174,7 +174,7 @@ extern FILE *PageOutput(int lines, const printTableOpt *topt);
 extern void ClosePager(FILE *pagerpipe);
 
 extern void html_escaped_print(const char *in, FILE *fout);
-
+extern char column_type_alignment(Oid);
 extern void printTableInit(printTableContent *const content,
 			   const printTableOpt *opt, const char *title,
 			   const int ncolumns, const int nrows);
diff --git a/src/bin/psql/settings.h b/src/bin/psql/settings.h
index 20a6470..9b7f7c4 100644
--- a/src/bin/psql/settings.h
+++ b/src/bin/psql/settings.h
@@ -90,6 +90,10 @@ typedef struct _psqlSettings
 
 	char	   *gfname;			/* one-shot file output argument for \g */
 	char	   *gset_prefix;	/* one-shot prefix argument for \gset */
+	bool		crosstabview_output;	/* one-shot request to print results in crosstab */
+	char	   *crosstabview_col_V;	/* one-shot \crosstabview 1st argument */
+	char	   *crosstabview_col_H;	/* one-shot \crosstabview 2nd argument */
+	char	   *crosstabview_cols_grid;	/* one-shot \crosstabview 3rd argument */
 
 	bool		notty;			/* stdin or stdout is not a tty (as determined
 								 * on startup) */
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 6a81416..5f18a5d 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1273,7 +1273,8 @@ psql_completion(const char *text, int start, int end)
 
 	/* psql's backslash commands. */
 	static const char *const backslash_commands[] = {
-		"\\a", "\\connect", "\\conninfo", "\\C", "\\cd", "\\copy", "\\copyright",
+		"\\a", "\\connect", "\\conninfo", "\\C", "\\cd", "\\copy",
+		"\\copyright", "\\crosstabview",
 		"\\d", "\\da", "\\db", "\\dc", "\\dC", "\\dd", "\\ddp", "\\dD",
 		"\\des", "\\det", "\\deu", "\\dew", "\\dE", "\\df",
 		"\\dF", "\\dFd", "\\dFp", "\\dFt", "\\dg", "\\di", "\\dl", "\\dL",
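
A note on the "third part" of PrintResultsInCrossTab(): the body of rankSort() is not part of this excerpt, so what follows is only a sketch of the idea and not the patch's code. It assumes a simplified, hypothetical struct header_entry in place of struct pivot_field, with the integer from scolH already parsed into a rank field, and it simply reorders the horizontal headers by ascending rank with qsort(). Only the relative order of the ranks matters, as with scolH.

```c
#include <stdlib.h>

/*
 * Hypothetical stand-in for the patch's struct pivot_field: a header label
 * plus the integer rank read from the scolH column.
 */
struct header_entry
{
	const char *name;	/* value shown in the horizontal header */
	int			rank;	/* only its relative order is meaningful */
};

/* qsort() comparator: ascending order of rank. */
static int
compare_by_rank(const void *a, const void *b)
{
	const struct header_entry *ha = a;
	const struct header_entry *hb = b;

	/* (x > y) - (x < y) avoids the overflow a plain subtraction can hit */
	return (ha->rank > hb->rank) - (ha->rank < hb->rank);
}

/* Reorder the horizontal headers in place by ascending rank. */
void
sort_headers_by_rank(struct header_entry *headers, size_t n)
{
	qsort(headers, n, sizeof(struct header_entry), compare_by_rank);
}
```

With month names paired with month numbers, for instance, the entries {"Mar", 3}, {"Jan", 1}, {"Feb", 2} would come out as Jan, Feb, Mar whatever order they arrived in from the query.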