On 24.04.2017 21:43, Andres Freund wrote:
Hi,
On 2017-04-24 11:46:02 +0300, Konstantin Knizhnik wrote:
So what I am thinking now is implicit query caching. If the same query with
different literal values is repeated many times, then we can try to
generalize this query and replace it with a prepared query with
parameters.
That's not actually all that easy:
- You pretty much have to do parse analysis to be able to do an accurate match.
How much overhead is parse analysis vs. planning in your cases?
- The invalidation infrastructure for this, if not tied to fully
parse-analyzed statements, is going to be hell.
- Migrating to parameters can actually cause significant slowdowns, not
nice if that happens implicitly.
Well, first of all I want to share the results I have already obtained:
pgbench with default parameters, scale 10 and one connection:

protocol                TPS
simple                  3492
extended                2927
prepared                6865
simple + autoprepare    6844

So autoprepare is as efficient as explicit prepare and can increase
performance almost twofold.
My current implementation replaces only string literals in the query
with parameters, i.e. select * from T where x='123'; -> select *
from T where x=$1;
It greatly simplifies matching of parameters: it is just necessary to
locate the '\'' character and then correctly handle pairs of quotes.
Handling of integer and real literals is a really challenging task.
One source of problems is negation: it is not so easy to decide
whether a minus sign should be treated as part of a literal or as an
operator:
(-1), (1-1), (1-1)-1
Another problem is caused by integer literals used in contexts where
parameters cannot be used, for example "order by 1".
A fully correct substitution can be done by first parsing the query,
then transforming the parse tree, replacing literal nodes with parameter
nodes, and finally deparsing the tree into a generalized query.
postgres_fdw already contains such deparse code; it could be moved into
the Postgres core and reused for autoprepare (and maybe elsewhere).
But in this case the overhead would be much higher.
I still think that query parsing time is significantly smaller than the
time needed for building and optimizing the query execution plan.
But it should be measured, if the community is interested in this approach.
There is an obvious question: how did I manage to get these pgbench
results if currently only substitution of string literals is supported
and the queries constructed by pgbench don't contain string literals? I
just made a small patch to pgbench's replaceVariable method, wrapping
the value's representation in quotes. It has almost no impact on
performance (3482 TPS vs. 3492 TPS), but allows autoprepare to deal
with pgbench queries.
I have attached my patch to this mail. It is just a first version of the
patch (based on the REL9_6_STABLE branch), intended to illustrate the
proposed approach.
I will be glad to receive any comments, and if this optimization is
considered useful, I will continue working on the patch.
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index f6be98b..6291d66 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -96,8 +96,6 @@ int max_stack_depth = 100;
/* wait N seconds to allow attach from a debugger */
int PostAuthDelay = 0;
-
-
/* ----------------
* private variables
* ----------------
@@ -188,6 +186,7 @@ static bool IsTransactionStmtList(List *parseTrees);
static void drop_unnamed_stmt(void);
static void SigHupHandler(SIGNAL_ARGS);
static void log_disconnections(int code, Datum arg);
+static bool exec_cached_query(const char *query_string);
/* ----------------------------------------------------------------
@@ -916,6 +915,14 @@ exec_simple_query(const char *query_string)
drop_unnamed_stmt();
/*
+ * Try to find cached plan
+ */
+ if (autoprepare_threshold != 0 && exec_cached_query(query_string))
+ {
+ return;
+ }
+
+ /*
* Switch to appropriate context for constructing parsetrees.
*/
oldcontext = MemoryContextSwitchTo(MessageContext);
@@ -4500,3 +4509,566 @@ log_disconnections(int code, Datum arg)
port->user_name, port->database_name, port->remote_host,
port->remote_port[0] ? " port=" : "", port->remote_port)));
}
+
+
+
+typedef struct {
+ char const* query;
+ int64 exec_count;
+ CachedPlanSource* plan;
+ int n_params;
+ int16 format;
+} plan_cache_entry;
+
+
+/*
+ * Replace string literals with parameters. We do not consider integer or real literals, to avoid problems with
+ * negative numbers, user-defined operators, etc. For example, it is not easy to distinguish the cases (-1), (1-1), (1-1)-1
+ */
+static void generalize_statement(const char *query_string, char** gen_query, char** query_params, int* n_params)
+{
+ size_t query_len = strlen(query_string);
+ char const* src = query_string;
+ char* dst;
+ char* params;
+ unsigned char ch;
+
+ *n_params = 0;
+
+ *gen_query = (char*)palloc(query_len*2); /* assume that we have less than 1000 parameters, the worst case is replacing '' with $999 */
+ *query_params = (char*)palloc(query_len + 1);
+ dst = *gen_query;
+ params = *query_params;
+
+ while ((ch = *src++) != '\0') {
+ if (isspace(ch)) {
+ /* Replace sequence of whitespaces with just one space */
+ while (*src && isspace(*(unsigned char*)src)) {
+ src += 1;
+ }
+ *dst++ = ' ';
+ } else if (ch == '\'') {
+ while ((ch = *src) != '\0') {
+ src += 1; /* don't run past the terminator if the literal is unterminated */
+ if (ch == '\'') {
+ if (*src != '\'') {
+ break;
+ } else {
+ /* escaped quote */
+ *params++ = '\'';
+ src += 1;
+ }
+ } else {
+ *params++ = ch;
+ }
+ }
+ *params++ = '\0';
+ dst += sprintf(dst, "$%d", ++*n_params);
+ } else {
+ *dst++ = ch;
+ }
+ }
+ Assert(dst <= *gen_query + query_len*2);
+ Assert(params <= *query_params + query_len + 1);
+ *dst = '\0';
+}
+
+static uint32 plan_cache_hash_fn(const void *key, Size keysize)
+{
+ return string_hash(((plan_cache_entry*)key)->query, 0);
+}
+
+static int plan_cache_match_fn(const void *key1, const void *key2, Size keysize)
+{
+ return strcmp(((plan_cache_entry*)key1)->query, ((plan_cache_entry*)key2)->query);
+}
+
+static void* plan_cache_keycopy_fn(void *dest, const void *src, Size keysize)
+{
+ ((plan_cache_entry*)dest)->query = pstrdup(((plan_cache_entry*)src)->query);
+ return dest;
+}
+
+#define PLAN_CACHE_SIZE 113
+
+/*
+ * Try to generalize query, find cached plan for it and execute
+ */
+static bool exec_cached_query(const char *query_string)
+{
+ CommandDest dest = whereToSendOutput;
+ DestReceiver *receiver;
+ char *gen_query;
+ char *query_params;
+ int n_params;
+ plan_cache_entry *entry;
+ bool found;
+ MemoryContext old_context;
+ CachedPlanSource *psrc;
+ ParamListInfo params;
+ int paramno;
+ CachedPlan *cplan;
+ Portal portal;
+ bool was_logged = false;
+ bool is_xact_command;
+ bool execute_is_fetch;
+ char completion_tag[COMPLETION_TAG_BUFSIZE];
+ bool save_log_statement_stats = log_statement_stats;
+ ParamListInfo portal_params;
+ const char *source_text;
+ char msec_str[32];
+ bool snapshot_set = false;
+
+ static HTAB* plan_cache;
+ static MemoryContext plan_cache_context;
+
+ /*
+ * Extract literals from query
+ */
+ generalize_statement(query_string, &gen_query, &query_params, &n_params);
+
+ if (plan_cache_context == NULL) {
+ plan_cache_context = AllocSetContextCreate(TopMemoryContext,
+ "plan cache context",
+ ALLOCSET_DEFAULT_SIZES);
+ }
+ old_context = MemoryContextSwitchTo(plan_cache_context);
+
+ /*
+ * Initialize hash table if not initialized yet
+ */
+ if (plan_cache == NULL)
+ {
+ static HASHCTL info;
+ info.keysize = sizeof(char*);
+ info.entrysize = sizeof(plan_cache_entry);
+ info.hash = plan_cache_hash_fn;
+ info.match = plan_cache_match_fn;
+ info.keycopy = plan_cache_keycopy_fn;
+ plan_cache = hash_create("plan_cache", PLAN_CACHE_SIZE, &info, HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_KEYCOPY);
+ }
+
+ /*
+ * Lookup generalized query
+ */
+ entry = (plan_cache_entry*)hash_search(plan_cache, &gen_query, HASH_ENTER, &found);
+ if (!found) {
+ entry->exec_count = 0;
+ entry->plan = NULL;
+ entry->n_params = n_params;
+ } else {
+ Assert(entry->n_params == n_params);
+ }
+ MemoryContextSwitchTo(old_context);
+
+ /*
+ * Prepare query only when it is executed more than autoprepare_threshold times
+ */
+ if (entry->exec_count++ < autoprepare_threshold) {
+ return false;
+ }
+ if (entry->plan == NULL) {
+ List *parsetree_list;
+ Node *raw_parse_tree;
+ const char *command_tag;
+ Query *query;
+ List *querytree_list;
+ Oid *param_types = NULL;
+ int num_params = 0;
+
+ old_context = MemoryContextSwitchTo(MessageContext);
+
+ parsetree_list = pg_parse_query(gen_query);
+
+ /*
+ * Only a single user statement is allowed in a prepared statement.
+ */
+ if (list_length(parsetree_list) != 1) {
+ MemoryContextSwitchTo(old_context);
+ return false;
+ }
+
+ raw_parse_tree = (Node *) linitial(parsetree_list);
+
+ /*
+ * Get the command name for possible use in status display.
+ */
+ command_tag = CreateCommandTag(raw_parse_tree);
+
+ /*
+ * If we are in an aborted transaction, reject all commands except
+ * COMMIT/ROLLBACK. It is important that this test occur before we
+ * try to do parse analysis, rewrite, or planning, since all those
+ * phases try to do database accesses, which may fail in abort state.
+ * (It might be safe to allow some additional utility commands in this
+ * state, but not many...)
+ */
+ if (IsAbortedTransactionBlockState() &&
+ !IsTransactionExitStmt(raw_parse_tree))
+ ereport(ERROR,
+ (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
+ errmsg("current transaction is aborted, "
+ "commands ignored until end of transaction block"),
+ errdetail_abort()));
+
+ /*
+ * Create the CachedPlanSource before we do parse analysis, since it
+ * needs to see the unmodified raw parse tree.
+ */
+ psrc = CreateCachedPlan(raw_parse_tree, gen_query, command_tag);
+
+ /*
+ * Set up a snapshot if parse analysis will need one.
+ */
+ if (analyze_requires_snapshot(raw_parse_tree))
+ {
+ PushActiveSnapshot(GetTransactionSnapshot());
+ snapshot_set = true;
+ }
+
+ /*
+ * Analyze and rewrite the query. Note that the originally specified
+ * parameter set is not required to be complete, so we have to use
+ * parse_analyze_varparams().
+ */
+ if (log_parser_stats) {
+ ResetUsage();
+ }
+
+ query = parse_analyze_varparams(raw_parse_tree,
+ gen_query,
+ &param_types,
+ &num_params);
+ Assert(num_params == entry->n_params);
+
+ /*
+ * Check all parameter types got determined.
+ */
+ for (paramno = 0; paramno < n_params; paramno++)
+ {
+ Oid ptype = param_types[paramno];
+
+ if (ptype == InvalidOid || ptype == UNKNOWNOID) {
+ /* Type of parameter can not be determined */
+ MemoryContextSwitchTo(old_context);
+ return false;
+ }
+ }
+
+ if (log_parser_stats) {
+ ShowUsage("PARSE ANALYSIS STATISTICS");
+ }
+
+ querytree_list = pg_rewrite_query(query);
+
+ /* Done with the snapshot used for parsing */
+ if (snapshot_set) {
+ PopActiveSnapshot();
+ }
+
+ CompleteCachedPlan(psrc,
+ querytree_list,
+ NULL,
+ param_types,
+ n_params,
+ NULL,
+ NULL,
+ CURSOR_OPT_PARALLEL_OK, /* allow parallel mode */
+ true); /* fixed result */
+
+ /* If we got a cancel signal during analysis, quit */
+ CHECK_FOR_INTERRUPTS();
+
+ entry->format = 0; /* TEXT is default */
+ if (IsA(raw_parse_tree, FetchStmt))
+ {
+ FetchStmt *stmt = (FetchStmt *)raw_parse_tree;
+
+ if (!stmt->ismove)
+ {
+ Portal fportal = GetPortalByName(stmt->portalname);
+
+ if (PortalIsValid(fportal) &&
+ (fportal->cursorOptions & CURSOR_OPT_BINARY))
+ entry->format = 1; /* BINARY */
+ }
+ }
+
+ SaveCachedPlan(psrc);
+ entry->plan = psrc;
+ MemoryContextSwitchTo(old_context);
+
+ /*
+ * We do NOT close the open transaction command here; that only happens
+ * when the client sends Sync. Instead, do CommandCounterIncrement just
+ * in case something happened during parse/plan.
+ */
+ CommandCounterIncrement();
+ }
+ psrc = entry->plan;
+
+ /*
+ * If we are in aborted transaction state, the only portals we can
+ * actually run are those containing COMMIT or ROLLBACK commands. We
+ * disallow binding anything else to avoid problems with infrastructure
+ * that expects to run inside a valid transaction. We also disallow
+ * binding any parameters, since we can't risk calling user-defined I/O
+ * functions.
+ */
+ if (IsAbortedTransactionBlockState() &&
+ (!IsTransactionExitStmt(psrc->raw_parse_tree) ||
+ n_params != 0))
+ ereport(ERROR,
+ (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
+ errmsg("current transaction is aborted, "
+ "commands ignored until end of transaction block"),
+ errdetail_abort()));
+
+ /*
+ * Create unnamed portal to run the query or queries in. If there
+ * already is one, silently drop it.
+ */
+ portal = CreatePortal("", true, true);
+ /* Don't display the portal in pg_cursors */
+ portal->visible = false;
+
+ /*
+ * Prepare to copy stuff into the portal's memory context. We do all this
+ * copying first, because it could possibly fail (out-of-memory) and we
+ * don't want a failure to occur between GetCachedPlan and
+ * PortalDefineQuery; that would result in leaking our plancache refcount.
+ */
+ old_context = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
+
+ /* Copy the plan's query string into the portal */
+ query_string = pstrdup(psrc->query_string);
+
+ /*
+ * Set a snapshot if we have parameters to fetch (since the input
+ * functions might need it) or the query isn't a utility command (and
+ * hence could require redoing parse analysis and planning). We keep the
+ * snapshot active till we're done, so that plancache.c doesn't have to
+ * take new ones.
+ */
+ if (n_params > 0 ||
+ (psrc->raw_parse_tree &&
+ analyze_requires_snapshot(psrc->raw_parse_tree)))
+ {
+ PushActiveSnapshot(GetTransactionSnapshot());
+ snapshot_set = true;
+ } else {
+ snapshot_set = false;
+ }
+
+ /*
+ * Fetch parameters, if any, and store in the portal's memory context.
+ */
+ if (n_params > 0)
+ {
+ params = (ParamListInfo) palloc(offsetof(ParamListInfoData, params) +
+ n_params * sizeof(ParamExternData));
+ params->paramFetch = NULL;
+ params->paramFetchArg = NULL;
+ params->parserSetup = NULL;
+ params->parserSetupArg = NULL;
+ params->numParams = n_params;
+ params->paramMask = NULL;
+
+ for (paramno = 0; paramno < n_params; paramno++)
+ {
+ Oid ptype = psrc->param_types[paramno];
+ Oid typinput;
+ Oid typioparam;
+
+ getTypeInputInfo(ptype, &typinput, &typioparam);
+
+ params->params[paramno].value = OidInputFunctionCall(typinput, query_params, typioparam, -1);
+ params->params[paramno].isnull = false;
+
+ /*
+ * We mark the params as CONST. This ensures that any custom plan
+ * makes full use of the parameter values.
+ */
+ params->params[paramno].pflags = PARAM_FLAG_CONST;
+ params->params[paramno].ptype = ptype;
+
+ query_params += strlen(query_params) + 1;
+ }
+ } else {
+ params = NULL;
+ }
+
+ /* Done storing stuff in portal's context */
+ MemoryContextSwitchTo(old_context);
+
+ /*
+ * Obtain a plan from the CachedPlanSource. Any cruft from (re)planning
+ * will be generated in MessageContext. The plan refcount will be
+ * assigned to the Portal, so it will be released at portal destruction.
+ */
+ cplan = GetCachedPlan(psrc, params, false);
+
+ /*
+ * Now we can define the portal.
+ *
+ * DO NOT put any code that could possibly throw an error between the
+ * above GetCachedPlan call and here.
+ */
+ PortalDefineQuery(portal,
+ NULL,
+ gen_query,
+ psrc->commandTag,
+ cplan->stmt_list,
+ cplan);
+
+ /* Done with the snapshot used for parameter I/O and parsing/planning */
+ if (snapshot_set) {
+ PopActiveSnapshot();
+ }
+
+ /*
+ * And we're ready to start portal execution.
+ */
+ PortalStart(portal, params, 0, InvalidSnapshot);
+
+ /*
+ * Apply the result format requests to the portal.
+ */
+ PortalSetResultFormat(portal, 1, &entry->format);
+
+ /* Does the portal contain a transaction command? */
+ is_xact_command = IsTransactionStmtList(portal->stmts);
+
+ /*
+ * We must copy the sourceText into MessageContext in
+ * case the portal is destroyed during finish_xact_command. Can avoid the
+ * copy if it's not an xact command, though.
+ */
+ if (is_xact_command)
+ {
+ source_text = pstrdup(portal->sourceText);
+ /*
+ * An xact command shouldn't have any parameters, which is a good
+ * thing because they wouldn't be around after finish_xact_command.
+ */
+ portal_params = NULL;
+ }
+ else
+ {
+ source_text = portal->sourceText;
+ portal_params = portal->portalParams;
+ }
+
+ /*
+ * Report query to various monitoring facilities.
+ */
+ debug_query_string = source_text;
+
+ pgstat_report_activity(STATE_RUNNING, source_text);
+
+ set_ps_display(portal->commandTag, false);
+
+ if (save_log_statement_stats) {
+ ResetUsage();
+ }
+
+ BeginCommand(portal->commandTag, dest);
+
+
+
+
+ /*
+ * Create dest receiver in MessageContext (we don't want it in transaction
+ * context, because that may get deleted if portal contains VACUUM).
+ */
+ receiver = CreateDestReceiver(dest);
+ SetRemoteDestReceiverParams(receiver, portal);
+
+ /*
+ * If we re-issue an Execute protocol request against an existing portal,
+ * then we are only fetching more rows rather than completely re-executing
+ * the query from the start. atStart is never reset for a v3 portal, so we
+ * are safe to use this check.
+ */
+ execute_is_fetch = !portal->atStart;
+
+ /* Log immediately if dictated by log_statement */
+ if (check_log_statement(portal->stmts))
+ {
+ ereport(LOG,
+ (errmsg("%s %s%s%s: %s",
+ execute_is_fetch ?
+ _("execute fetch from") :
+ _("execute"),
+ "<unnamed>",
+ "",
+ "",
+ source_text),
+ errhidestmt(true),
+ errdetail_params(portal_params)));
+ was_logged = true;
+ }
+
+ /* Check for cancel signal before we start execution */
+ CHECK_FOR_INTERRUPTS();
+
+ /*
+ * Run the portal to completion, and then drop it (and the receiver).
+ */
+ (void) PortalRun(portal,
+ FETCH_ALL,
+ true,
+ receiver,
+ receiver,
+ completion_tag);
+
+ (*receiver->rDestroy) (receiver);
+
+ PortalDrop(portal, false);
+
+ /*
+ * Tell client that we're done with this query. Note we emit exactly
+ * one EndCommand report for each raw parsetree, thus one for each SQL
+ * command the client sent, regardless of rewriting. (But a command
+ * aborted by error will not send an EndCommand report at all.)
+ */
+ EndCommand(completion_tag, dest);
+
+ /*
+ * Close down transaction statement, if one is open.
+ */
+ finish_xact_command();
+
+ /*
+ * Emit duration logging if appropriate.
+ */
+ switch (check_log_duration(msec_str, was_logged))
+ {
+ case 1:
+ ereport(LOG,
+ (errmsg("duration: %s ms", msec_str),
+ errhidestmt(true)));
+ break;
+ case 2:
+ ereport(LOG,
+ (errmsg("duration: %s ms %s %s%s%s: %s",
+ msec_str,
+ execute_is_fetch ?
+ _("execute fetch from") :
+ _("execute"),
+ "<unnamed>",
+ "",
+ "",
+ source_text),
+ errhidestmt(true),
+ errdetail_params(portal_params)));
+ break;
+ }
+
+ if (save_log_statement_stats) {
+ ShowUsage("EXECUTE MESSAGE STATISTICS");
+ }
+ debug_query_string = NULL;
+
+ return true;
+}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 4f1891f..d3acf5b 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -450,6 +450,9 @@ int tcp_keepalives_idle;
int tcp_keepalives_interval;
int tcp_keepalives_count;
+
+int autoprepare_threshold;
+
/*
* SSL renegotiation was been removed in PostgreSQL 9.5, but we tolerate it
* being set to zero (meaning never renegotiate) for backward compatibility.
@@ -1949,6 +1952,19 @@ static struct config_int ConfigureNamesInt[] =
check_max_stack_depth, assign_max_stack_depth, NULL
},
+ /*
+ * Threshold for implicit preparing of frequently executed queries
+ */
+ {
+ {"autoprepare_threshold", PGC_USERSET, QUERY_TUNING_OTHER,
+ gettext_noop("Threshold for implicitly preparing frequently executed queries."),
+ NULL,
+ },
+ &autoprepare_threshold,
+ 0, 0, INT_MAX,
+ NULL, NULL, NULL
+ },
+
{
{"temp_file_limit", PGC_SUSET, RESOURCES_DISK,
gettext_noop("Limits the total size of all temporary files used by each process."),
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index 0bf9f21..ef23569 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -252,6 +252,8 @@ extern int client_min_messages;
extern int log_min_duration_statement;
extern int log_temp_files;
+extern int autoprepare_threshold;
+
extern int temp_file_limit;
extern int num_temp_buffers;
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers