On 26.04.2017 10:49, Konstantin Knizhnik wrote:


On 26.04.2017 04:00, Tsunakawa, Takayuki wrote: Are you considering some upper limit on the number of prepared statements? In this case we need some kind of LRU for maintaining cache of autoprepared statements. I think that it is good idea to have such limited cached - it can avoid memory overflow problem.
I will try to implement it.

I attach a new patch which allows limiting the number of autoprepared statements (autoprepare_limit GUC variable). Also I did more measurements, now with several concurrent connections and read-only statements. Results of pgbench with 10 connections, scale 10 and read-only statements are below:

Protocol
        TPS
extended
        87k
prepared
        209k
simple+autoprepare
        206k


As you can see, autoprepare provides more than 2 times speed improvement.

Also I tried to measure the overhead of parsing (needed to be able to substitute all literals, not only string literals).
I just added an extra call of pg_parse_query. Speed is reduced to 181k TPS.
So the overhead is noticeable, but the optimization is still useful despite it.
This is why I want to ask the question: would it be better to implement a slower but safer and more universal solution?

--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index f6be98b..0c9abfc 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -188,6 +188,7 @@ static bool IsTransactionStmtList(List *parseTrees);
 static void drop_unnamed_stmt(void);
 static void SigHupHandler(SIGNAL_ARGS);
 static void log_disconnections(int code, Datum arg);
+static bool exec_cached_query(const char *query_string);
 
 
 /* ----------------------------------------------------------------
@@ -916,6 +917,14 @@ exec_simple_query(const char *query_string)
 	drop_unnamed_stmt();
 
 	/*
+	 * Try to find cached plan
+	 */
+	if (autoprepare_threshold != 0 && exec_cached_query(query_string))
+	{
+		return;
+	}
+
+	/*
 	 * Switch to appropriate context for constructing parsetrees.
 	 */
 	oldcontext = MemoryContextSwitchTo(MessageContext);
@@ -4500,3 +4509,606 @@ log_disconnections(int code, Datum arg)
 					port->user_name, port->database_name, port->remote_host,
 				  port->remote_port[0] ? " port=" : "", port->remote_port)));
 }
+
+/*
+ * Entry of the per-backend autoprepare plan cache.  Keyed by the
+ * generalized query text; allocated in plan_cache_context.
+ */
+typedef struct { 
+	char const*       query;	/* generalized query text: hash key (pstrdup'ed by keycopy) */
+	dlist_node        lru;		/* link in LRU list; most recently used entries are at the head */
+	int64             exec_count;	/* how many times this query has been seen */
+	CachedPlanSource* plan;	/* cached plan; NULL until exec_count reaches autoprepare_threshold */
+	int               n_params;	/* number of string literals replaced with $n parameters */
+	int16             format;	/* result format: 0 = text, 1 = binary */
+	bool              disable_autoprepare;	/* true if autoprepare failed for this query; never retry */
+} plan_cache_entry;
+
+/*
+ * Replace string literals with parameters.  We do not consider integer or
+ * real literals to avoid problems with negative numbers, user defined
+ * operators, ...  For example it is not easy to distinguish the cases
+ * (-1), (1-1), (1-1)-1.
+ *
+ * On return *gen_query holds the generalized query text (literals replaced
+ * with $1, $2, ...) and *query_params holds the *n_params extracted literal
+ * values as consecutive '\0'-terminated strings.  Both buffers are palloc'ed
+ * in the current memory context.
+ */
+static void generalize_statement(const char *query_string, char** gen_query, char** query_params, int* n_params)
+{
+	size_t query_len = strlen(query_string);
+	char const* src = query_string;
+	char* dst;
+	char* params;
+	unsigned char ch;
+
+	*n_params = 0;
+
+	/*
+	 * The worst case is replacing '' (2 bytes) with $NNN: assuming fewer
+	 * than 1000 parameters the output can at most double in size; one more
+	 * byte is needed for the terminating '\0'.
+	 */
+	*gen_query = (char*)palloc(query_len*2 + 1);
+	*query_params = (char*)palloc(query_len + 1);
+	dst = *gen_query;
+	params = *query_params;
+
+	while ((ch = *src++) != '\0') { 
+		if (isspace(ch)) { 
+			/* Replace sequence of whitespaces with just one space */
+			while (*src && isspace(*(unsigned char*)src)) { 
+				src += 1;
+			}
+			*dst++ = ' ';
+		} else if (ch == '\'') { 
+			while (true) { 
+				ch = *src++;
+				if (ch == '\0') { 
+					/*
+					 * Unterminated literal: stop here instead of running
+					 * past the end of the input buffer.  The resulting
+					 * generalized query will fail to parse later and
+					 * autoprepare will be disabled for it.
+					 */
+					src -= 1;
+					break;
+				}
+				if (ch == '\'') { 
+					if (*src != '\'') { 
+						break;
+					} else {
+						/* escaped quote */
+						*params++ = '\'';
+						src += 1;
+					}
+				} else { 
+					*params++ = ch;
+				}
+			}
+			*params++ = '\0';
+			dst += sprintf(dst, "$%d", ++*n_params);
+		} else { 
+			*dst++ = ch;
+		}
+	}
+	Assert(dst <= *gen_query + query_len*2);
+	Assert(params <= *query_params + query_len + 1);
+	*dst = '\0';
+}
+
+/* dynahash hash callback: hash the generalized query text of the entry */
+static uint32 plan_cache_hash_fn(const void *key, Size keysize)
+{
+	plan_cache_entry const *entry = (plan_cache_entry const *) key;
+
+	return string_hash(entry->query, 0);
+}
+
+/* dynahash match callback: entries are equal when their query texts match */
+static int plan_cache_match_fn(const void *key1, const void *key2, Size keysize)
+{
+	char const *query1 = ((plan_cache_entry const *) key1)->query;
+	char const *query2 = ((plan_cache_entry const *) key2)->query;
+
+	return strcmp(query1, query2);
+}
+
+/* dynahash keycopy callback: store a private copy of the query text */
+static void* plan_cache_keycopy_fn(void *dest, const void *src, Size keysize)
+{
+	plan_cache_entry *dst_entry = (plan_cache_entry *) dest;
+	plan_cache_entry const *src_entry = (plan_cache_entry const *) src;
+
+	dst_entry->query = pstrdup(src_entry->query);
+	return dest;
+}
+
+/* Default hash table size used when autoprepare_limit is 0 (unlimited) */
+#define PLAN_CACHE_SIZE 113
+
+/*
+ * Plan cache statistic counters.  NOTE(review): these are not static and
+ * not exposed through any view in this patch — presumably intended for
+ * inspection from a debugger; confirm or make them static.
+ */
+size_t nPlanCacheHits;
+size_t nPlanCacheMisses;
+
+/*
+ * Try to generalize the query, find a cached plan for it and execute it.
+ *
+ * Returns true if the query was executed through a cached (auto-prepared)
+ * plan; false means the caller must fall back to normal simple-query
+ * processing.  The cache is per-backend, limited to autoprepare_limit
+ * entries (0 = unlimited) with LRU eviction, and queries are only prepared
+ * after they have been executed autoprepare_threshold times.
+ */
+static bool exec_cached_query(const char *query_string)
+{
+	CommandDest       dest = whereToSendOutput;
+	DestReceiver     *receiver;
+	char             *gen_query;
+	char             *query_params;
+	int               n_params;
+	plan_cache_entry *entry;
+	bool              found;
+	MemoryContext     old_context;
+	CachedPlanSource *psrc;
+	ParamListInfo     params;
+	int               paramno;
+	CachedPlan       *cplan;
+	Portal		      portal;
+	bool		      was_logged = false;
+	bool		      is_xact_command;
+	bool		      execute_is_fetch;
+	char		      completion_tag[COMPLETION_TAG_BUFSIZE];
+	bool	 	      save_log_statement_stats = log_statement_stats;
+	ParamListInfo     portal_params;
+	const char       *source_text;
+	char		      msec_str[32];
+	bool		      snapshot_set = false;
+
+	static HTAB*      plan_cache;		/* per-backend cache of autoprepared plans */
+	static dlist_head lru;				/* LRU list: most recently used at head */
+	static size_t     n_cached_queries;
+	static MemoryContext plan_cache_context;
+
+	/*
+	 * Extract literals from query
+	 */
+	generalize_statement(query_string, &gen_query, &query_params, &n_params);
+
+	if (plan_cache_context == NULL) { 
+		plan_cache_context = AllocSetContextCreate(TopMemoryContext,
+												   "plan cache context",
+												   ALLOCSET_DEFAULT_SIZES);
+	}
+	old_context = MemoryContextSwitchTo(plan_cache_context);
+
+	/*
+	 * Initialize hash table if not initialized yet
+	 */
+	if (plan_cache == NULL) 
+	{ 
+		static HASHCTL info;	/* static: zero-initialized once per backend */
+		info.keysize = sizeof(char*);
+		info.entrysize = sizeof(plan_cache_entry);
+		info.hash = plan_cache_hash_fn;
+		info.match = plan_cache_match_fn;
+		info.keycopy = plan_cache_keycopy_fn;	
+		plan_cache = hash_create("plan_cache", autoprepare_limit != 0 ? autoprepare_limit : PLAN_CACHE_SIZE, 
+								 &info, HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_KEYCOPY);
+		dlist_init(&lru);
+	}
+	
+	/*
+	 * Lookup generalized query 
+	 */
+	entry = (plan_cache_entry*)hash_search(plan_cache, &gen_query, HASH_ENTER, &found);
+	if (!found) { 
+		if (++n_cached_queries > autoprepare_limit && autoprepare_limit != 0) { 
+			/*
+			 * Cache overflow: evict the least recently used entry, i.e. the
+			 * tail of the LRU list.  The victim can not be the entry just
+			 * inserted, because that one is not linked into the LRU list yet.
+			 */
+			plan_cache_entry* victim = dlist_container(plan_cache_entry, lru, lru.head.prev);
+			char const* victim_query = victim->query;
+
+			/* Unlink from LRU list before dynahash recycles the entry */
+			dlist_delete(&victim->lru);
+			/* plan is NULL if the victim never reached the threshold */
+			if (victim->plan != NULL)
+				DropCachedPlan(victim->plan);
+			hash_search(plan_cache, victim, HASH_REMOVE, NULL);
+			/* Free the pstrdup'ed key, which HASH_REMOVE does not release */
+			pfree((void *) victim_query);
+			n_cached_queries -= 1;
+		}
+		entry->exec_count = 0;
+		entry->plan = NULL;
+		entry->disable_autoprepare = false;
+	} else { 
+		dlist_delete(&entry->lru);
+		if (entry->plan != NULL && !entry->plan->is_valid) { 
+			DropCachedPlan(entry->plan);
+			entry->plan = NULL;
+		}
+	}
+	dlist_insert_after(&lru.head, &entry->lru);
+	MemoryContextSwitchTo(old_context);
+
+	/*
+	 * Prepare query only when it is executed more than autoprepare_threshold times
+	 */
+	if (entry->disable_autoprepare || entry->exec_count++ < autoprepare_threshold) { 
+		nPlanCacheMisses += 1;
+		return false;
+	}
+	if (entry->plan == NULL) { 
+		List       *parsetree_list;
+		Node       *raw_parse_tree;
+		const char *command_tag;
+		Query	   *query;
+		List	   *querytree_list;
+		Oid        *param_types = NULL;
+		int         num_params = 0;
+
+		old_context = MemoryContextSwitchTo(MessageContext);
+		
+		PG_TRY();
+		{    
+			parsetree_list = pg_parse_query(gen_query);
+			query_string = gen_query;
+		}
+		PG_CATCH();
+		{
+			/*
+			 * The generalized query may fail to parse (e.g. a literal was
+			 * replaced in a position where a parameter is not allowed):
+			 * remember that and fall back to normal execution forever.
+			 */
+			elog(LOG, "Failed to autoprepare query \"%s\"", gen_query);
+			FlushErrorState();
+			MemoryContextSwitchTo(old_context);
+			entry->disable_autoprepare = true;
+			nPlanCacheMisses += 1;
+			return false;
+		}
+		PG_END_TRY();
+		
+		/*
+		 * Only single user statement are allowed in a prepared statement.
+		 */
+		if (list_length(parsetree_list) != 1) {
+			MemoryContextSwitchTo(old_context);
+			entry->disable_autoprepare = true;
+			nPlanCacheMisses += 1;
+			return false;
+		}
+
+		raw_parse_tree = (Node *) linitial(parsetree_list);
+		
+		/*
+		 * Get the command name for possible use in status display.
+		 */
+		command_tag = CreateCommandTag(raw_parse_tree);
+
+		/*
+		 * If we are in an aborted transaction, reject all commands except
+		 * COMMIT/ROLLBACK.  It is important that this test occur before we
+		 * try to do parse analysis, rewrite, or planning, since all those
+		 * phases try to do database accesses, which may fail in abort state.
+		 * (It might be safe to allow some additional utility commands in this
+		 * state, but not many...)
+		 */
+		if (IsAbortedTransactionBlockState() &&
+			!IsTransactionExitStmt(raw_parse_tree))
+			ereport(ERROR,
+					(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
+					 errmsg("current transaction is aborted, "
+						  "commands ignored until end of transaction block"),
+					 errdetail_abort()));
+
+		/*
+		 * Create the CachedPlanSource before we do parse analysis, since it
+		 * needs to see the unmodified raw parse tree.
+		 */
+		psrc = CreateCachedPlan(raw_parse_tree, query_string, command_tag);
+
+		/*
+		 * Set up a snapshot if parse analysis will need one.
+		 */
+		if (analyze_requires_snapshot(raw_parse_tree))
+		{
+			PushActiveSnapshot(GetTransactionSnapshot());
+			snapshot_set = true;
+		}
+
+		/*
+		 * Analyze and rewrite the query.  Note that the originally specified
+		 * parameter set is not required to be complete, so we have to use
+		 * parse_analyze_varparams().
+		 */
+		if (log_parser_stats) {
+			ResetUsage();
+		}
+
+		query = parse_analyze_varparams(raw_parse_tree,
+										query_string,
+										&param_types,
+										&num_params);
+
+		/*
+		 * The parser must have found exactly the parameters we substituted;
+		 * a mismatch can happen if the query itself contained explicit $n
+		 * references.  Bail out cleanly instead of reading past param_types.
+		 */
+		if (num_params != n_params)
+		{
+			if (snapshot_set)
+				PopActiveSnapshot();
+			DropCachedPlan(psrc);		/* release the unsaved plan source */
+			MemoryContextSwitchTo(old_context);
+			entry->disable_autoprepare = true;
+			nPlanCacheMisses += 1;
+			return false;
+		}
+
+		/*
+		 * Check all parameter types got determined.
+		 */
+		for (paramno = 0; paramno < n_params; paramno++)
+		{
+			Oid			ptype = param_types[paramno];
+
+			if (ptype == InvalidOid || ptype == UNKNOWNOID) {
+				/* Type of parameter can not be determined */
+				if (snapshot_set)
+					PopActiveSnapshot();
+				DropCachedPlan(psrc);	/* release the unsaved plan source */
+				MemoryContextSwitchTo(old_context);
+				entry->disable_autoprepare = true;
+				nPlanCacheMisses += 1;
+				return false;
+			}
+		}
+		
+		if (log_parser_stats) {
+			ShowUsage("PARSE ANALYSIS STATISTICS");
+		}
+
+		querytree_list = pg_rewrite_query(query);
+
+		/* Done with the snapshot used for parsing */
+		if (snapshot_set) {
+			PopActiveSnapshot();
+		}
+
+		CompleteCachedPlan(psrc,
+						   querytree_list,
+						   NULL,
+						   param_types,
+						   n_params,
+						   NULL,
+						   NULL,
+						   CURSOR_OPT_PARALLEL_OK,	/* allow parallel mode */
+						   true);	/* fixed result */
+
+		/* If we got a cancel signal during analysis, quit */
+		CHECK_FOR_INTERRUPTS();
+
+		entry->format = 0;				/* TEXT is default */
+		if (IsA(raw_parse_tree, FetchStmt))
+		{
+			FetchStmt  *stmt = (FetchStmt *)raw_parse_tree;
+
+			if (!stmt->ismove)
+			{
+				Portal		fportal = GetPortalByName(stmt->portalname);
+
+				if (PortalIsValid(fportal) &&
+					(fportal->cursorOptions & CURSOR_OPT_BINARY))
+					entry->format = 1; /* BINARY */
+			}
+		}
+
+		SaveCachedPlan(psrc);
+		entry->plan = psrc;
+		entry->n_params = n_params;
+		MemoryContextSwitchTo(old_context);
+
+		/*
+		 * We do NOT close the open transaction command here; that only happens
+		 * when the client sends Sync.  Instead, do CommandCounterIncrement just
+		 * in case something happened during parse/plan.
+		 */
+		CommandCounterIncrement();
+	} else { 
+		psrc = entry->plan;
+		n_params = entry->n_params;
+	}
+
+	/*
+	 * If we are in aborted transaction state, the only portals we can
+	 * actually run are those containing COMMIT or ROLLBACK commands. We
+	 * disallow binding anything else to avoid problems with infrastructure
+	 * that expects to run inside a valid transaction.  We also disallow
+	 * binding any parameters, since we can't risk calling user-defined I/O
+	 * functions.
+	 */
+	if (IsAbortedTransactionBlockState() &&
+		(!IsTransactionExitStmt(psrc->raw_parse_tree) ||
+		 n_params != 0))
+		ereport(ERROR,
+				(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
+				 errmsg("current transaction is aborted, "
+						"commands ignored until end of transaction block"),
+				 errdetail_abort()));
+
+	/*
+	 * Create unnamed portal to run the query or queries in. If there
+	 * already is one, silently drop it.
+	 */
+	portal = CreatePortal("", true, true);
+	/* Don't display the portal in pg_cursors */
+	portal->visible = false;
+
+	/*
+	 * Prepare to copy stuff into the portal's memory context.  We do all this
+	 * copying first, because it could possibly fail (out-of-memory) and we
+	 * don't want a failure to occur between GetCachedPlan and
+	 * PortalDefineQuery; that would result in leaking our plancache refcount.
+	 */
+	old_context = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
+		
+	/* Copy the plan's query string into the portal */
+	query_string = pstrdup(psrc->query_string);
+
+	/*
+	 * Set a snapshot if we have parameters to fetch (since the input
+	 * functions might need it) or the query isn't a utility command (and
+	 * hence could require redoing parse analysis and planning).  We keep the
+	 * snapshot active till we're done, so that plancache.c doesn't have to
+	 * take new ones.
+	 */
+	if (n_params > 0 ||
+		(psrc->raw_parse_tree &&
+		 analyze_requires_snapshot(psrc->raw_parse_tree)))
+	{
+		PushActiveSnapshot(GetTransactionSnapshot());
+		snapshot_set = true;
+	} else { 
+		snapshot_set = false;
+	}
+
+	/*
+	 * Fetch parameters, if any, and store in the portal's memory context.
+	 * The literal values extracted by generalize_statement() are converted
+	 * through each parameter type's text input function.
+	 */
+	if (n_params > 0)
+	{
+		params = (ParamListInfo) palloc(offsetof(ParamListInfoData, params) +
+										n_params * sizeof(ParamExternData));
+		params->paramFetch = NULL;
+		params->paramFetchArg = NULL;
+		params->parserSetup = NULL;
+		params->parserSetupArg = NULL;
+		params->numParams = n_params;
+		params->paramMask = NULL;
+		
+		for (paramno = 0; paramno < n_params; paramno++)
+		{
+			Oid			ptype = psrc->param_types[paramno];
+			Oid			typinput;
+			Oid			typioparam;
+			
+			getTypeInputInfo(ptype, &typinput, &typioparam);
+			
+			params->params[paramno].value = OidInputFunctionCall(typinput, query_params, typioparam, -1);
+			params->params[paramno].isnull = false;
+
+			/*
+			 * We mark the params as CONST.  This ensures that any custom plan
+			 * makes full use of the parameter values.
+			 */
+			params->params[paramno].pflags = PARAM_FLAG_CONST;
+			params->params[paramno].ptype = ptype;
+
+			/* Advance to the next '\0'-terminated literal value */
+			query_params += strlen(query_params) + 1;
+		}
+	} else { 
+		params = NULL;
+	}
+		
+	/* Done storing stuff in portal's context */
+	MemoryContextSwitchTo(old_context);
+
+	/*
+	 * Obtain a plan from the CachedPlanSource.  Any cruft from (re)planning
+	 * will be generated in MessageContext.  The plan refcount will be
+	 * assigned to the Portal, so it will be released at portal destruction.
+	 */
+	cplan = GetCachedPlan(psrc, params, false);
+	
+	/*
+	 * Now we can define the portal.
+	 *
+	 * DO NOT put any code that could possibly throw an error between the
+	 * above GetCachedPlan call and here.
+	 */
+	PortalDefineQuery(portal,
+					  NULL,
+					  query_string,
+					  psrc->commandTag,
+					  cplan->stmt_list,
+					  cplan);
+
+	/* Done with the snapshot used for parameter I/O and parsing/planning */
+	if (snapshot_set) {
+		PopActiveSnapshot();
+	}
+
+	/*
+	 * And we're ready to start portal execution.
+	 */
+	PortalStart(portal, params, 0, InvalidSnapshot);
+
+	/*
+	 * Apply the result format requests to the portal.
+	 */
+	PortalSetResultFormat(portal, 1, &entry->format);
+
+	/* Does the portal contain a transaction command? */
+	is_xact_command = IsTransactionStmtList(portal->stmts);
+
+	/*
+	 * We must copy the sourceText into MessageContext in
+	 * case the portal is destroyed during finish_xact_command. Can avoid the
+	 * copy if it's not an xact command, though.
+	 */
+	if (is_xact_command)
+	{
+		source_text = pstrdup(portal->sourceText);
+		/*
+		 * An xact command shouldn't have any parameters, which is a good
+		 * thing because they wouldn't be around after finish_xact_command.
+		 */
+		portal_params = NULL;
+	}
+	else
+	{
+		source_text = portal->sourceText;
+		portal_params = portal->portalParams;
+	}
+
+	/*
+	 * Report query to various monitoring facilities.
+	 */
+	debug_query_string = source_text;
+
+	pgstat_report_activity(STATE_RUNNING, source_text);
+
+	set_ps_display(portal->commandTag, false);
+
+	if (save_log_statement_stats) {
+		ResetUsage();
+	}
+
+	BeginCommand(portal->commandTag, dest);
+
+	/*
+	 * Create dest receiver in MessageContext (we don't want it in transaction
+	 * context, because that may get deleted if portal contains VACUUM).
+	 */
+	receiver = CreateDestReceiver(dest);
+	SetRemoteDestReceiverParams(receiver, portal);
+
+	/*
+	 * If we re-issue an Execute protocol request against an existing portal,
+	 * then we are only fetching more rows rather than completely re-executing
+	 * the query from the start. atStart is never reset for a v3 portal, so we
+	 * are safe to use this check.
+	 */
+	execute_is_fetch = !portal->atStart;
+
+	/* Log immediately if dictated by log_statement */
+	if (check_log_statement(portal->stmts))
+	{
+		ereport(LOG,
+				(errmsg("%s %s%s%s: %s",
+						execute_is_fetch ?
+						_("execute fetch from") :
+						_("execute"),
+						"<unnamed>",
+						"",
+						"",
+						source_text),
+				 errhidestmt(true),
+				 errdetail_params(portal_params)));
+		was_logged = true;
+	}
+
+	/* Check for cancel signal before we start execution */
+	CHECK_FOR_INTERRUPTS();
+
+	/*
+	 * Run the portal to completion, and then drop it (and the receiver).
+	 */
+	(void) PortalRun(portal,
+					 FETCH_ALL,
+					 true,
+					 receiver,
+					 receiver,
+					 completion_tag);
+
+	(*receiver->rDestroy) (receiver);
+
+	PortalDrop(portal, false);
+
+	/*
+	 * Tell client that we're done with this query.  Note we emit exactly
+	 * one EndCommand report for each raw parsetree, thus one for each SQL
+	 * command the client sent, regardless of rewriting. (But a command
+	 * aborted by error will not send an EndCommand report at all.)
+	 */
+	EndCommand(completion_tag, dest);
+
+	/*
+	 * Close down transaction statement, if one is open.
+	 */
+	finish_xact_command();
+
+	/*
+	 * Emit duration logging if appropriate.
+	 */
+	switch (check_log_duration(msec_str, was_logged))
+	{
+		case 1:
+			ereport(LOG,
+					(errmsg("duration: %s ms", msec_str),
+					 errhidestmt(true)));
+			break;
+		case 2:
+			ereport(LOG,
+					(errmsg("duration: %s ms  %s %s%s%s: %s",
+							msec_str,
+							execute_is_fetch ?
+							_("execute fetch from") :
+							_("execute"),
+							"<unnamed>",
+							"",
+							"",
+							source_text),
+					 errhidestmt(true),
+					 errdetail_params(portal_params)));
+			break;
+	}
+
+	if (save_log_statement_stats) {
+		ShowUsage("EXECUTE MESSAGE STATISTICS");
+	}
+	debug_query_string = NULL;
+	nPlanCacheHits += 1;
+
+	return true;
+}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 4f1891f..4bb5b65 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -450,6 +450,10 @@ int			tcp_keepalives_idle;
 int			tcp_keepalives_interval;
 int			tcp_keepalives_count;
 
+
+/* Autoprepare queries executed at least this many times; 0 disables the feature */
+int         autoprepare_threshold;
+/* Maximum number of cached autoprepared statements; 0 means no limit */
+int         autoprepare_limit;
+
 /*
  * SSL renegotiation was been removed in PostgreSQL 9.5, but we tolerate it
  * being set to zero (meaning never renegotiate) for backward compatibility.
@@ -1949,6 +1953,28 @@ static struct config_int ConfigureNamesInt[] =
 		check_max_stack_depth, assign_max_stack_depth, NULL
 	},
 
+	/*
+	 * Threshold for implicit preparing of frequently executed queries
+	 */
+	{
+		{"autoprepare_threshold", PGC_USERSET, QUERY_TUNING_OTHER,
+		 gettext_noop("Threshold for autopreparing query."),
+		 NULL,
+		},
+		&autoprepare_threshold,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+	{
+		{"autoprepare_limit", PGC_USERSET, QUERY_TUNING_OTHER,
+		 gettext_noop("Maximal number of autoprepared queries."),
+		 NULL,
+		},
+		/* BUGFIX: must point at autoprepare_limit, not autoprepare_threshold */
+		&autoprepare_limit,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"temp_file_limit", PGC_SUSET, RESOURCES_DISK,
 			gettext_noop("Limits the total size of all temporary files used by each process."),
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index 0bf9f21..f035ce7 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -252,6 +252,9 @@ extern int	client_min_messages;
 extern int	log_min_duration_statement;
 extern int	log_temp_files;
 
+/* Autoprepare GUCs: see definitions in guc.c for semantics (0 = off / unlimited) */
+extern int  autoprepare_threshold;
+extern int  autoprepare_limit;
+
 extern int	temp_file_limit;
 
 extern int	num_temp_buffers;
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to