On 24.04.2017 21:43, Andres Freund wrote:
> Hi,
>
> On 2017-04-24 11:46:02 +0300, Konstantin Knizhnik wrote:
>> So what I am thinking now is implicit query caching. If the same query with
>> different literal values is repeated many times, then we can try to
>> generalize this query and replace it with a prepared query with
>> parameters.
> That's not actually all that easy:
> - You pretty much do parse analysis to be able to do an accurate match.
>    How much overhead is parse analysis vs. planning in your cases?
> - The invalidation infrastructure for this, if not tied to fully
>    parse-analyzed statements, is going to be hell.
> - Migrating to parameters can actually cause significant slowdowns, not
>    nice if that happens implicitly.

Well, first of all I want to share the results I already have: pgbench with default parameters, scale 10, and one connection:

protocol              TPS
simple                3492
extended              2927
prepared              6865
simple + autoprepare  6844


So autoprepare is as efficient as explicit prepare and can almost double performance compared with the simple protocol (6844 vs. 3492 TPS, roughly 1.96x).

My current implementation replaces only string literals in the query with parameters, i.e. select * from T where x='123'; -> select * from T where x=$1; This greatly simplifies locating the parameters: it is only necessary to find the '\'' character and then correctly handle pairs of quotes.
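To make the substitution concrete, here is a minimal standalone sketch of the quote-scanning idea. It is not the code from the attached patch: it uses malloc instead of palloc, does not collapse whitespace, and the name generalize_literals is made up for this illustration. It also makes no attempt to recognize comments, quoted identifiers, E'' strings or dollar quoting.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/*
 * Replace every single-quoted literal in `query` with $1, $2, ...
 *
 * Returns a malloc'ed generalized query, or NULL if allocation fails or a
 * literal is not terminated. The literal bodies are written into `params`
 * as consecutive NUL-terminated strings; the caller must provide at least
 * strlen(query) + 1 bytes for it.
 */
char *generalize_literals(const char *query, char *params, int *n_params)
{
    size_t      len = strlen(query);
    /* Each literal uses >= 2 source bytes and yields <= 11 bytes ("$" + int). */
    char       *out = malloc(len * 6 + 1);
    char       *dst = out;
    const char *src = query;

    *n_params = 0;
    if (out == NULL)
        return NULL;

    while (*src != '\0')
    {
        if (*src != '\'')
        {
            *dst++ = *src++;        /* ordinary character, copy as is */
            continue;
        }
        src++;                      /* skip the opening quote */
        for (;;)
        {
            if (*src == '\0')
            {
                free(out);          /* unterminated literal, give up */
                return NULL;
            }
            if (*src == '\'')
            {
                if (src[1] == '\'')
                {
                    *params++ = '\'';   /* '' is an escaped quote */
                    src += 2;
                    continue;
                }
                src++;              /* closing quote */
                break;
            }
            *params++ = *src++;
        }
        *params++ = '\0';
        dst += sprintf(dst, "$%d", ++*n_params);
    }
    *dst = '\0';
    return out;
}
```

With this sketch, "select * from T where x='123'" becomes "select * from T where x=$1" with the single parameter value 123, and an unterminated literal is rejected instead of being scanned past the end of the string.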
Handling integer and real literals is a really challenging task.
One source of problems is negation: it is not easy to tell whether a minus should be treated as part of a literal or as an operator:
(-1), (1-1), (1-1)-1
Another problem is caused by integer literals in contexts where parameters cannot be used, for example "order by 1".
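The following sketch shows how quickly a purely textual rule goes wrong. It is a deliberately naive illustration, not something from the patch: the function name naive_generalize_ints and the rule "a minus directly followed by a digit starts a negative literal" are assumptions made only for this example.

```c
#include <assert.h>
#include <ctype.h>
#include <stdio.h>
#include <string.h>

/*
 * Naive textual replacement of integer literals with $1, $2, ...
 * Rule (intentionally too simple): '-' directly followed by a digit is
 * taken as the sign of a literal. `out` must be large enough; 11 bytes per
 * input byte plus one is always sufficient. Returns the parameter count.
 */
int naive_generalize_ints(const char *query, char *out)
{
    int         n = 0;
    const char *src = query;

    while (*src != '\0')
    {
        int neg = (src[0] == '-' && isdigit((unsigned char) src[1]));

        if (neg || isdigit((unsigned char) *src))
        {
            src += neg;             /* swallow the sign, if any */
            while (isdigit((unsigned char) *src))
                src++;              /* swallow the digits */
            out += sprintf(out, "$%d", ++n);
        }
        else
            *out++ = *src++;
    }
    *out = '\0';
    return n;
}
```

With this rule (-1) correctly becomes ($1), but (1-1) becomes ($1$2): the subtraction operator has been swallowed as a sign. "order by 1" is rewritten to "order by $1", which is exactly the kind of context mentioned above where the integer must stay a literal.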

A fully correct substitution can be done by first parsing the query, then transforming the parse tree by replacing literal nodes with parameter nodes, and finally deparsing the tree into a generalized query. postgres_fdw already contains such deparse code. It could be moved to the postgres core and reused for autoprepare (and maybe somewhere else).
But in this case the overhead will be much higher.
I still think that query parsing time is significantly smaller than the time needed to build and optimize a query execution plan.
But that should be measured if the community is interested in such an approach.

There is an obvious question: how did I manage to get these pgbench results if currently only substitution of string literals is supported and the queries constructed by pgbench don't contain string literals? I just made a small patch to the pgbench replaceVariable function that wraps the value's representation in quotes. It has almost no impact on performance (3482 TPS vs. 3492 TPS),
but allows autoprepare to deal with pgbench queries.
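The pgbench change itself is not part of the attached patch. As a rough sketch of the idea only, quoting a variable's value could look like the helper below. quote_value is a hypothetical name and this is not the actual pgbench code; doubling embedded quotes is an extra precaution that the numeric pgbench values do not need.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Wrap `value` in single quotes so that it is sent as a string literal,
 * doubling any embedded single quote. `out` must hold at least
 * 2 * strlen(value) + 3 bytes. Returns the length of the result.
 */
size_t quote_value(const char *value, char *out)
{
    char *dst = out;

    *dst++ = '\'';
    for (; *value != '\0'; value++)
    {
        if (*value == '\'')
            *dst++ = '\'';          /* escape a quote by doubling it */
        *dst++ = *value;
    }
    *dst++ = '\'';
    *dst = '\0';
    return (size_t) (dst - out);
}
```

With such quoting a value like 42 is sent as '42', so the generalized statement sees a string literal it can replace with a parameter.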

I attached my patch to this mail. It is just a first version of the patch (based on the REL9_6_STABLE branch), intended only to illustrate the proposed approach. I will be glad to receive any comments, and if such an optimization is considered useful, I will continue working on this patch.

--

Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index f6be98b..6291d66 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -96,8 +96,6 @@ int			max_stack_depth = 100;
 /* wait N seconds to allow attach from a debugger */
 int			PostAuthDelay = 0;
 
-
-
 /* ----------------
  *		private variables
  * ----------------
@@ -188,6 +186,7 @@ static bool IsTransactionStmtList(List *parseTrees);
 static void drop_unnamed_stmt(void);
 static void SigHupHandler(SIGNAL_ARGS);
 static void log_disconnections(int code, Datum arg);
+static bool exec_cached_query(const char *query_string);
 
 
 /* ----------------------------------------------------------------
@@ -916,6 +915,14 @@ exec_simple_query(const char *query_string)
 	drop_unnamed_stmt();
 
 	/*
+	 * Try to find cached plan
+	 */
+	if (autoprepare_threshold != 0 && exec_cached_query(query_string))
+	{
+		return;
+	}
+
+	/*
 	 * Switch to appropriate context for constructing parsetrees.
 	 */
 	oldcontext = MemoryContextSwitchTo(MessageContext);
@@ -4500,3 +4509,566 @@ log_disconnections(int code, Datum arg)
 					port->user_name, port->database_name, port->remote_host,
 				  port->remote_port[0] ? " port=" : "", port->remote_port)));
 }
+
+
+
+typedef struct { 
+	char const* query;
+	int64 exec_count;
+	CachedPlanSource* plan;	
+	int n_params;
+	int16 format;
+} plan_cache_entry;
+
+
+/*
+ * Replace string literals with parameters. We do not consider integer or real literals to avoid problems with 
+ * negative number, user defined operators, ... For example it is not easy to distinguish cases (-1), (1-1), (1-1)-1
+ */
+static void generalize_statement(const char *query_string, char** gen_query, char** query_params, int* n_params)
+{
+	size_t query_len = strlen(query_string);
+	char const* src = query_string;
+	char* dst;
+	char* params;
+	unsigned char ch;
+
+	*n_params = 0;
+
+	*gen_query = (char*)palloc(query_len*2); /* assume that we have less than 1000 parameters, the worst case is replacing '' with $999 */
+	*query_params = (char*)palloc(query_len + 1);
+	dst = *gen_query;
+	params = *query_params;
+
+	while ((ch = *src++) != '\0') { 
+		if (isspace(ch)) { 
+			/* Replace sequence of whitespaces with just one space */
+			while (*src && isspace(*(unsigned char*)src)) { 
+				src += 1;
+			}
+			*dst++ = ' ';
+		} else if (ch == '\'') { 
+			while (true) { 
+				ch = *src++;				
+				if (ch == '\'') { 
+					if (*src != '\'') { 
+						break;
+					} else {
+						/* escaped quote */
+						*params++ = '\'';
+						src += 1;
+					}
+				} else { 
+					*params++ = ch;
+				}
+			}
+			*params++ = '\0';
+			dst += sprintf(dst, "$%d", ++*n_params);
+		} else { 
+			*dst++ = ch;
+		}
+	}			
+	Assert(dst < *gen_query + query_len*2);
+	Assert(params <= *query_params + query_len + 1);
+	*dst = '\0';
+}
+
+static uint32 plan_cache_hash_fn(const void *key, Size keysize)
+{
+	return string_hash(((plan_cache_entry*)key)->query, 0);
+}
+
+static int plan_cache_match_fn(const void *key1, const void *key2, Size keysize)
+{
+	return strcmp(((plan_cache_entry*)key1)->query, ((plan_cache_entry*)key2)->query);
+}
+
+static void* plan_cache_keycopy_fn(void *dest, const void *src, Size keysize)
+{ 
+	((plan_cache_entry*)dest)->query = pstrdup(((plan_cache_entry*)src)->query);
+    return dest;
+}
+
+#define PLAN_CACHE_SIZE 113
+
+/*
+ * Try to generalize query, find cached plan for it and execute
+ */
+static bool exec_cached_query(const char *query_string)
+{
+	CommandDest       dest = whereToSendOutput;
+	DestReceiver     *receiver;
+	char             *gen_query;
+	char             *query_params;
+	int               n_params;
+	plan_cache_entry *entry;
+	bool              found;
+	MemoryContext     old_context;
+	CachedPlanSource *psrc;
+	ParamListInfo     params;
+	int               paramno;
+	CachedPlan       *cplan;
+	Portal		      portal;
+	bool		      was_logged = false;
+	bool		      is_xact_command;
+	bool		      execute_is_fetch;
+	char		      completion_tag[COMPLETION_TAG_BUFSIZE];
+	bool	 	      save_log_statement_stats = log_statement_stats;
+	ParamListInfo     portal_params;
+	const char       *source_text;
+	char		      msec_str[32];
+	bool		      snapshot_set = false;
+		
+	static HTAB*      plan_cache;
+	static MemoryContext plan_cache_context;
+
+	/* 
+	 * Extract literals from query
+	 */
+	generalize_statement(query_string, &gen_query, &query_params, &n_params);
+
+	if (plan_cache_context == NULL) { 
+		plan_cache_context = AllocSetContextCreate(TopMemoryContext,
+												   "plan cache context",
+												   ALLOCSET_DEFAULT_SIZES);
+	}
+	old_context = MemoryContextSwitchTo(plan_cache_context);
+
+	/* 
+	 * Initialize hash table if not initialized yet
+	 */
+	if (plan_cache == NULL) 
+	{ 
+		static HASHCTL info;
+		info.keysize = sizeof(char*);
+		info.entrysize = sizeof(plan_cache_entry);
+		info.hash = plan_cache_hash_fn;
+		info.match = plan_cache_match_fn;
+		info.keycopy = plan_cache_keycopy_fn;	
+		plan_cache = hash_create("plan_cache", PLAN_CACHE_SIZE, &info, HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_KEYCOPY);
+	}
+	
+	/*
+	 * Lookup generalized query 
+	 */
+	entry = (plan_cache_entry*)hash_search(plan_cache, &gen_query, HASH_ENTER, &found);
+	if (!found) { 
+		entry->exec_count = 0;
+		entry->plan = NULL;
+		entry->n_params = n_params;
+	} else {
+		Assert(entry->n_params == n_params);
+	}
+	MemoryContextSwitchTo(old_context);
+
+	/*
+	 * Prepare query only when it is executed more than autoprepare_threshold times
+	 */
+	if (entry->exec_count++ < autoprepare_threshold) { 
+		return false;
+	}
+	if (entry->plan == NULL) { 
+		List       *parsetree_list;
+		Node       *raw_parse_tree;
+		const char *command_tag;
+		Query	   *query;
+		List	   *querytree_list;
+		Oid        *param_types = NULL;
+		int         num_params = 0;
+
+		old_context = MemoryContextSwitchTo(MessageContext);
+		
+		parsetree_list = pg_parse_query(gen_query);
+		
+		/*
+		 * Only single user statement are allowed in a prepared statement.
+		 */
+		if (list_length(parsetree_list) != 1) {
+			MemoryContextSwitchTo(old_context);
+			return false;
+		}
+
+		raw_parse_tree = (Node *) linitial(parsetree_list);
+	   
+		/*
+		 * Get the command name for possible use in status display.
+		 */
+		command_tag = CreateCommandTag(raw_parse_tree);
+
+		/*
+		 * If we are in an aborted transaction, reject all commands except
+		 * COMMIT/ROLLBACK.  It is important that this test occur before we
+		 * try to do parse analysis, rewrite, or planning, since all those
+		 * phases try to do database accesses, which may fail in abort state.
+		 * (It might be safe to allow some additional utility commands in this
+		 * state, but not many...)
+		 */
+		if (IsAbortedTransactionBlockState() &&
+			!IsTransactionExitStmt(raw_parse_tree))
+			ereport(ERROR,
+					(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
+					 errmsg("current transaction is aborted, "
+						  "commands ignored until end of transaction block"),
+					 errdetail_abort()));
+
+		/*
+		 * Create the CachedPlanSource before we do parse analysis, since it
+		 * needs to see the unmodified raw parse tree.
+		 */
+		psrc = CreateCachedPlan(raw_parse_tree, gen_query, command_tag);
+
+		/*
+		 * Set up a snapshot if parse analysis will need one.
+		 */
+		if (analyze_requires_snapshot(raw_parse_tree))
+		{
+			PushActiveSnapshot(GetTransactionSnapshot());
+			snapshot_set = true;
+		}
+
+		/*
+		 * Analyze and rewrite the query.  Note that the originally specified
+		 * parameter set is not required to be complete, so we have to use
+		 * parse_analyze_varparams().
+		 */
+		if (log_parser_stats) {
+			ResetUsage();
+		}
+
+		query = parse_analyze_varparams(raw_parse_tree,
+										gen_query,
+										&param_types,
+										&num_params);
+		Assert(num_params == entry->n_params);
+
+		/*
+		 * Check all parameter types got determined.
+		 */
+		for (paramno = 0; paramno < n_params; paramno++)
+		{
+			Oid			ptype = param_types[paramno];
+
+			if (ptype == InvalidOid || ptype == UNKNOWNOID) {
+				/* Type of parameter can not be determined */
+				MemoryContextSwitchTo(old_context);
+				return false;
+			}
+		}
+		
+		if (log_parser_stats) {
+			ShowUsage("PARSE ANALYSIS STATISTICS");
+		}
+
+		querytree_list = pg_rewrite_query(query);
+
+		/* Done with the snapshot used for parsing */
+		if (snapshot_set) {
+			PopActiveSnapshot();
+		}
+
+		CompleteCachedPlan(psrc,
+						   querytree_list,
+						   NULL,
+						   param_types,
+						   n_params,
+						   NULL,
+						   NULL,
+						   CURSOR_OPT_PARALLEL_OK,	/* allow parallel mode */
+						   true);	/* fixed result */
+
+		/* If we got a cancel signal during analysis, quit */
+		CHECK_FOR_INTERRUPTS();
+
+		entry->format = 0;				/* TEXT is default */
+		if (IsA(raw_parse_tree, FetchStmt))
+		{
+			FetchStmt  *stmt = (FetchStmt *)raw_parse_tree;
+
+			if (!stmt->ismove)
+			{
+				Portal		fportal = GetPortalByName(stmt->portalname);
+
+				if (PortalIsValid(fportal) &&
+					(fportal->cursorOptions & CURSOR_OPT_BINARY))
+					entry->format = 1; /* BINARY */
+			}
+		}
+
+		SaveCachedPlan(psrc);
+		entry->plan = psrc;
+		MemoryContextSwitchTo(old_context);
+
+		/*
+		 * We do NOT close the open transaction command here; that only happens
+		 * when the client sends Sync.  Instead, do CommandCounterIncrement just
+		 * in case something happened during parse/plan.
+		 */
+		CommandCounterIncrement();
+	}
+	psrc = entry->plan;
+
+	/*
+	 * If we are in aborted transaction state, the only portals we can
+	 * actually run are those containing COMMIT or ROLLBACK commands. We
+	 * disallow binding anything else to avoid problems with infrastructure
+	 * that expects to run inside a valid transaction.  We also disallow
+	 * binding any parameters, since we can't risk calling user-defined I/O
+	 * functions.
+	 */
+	if (IsAbortedTransactionBlockState() &&
+		(!IsTransactionExitStmt(psrc->raw_parse_tree) ||
+		 n_params != 0))
+		ereport(ERROR,
+				(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
+				 errmsg("current transaction is aborted, "
+						"commands ignored until end of transaction block"),
+				 errdetail_abort()));
+
+	/*
+	 * Create unnamed portal to run the query or queries in. If there
+	 * already is one, silently drop it.
+	 */
+	portal = CreatePortal("", true, true);
+	/* Don't display the portal in pg_cursors */
+	portal->visible = false;
+
+	/*
+	 * Prepare to copy stuff into the portal's memory context.  We do all this
+	 * copying first, because it could possibly fail (out-of-memory) and we
+	 * don't want a failure to occur between GetCachedPlan and
+	 * PortalDefineQuery; that would result in leaking our plancache refcount.
+	 */
+	old_context = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
+		
+	/* Copy the plan's query string into the portal */
+	query_string = pstrdup(psrc->query_string);
+
+	/*
+	 * Set a snapshot if we have parameters to fetch (since the input
+	 * functions might need it) or the query isn't a utility command (and
+	 * hence could require redoing parse analysis and planning).  We keep the
+	 * snapshot active till we're done, so that plancache.c doesn't have to
+	 * take new ones.
+	 */
+	if (n_params > 0 ||
+		(psrc->raw_parse_tree &&
+		 analyze_requires_snapshot(psrc->raw_parse_tree)))
+	{
+		PushActiveSnapshot(GetTransactionSnapshot());
+		snapshot_set = true;
+	} else { 
+		snapshot_set = false;
+	}
+
+	/*
+	 * Fetch parameters, if any, and store in the portal's memory context.
+	 */
+	if (n_params > 0)
+	{
+		params = (ParamListInfo) palloc(offsetof(ParamListInfoData, params) +
+										n_params * sizeof(ParamExternData));
+		params->paramFetch = NULL;
+		params->paramFetchArg = NULL;
+		params->parserSetup = NULL;
+		params->parserSetupArg = NULL;
+		params->numParams = n_params;
+		params->paramMask = NULL;
+		
+		for (paramno = 0; paramno < n_params; paramno++)
+		{
+			Oid			ptype = psrc->param_types[paramno];
+			Oid			typinput;
+			Oid			typioparam;
+			
+			getTypeInputInfo(ptype, &typinput, &typioparam);
+			
+			params->params[paramno].value = OidInputFunctionCall(typinput, query_params, typioparam, -1);
+			params->params[paramno].isnull = false;
+
+			/*
+			 * We mark the params as CONST.  This ensures that any custom plan
+			 * makes full use of the parameter values.
+			 */
+			params->params[paramno].pflags = PARAM_FLAG_CONST;
+			params->params[paramno].ptype = ptype;
+
+			query_params += strlen(query_params) + 1;
+		}
+	} else { 
+		params = NULL;
+	}
+		
+	/* Done storing stuff in portal's context */
+	MemoryContextSwitchTo(old_context);
+
+	/*
+	 * Obtain a plan from the CachedPlanSource.  Any cruft from (re)planning
+	 * will be generated in MessageContext.  The plan refcount will be
+	 * assigned to the Portal, so it will be released at portal destruction.
+	 */
+	cplan = GetCachedPlan(psrc, params, false);
+	
+	/*
+	 * Now we can define the portal.
+	 *
+	 * DO NOT put any code that could possibly throw an error between the
+	 * above GetCachedPlan call and here.
+	 */
+	PortalDefineQuery(portal,
+					  NULL,
+					  gen_query,
+					  psrc->commandTag,
+					  cplan->stmt_list,
+					  cplan);
+
+	/* Done with the snapshot used for parameter I/O and parsing/planning */
+	if (snapshot_set) {
+		PopActiveSnapshot();
+	}
+
+	/*
+	 * And we're ready to start portal execution.
+	 */
+	PortalStart(portal, params, 0, InvalidSnapshot);
+
+	/*
+	 * Apply the result format requests to the portal.
+	 */
+	PortalSetResultFormat(portal, 1, &entry->format);
+
+	/* Does the portal contain a transaction command? */
+	is_xact_command = IsTransactionStmtList(portal->stmts);
+
+	/*
+	 * We must copy the sourceText into MessageContext in
+	 * case the portal is destroyed during finish_xact_command. Can avoid the
+	 * copy if it's not an xact command, though.
+	 */
+	if (is_xact_command)
+	{
+		source_text = pstrdup(portal->sourceText);
+		/*
+		 * An xact command shouldn't have any parameters, which is a good
+		 * thing because they wouldn't be around after finish_xact_command.
+		 */
+		portal_params = NULL;
+	}
+	else
+	{
+		source_text = portal->sourceText;
+		portal_params = portal->portalParams;
+	}
+
+	/*
+	 * Report query to various monitoring facilities.
+	 */
+	debug_query_string = source_text;
+
+	pgstat_report_activity(STATE_RUNNING, source_text);
+
+	set_ps_display(portal->commandTag, false);
+
+	if (save_log_statement_stats) {
+		ResetUsage();
+	}
+
+	BeginCommand(portal->commandTag, dest);
+
+	PortalSetResultFormat(portal, 1, &entry->format);
+
+
+	/*
+	 * Create dest receiver in MessageContext (we don't want it in transaction
+	 * context, because that may get deleted if portal contains VACUUM).
+	 */
+	receiver = CreateDestReceiver(dest);
+	SetRemoteDestReceiverParams(receiver, portal);
+
+	/*
+	 * If we re-issue an Execute protocol request against an existing portal,
+	 * then we are only fetching more rows rather than completely re-executing
+	 * the query from the start. atStart is never reset for a v3 portal, so we
+	 * are safe to use this check.
+	 */
+	execute_is_fetch = !portal->atStart;
+
+	/* Log immediately if dictated by log_statement */
+	if (check_log_statement(portal->stmts))
+	{
+		ereport(LOG,
+				(errmsg("%s %s%s%s: %s",
+						execute_is_fetch ?
+						_("execute fetch from") :
+						_("execute"),
+						"<unnamed>",
+						"",
+						"",
+						source_text),
+				 errhidestmt(true),
+				 errdetail_params(portal_params)));
+		was_logged = true;
+	}
+
+	/* Check for cancel signal before we start execution */
+	CHECK_FOR_INTERRUPTS();
+
+	/*
+	 * Run the portal to completion, and then drop it (and the receiver).
+	 */
+	(void) PortalRun(portal,
+					 FETCH_ALL,
+					 true,
+					 receiver,
+					 receiver,
+					 completion_tag);
+
+	(*receiver->rDestroy) (receiver);
+
+	PortalDrop(portal, false);
+
+	/*
+	 * Tell client that we're done with this query.  Note we emit exactly
+	 * one EndCommand report for each raw parsetree, thus one for each SQL
+	 * command the client sent, regardless of rewriting. (But a command
+	 * aborted by error will not send an EndCommand report at all.)
+	 */
+	EndCommand(completion_tag, dest);
+
+	/*
+	 * Close down transaction statement, if one is open.
+	 */
+	finish_xact_command();
+
+	/*
+	 * Emit duration logging if appropriate.
+	 */
+	switch (check_log_duration(msec_str, was_logged))
+	{
+		case 1:
+			ereport(LOG,
+					(errmsg("duration: %s ms", msec_str),
+					 errhidestmt(true)));
+			break;
+		case 2:
+			ereport(LOG,
+					(errmsg("duration: %s ms  %s %s%s%s: %s",
+							msec_str,
+							execute_is_fetch ?
+							_("execute fetch from") :
+							_("execute"),
+							"<unnamed>",
+							"",
+							"",
+							source_text),
+					 errhidestmt(true),
+					 errdetail_params(portal_params)));
+			break;
+	}
+
+	if (save_log_statement_stats) {
+		ShowUsage("EXECUTE MESSAGE STATISTICS");
+	}
+	debug_query_string = NULL;
+
+	return true;
+}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 4f1891f..d3acf5b 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -450,6 +450,9 @@ int			tcp_keepalives_idle;
 int			tcp_keepalives_interval;
 int			tcp_keepalives_count;
 
+
+int         autoprepare_threshold;
+
 /*
  * SSL renegotiation was been removed in PostgreSQL 9.5, but we tolerate it
  * being set to zero (meaning never renegotiate) for backward compatibility.
@@ -1949,6 +1952,19 @@ static struct config_int ConfigureNamesInt[] =
 		check_max_stack_depth, assign_max_stack_depth, NULL
 	},
 
+	/*
+	 * Threshold for implicit preparing of frequently executed queries
+	 */
+	{
+		{"autoprepare_threshold", PGC_USERSET, QUERY_TUNING_OTHER,
+		 gettext_noop("Threshold for autopreparing query."),
+		 NULL,
+		},
+		&autoprepare_threshold,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"temp_file_limit", PGC_SUSET, RESOURCES_DISK,
 			gettext_noop("Limits the total size of all temporary files used by each process."),
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index 0bf9f21..ef23569 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -252,6 +252,8 @@ extern int	client_min_messages;
 extern int	log_min_duration_statement;
 extern int	log_temp_files;
 
+extern int  autoprepare_threshold;
+
 extern int	temp_file_limit;
 
 extern int	num_temp_buffers;
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
