Tom Lane wrote:
> I agree that it seems like a good idea to try.
> There will be more per-row overhead, but the increase in flexibility
> is likely to justify that.
Here's a POC patch implementing row-by-row fetching.
If it weren't for the per-row overhead, we could probably get rid of
ExecQueryUsingCursor() and use row-by-row fetches whenever
FETCH_COUNT is set, regardless of the form of the query.
However, the difference in processing time seems to be substantial: in
some quick tests with FETCH_COUNT=10000, I'm seeing almost a 1.5x
increase on large datasets. I assume it's the cost of the extra allocations.
I would have hoped that avoiding the FETCH queries and associated
round-trips with the cursor method would compensate for that, but it
doesn't appear to be the case, at least with a fast local connection.
So in this patch, psql still uses the cursor method if the
query starts with "select", and otherwise falls back to row-by-row
fetching in the main code path (ExecQueryAndProcessResults).
Anyway, it solves the main issue: excessive memory consumption
for CTEs and UPDATE/INSERT queries returning large result sets.
Best regards,
--
Daniel Vérité
https://postgresql.verite.pro/
Twitter: @DanielVerite
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
index 00627830c4..d3de9d8336 100644
--- a/src/bin/psql/common.c
+++ b/src/bin/psql/common.c
@@ -372,6 +372,7 @@ AcceptResult(const PGresult *result, bool show_error)
{
case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK:
+ case PGRES_SINGLE_TUPLE:
case PGRES_EMPTY_QUERY:
case PGRES_COPY_IN:
case PGRES_COPY_OUT:
@@ -675,13 +676,13 @@ PrintNotifications(void)
* Returns true if successful, false otherwise.
*/
static bool
-PrintQueryTuples(const PGresult *result, const printQueryOpt *opt,
+PrintQueryTuples(const PGresult **result, int nresults, const printQueryOpt *opt,
FILE *printQueryFout)
{
bool ok = true;
FILE *fout = printQueryFout ? printQueryFout : pset.queryFout;
- printQuery(result, opt ? opt : &pset.popt, fout, false, pset.logfile);
+ printQueryChunks(result, nresults, opt ? opt : &pset.popt, fout, false, pset.logfile);
fflush(fout);
if (ferror(fout))
{
@@ -958,7 +959,7 @@ PrintQueryResult(PGresult *result, bool last,
else if (last && pset.crosstab_flag)
success = PrintResultInCrosstab(result);
else if (last || pset.show_all_results)
- success = PrintQueryTuples(result, opt, printQueryFout);
+ success = PrintQueryTuples((const PGresult**)&result, 1, opt, printQueryFout);
else
success = true;
@@ -1369,6 +1370,47 @@ DescribeQuery(const char *query, double *elapsed_msec)
return OK;
}
+/*
+ * Open the \g output file or program if one was requested (pset.gfname),
+ * it is not open yet (*gfile_fout == NULL), and "result" has a status whose
+ * output goes there (PGRES_TUPLES_OK, PGRES_SINGLE_TUPLE, PGRES_COPY_OUT).
+ * gfile_fout and is_pipe are handed to openQueryOutputFile() to fill in.
+ * Return false if opening failed, true otherwise (also when nothing to do).
+ */
+static bool
+SetupGOutput(PGresult *result, FILE **gfile_fout, bool *is_pipe)
+{
+	ExecStatusType status = PQresultStatus(result);
+
+	if (pset.gfname == NULL || *gfile_fout != NULL)
+		return true;	/* no \g target, or it is already opened */
+	if (status != PGRES_TUPLES_OK &&
+		status != PGRES_SINGLE_TUPLE &&
+		status != PGRES_COPY_OUT)
+		return true;	/* this kind of result is not sent to \g */
+	if (!openQueryOutputFile(pset.gfname, gfile_fout, is_pipe))
+		return false;
+	/* Test the flag, not the pointer (which is never NULL here). */
+	if (*is_pipe)
+		disable_sigpipe_trap();	/* paired with CloseGOutput() */
+	return true;
+}
+
+/* Close the \g output stream, if one was opened. */
+static void
+CloseGOutput(FILE *gfile_fout, bool is_pipe)
+{
+	if (gfile_fout == NULL)
+		return;
+	if (!is_pipe)
+		fclose(gfile_fout);
+	else
+	{
+		/* a pipe: close it, then undo disable_sigpipe_trap() */
+		pclose(gfile_fout);
+		restore_sigpipe_trap();
+	}
+}
/*
* ExecQueryAndProcessResults: utility function for use by SendQuery()
@@ -1400,10 +1442,16 @@ ExecQueryAndProcessResults(const char *query,
bool success;
instr_time before,
after;
+ int fetch_count = pset.fetch_count;
PGresult *result;
+
FILE *gfile_fout = NULL;
bool gfile_is_pipe = false;
+ PGresult **result_array = NULL; /* to collect results in single row mode */
+ int64 total_tuples = 0;
+ int ntuples;
+
if (timing)
INSTR_TIME_SET_CURRENT(before);
@@ -1424,6 +1472,33 @@ ExecQueryAndProcessResults(const char *query,
return -1;
}
+ /*
+ * If FETCH_COUNT is set and the context allows it, use the single row
+ * mode to fetch results and have no more than FETCH_COUNT rows in
+ * memory.
+ */
+ if (fetch_count > 0 && !pset.crosstab_flag && !pset.gexec_flag && !is_watch
+ && !pset.gset_prefix && pset.show_all_results)
+ {
+ /*
+ * The row-by-row fetch is not enabled when SHOW_ALL_RESULTS is false,
+ * since we would need to accumulate all rows before knowing
+ * whether they need to be discarded or displayed, which contradicts
+ * FETCH_COUNT.
+ */
+ if (!PQsetSingleRowMode(pset.db))
+ {
+ pg_log_warning("fetching results in single row mode is unavailable");
+ fetch_count = 0;
+ }
+ else
+ {
+ result_array = (PGresult**) pg_malloc(fetch_count * sizeof(PGresult*));
+ }
+ }
+ else
+ fetch_count = 0; /* disable single-row mode */
+
/*
* If SIGINT is sent while the query is processing, the interrupt will be
* consumed. The user's intention, though, is to cancel the entire watch
@@ -1443,6 +1518,7 @@ ExecQueryAndProcessResults(const char *query,
ExecStatusType result_status;
PGresult *next_result;
bool last;
+ bool partial_display = false;
if (!AcceptResult(result, false))
{
@@ -1569,6 +1645,85 @@ ExecQueryAndProcessResults(const char *query,
success &= HandleCopyResult(&result, copy_stream);
}
+ if (fetch_count > 0 && result_status == PGRES_SINGLE_TUPLE)
+ {
+ FILE *tuples_fout = printQueryFout;
+ printQueryOpt my_popt = pset.popt;
+
+ ntuples = 0;
+ partial_display = true;
+
+ success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe);
+ if (gfile_fout)
+ tuples_fout = gfile_fout;
+
+ /* initialize print options for partial table output */
+ my_popt.topt.start_table = true;
+ my_popt.topt.stop_table = false;
+ my_popt.topt.prior_records = 0;
+
+ while (success)
+ {
+ result_array[ntuples++] = result;
+ if (ntuples == fetch_count)
+ {
+ /* TODO: handle paging */
+ /* display the current chunk of results */
+ PrintQueryTuples(result_array, ntuples, &my_popt, tuples_fout);
+ /* clear and reuse result_array */
+ for (int i=0; i < ntuples; i++)
+ PQclear(result_array[i]);
+ /* after the first result set, disallow header decoration */
+ my_popt.topt.start_table = false;
+ my_popt.topt.prior_records += ntuples;
+ total_tuples += ntuples;
+ ntuples = 0;
+ }
+
+ result = PQgetResult(pset.db);
+ if (result == NULL)
+ {
+ /*
+ * Error. We expect a PGRES_TUPLES_OK result with
+ * zero tuples in it to finish the row-by-row sequence.
+ */
+ success = false;
+ break;
+ }
+
+ if (PQresultStatus(result) == PGRES_TUPLES_OK)
+ {
+ /* TODO: merge this block with the code above? */
+ /*
+ * The last row has been read. Display the last chunk of
+ * results and the footer.
+ */
+ my_popt.topt.stop_table = true;
+ PrintQueryTuples(result_array, ntuples, &my_popt, tuples_fout);
+ for (int i=0; i < ntuples; i++)
+ PQclear(result_array[i]);
+ total_tuples += ntuples;
+ ntuples = 0;
+
+ result = NULL;
+ {
+ /*
+ * fake SetResultVariables() as in ExecQueryUsingCursor().
+ */
+ char buf[32];
+
+ SetVariable(pset.vars, "ERROR", "false");
+ SetVariable(pset.vars, "SQLSTATE", "00000");
+ snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples);
+ SetVariable(pset.vars, "ROW_COUNT", buf);
+ }
+ break;
+ }
+ }
+ }
+ else
+ partial_display = false;
+
/*
* Check PQgetResult() again. In the typical case of a single-command
* string, it will return NULL. Otherwise, we'll have other results
@@ -1597,7 +1752,7 @@ ExecQueryAndProcessResults(const char *query,
}
/* this may or may not print something depending on settings */
- if (result != NULL)
+ if (result != NULL && !partial_display)
{
/*
* If results need to be printed into the file specified by \g,
@@ -1606,25 +1761,10 @@ ExecQueryAndProcessResults(const char *query,
* tuple output, but it's still used for status output.
*/
FILE *tuples_fout = printQueryFout;
- bool do_print = true;
-
- if (PQresultStatus(result) == PGRES_TUPLES_OK &&
- pset.gfname)
- {
- if (gfile_fout == NULL)
- {
- if (openQueryOutputFile(pset.gfname,
- &gfile_fout, &gfile_is_pipe))
- {
- if (gfile_is_pipe)
- disable_sigpipe_trap();
- }
- else
- success = do_print = false;
- }
+ success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe);
+ if (gfile_fout)
tuples_fout = gfile_fout;
- }
- if (do_print)
+ if (success)
success &= PrintQueryResult(result, last, opt,
tuples_fout, printQueryFout);
}
@@ -1643,17 +1783,10 @@ ExecQueryAndProcessResults(const char *query,
}
}
- /* close \g file if we opened it */
- if (gfile_fout)
- {
- if (gfile_is_pipe)
- {
- pclose(gfile_fout);
- restore_sigpipe_trap();
- }
- else
- fclose(gfile_fout);
- }
+ CloseGOutput(gfile_fout, gfile_is_pipe);
+
+ if (result_array)
+ pg_free(result_array);
/* may need this to recover from conn loss during COPY */
if (!CheckConnection())
diff --git a/src/fe_utils/print.c b/src/fe_utils/print.c
index 3396f9b462..d8f0a29773 100644
--- a/src/fe_utils/print.c
+++ b/src/fe_utils/print.c
@@ -3533,17 +3533,42 @@ printTable(const printTableContent *cont,
void
printQuery(const PGresult *result, const printQueryOpt *opt,
FILE *fout, bool is_pager, FILE *flog)
+{
+ printQueryChunks(&result, 1, opt, fout, is_pager, flog);
+}
+
+/*
+ * Print the results of a query that may have been obtained by a
+ * succession of calls to PQgetResult in single-row mode.
+ *
+ * results: array of results of a successful query. They must have the same columns.
+ * nresults: number of entries in results
+ * opt: formatting options
+ * fout: where to print to
+ * is_pager: true if caller has already redirected fout to be a pager pipe
+ * flog: if not null, also print the data there (for --log-file option)
+ */
+void
+printQueryChunks(const PGresult *results[], int nresults, const printQueryOpt *opt,
+ FILE *fout, bool is_pager, FILE *flog)
{
printTableContent cont;
int i,
r,
c;
+ int nrows = 0; /* total number of rows */
+ int ri; /* index into results[] */
if (cancel_pressed)
return;
+ for (ri = 0; ri < nresults; ri++)
+ {
+ nrows += PQntuples(results[ri]);
+ }
+
printTableInit(&cont, &opt->topt, opt->title,
- PQnfields(result), PQntuples(result));
+ (nresults > 0) ? PQnfields(results[0]) : 0, nrows);
/* Assert caller supplied enough translate_columns[] entries */
Assert(opt->translate_columns == NULL ||
@@ -3551,34 +3576,37 @@ printQuery(const PGresult *result, const printQueryOpt *opt,
for (i = 0; i < cont.ncolumns; i++)
{
- printTableAddHeader(&cont, PQfname(result, i),
+ printTableAddHeader(&cont, PQfname(results[0], i),
opt->translate_header,
- column_type_alignment(PQftype(result, i)));
+ column_type_alignment(PQftype(results[0], i)));
}
/* set cells */
- for (r = 0; r < cont.nrows; r++)
+ for (ri = 0; ri < nresults; ri++)
{
- for (c = 0; c < cont.ncolumns; c++)
+ for (r = 0; r < PQntuples(results[ri]); r++)
{
- char *cell;
- bool mustfree = false;
- bool translate;
-
- if (PQgetisnull(result, r, c))
- cell = opt->nullPrint ? opt->nullPrint : "";
- else
+ for (c = 0; c < cont.ncolumns; c++)
{
- cell = PQgetvalue(result, r, c);
- if (cont.aligns[c] == 'r' && opt->topt.numericLocale)
+ char *cell;
+ bool mustfree = false;
+ bool translate;
+
+ if (PQgetisnull(results[ri], r, c))
+ cell = opt->nullPrint ? opt->nullPrint : "";
+ else
{
- cell = format_numeric_locale(cell);
- mustfree = true;
+ cell = PQgetvalue(results[ri], r, c);
+ if (cont.aligns[c] == 'r' && opt->topt.numericLocale)
+ {
+ cell = format_numeric_locale(cell);
+ mustfree = true;
+ }
}
- }
- translate = (opt->translate_columns && opt->translate_columns[c]);
- printTableAddCell(&cont, cell, translate, mustfree);
+ translate = (opt->translate_columns && opt->translate_columns[c]);
+ printTableAddCell(&cont, cell, translate, mustfree);
+ }
}
}
diff --git a/src/include/fe_utils/print.h b/src/include/fe_utils/print.h
index 54f783c907..3befc41bdc 100644
--- a/src/include/fe_utils/print.h
+++ b/src/include/fe_utils/print.h
@@ -220,7 +220,10 @@ extern void printTableCleanup(printTableContent *const content);
extern void printTable(const printTableContent *cont,
FILE *fout, bool is_pager, FILE *flog);
extern void printQuery(const PGresult *result, const printQueryOpt *opt,
- FILE *fout, bool is_pager, FILE *flog);
+ FILE *fout, bool is_pager, FILE *flog);
+extern void printQueryChunks(const PGresult *results[], int nresults,
+ const printQueryOpt *opt,
+ FILE *fout, bool is_pager, FILE *flog);
extern char column_type_alignment(Oid);