On Fri, Jun 28, 2019 at 4:39 PM Julien Rouhaud <rjuju...@gmail.com> wrote:
>
> On Tue, Mar 19, 2019 at 3:51 PM Julien Rouhaud <rjuju...@gmail.com> wrote:
> >
> > On Mon, Mar 18, 2019 at 7:33 PM Julien Rouhaud <rjuju...@gmail.com> wrote:
> > >
> > > On Mon, Mar 18, 2019 at 6:23 PM Yun Li <liyunjuany...@gmail.com> wrote:
> > > >
> > > > Let's take one step back. Since queryId is stored in core, as Julien
> > > > pointed out, can we just add that global to pg_stat_get_activity
> > > > and ultimately expose it in the pg_stat_activity view? Then no matter whether
> > > > PGSS is on or off, or however the customer extensions are updating
> > > > that field, we expose that field in that view and enable users to
> > > > leverage that id to join with pgss or their extension. Does this sound
> > > > like a good idea?
> > >
> > > I'd greatly welcome queryid exposure in pg_stat_activity, and
> > > also in log_line_prefix.  I'm afraid that it's too late for pg12
> > > inclusion, but I'll be happy to provide a patch for that for pg13.
> >
> > Here's a prototype patch for queryid exposure in pg_stat_activity and
> > log_line_prefix.
>
> Patch doesn't apply anymore, PFA rebased v2.

Sorry, I missed the new pg_stat_gssapi view.
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 84341a30e5..d68b492c25 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -6353,6 +6353,11 @@ local0.*    /var/log/postgresql
              session processes</entry>
              <entry>no</entry>
             </row>
+            <row>
+             <entry><literal>%Q</literal></entry>
+             <entry>queryid: identifier of session's current query, if any</entry>
+             <entry>yes</entry>
+            </row>
             <row>
              <entry><literal>%%</literal></entry>
              <entry>Literal <literal>%</literal></entry>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index bf72d0c303..d4e3d70933 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -824,6 +824,19 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
      <entry><type>xid</type></entry>
      <entry>The current backend's <literal>xmin</literal> horizon.</entry>
     </row>
+    <row>
+     <entry><structfield>queryid</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Identifier of this backend's most recent query. If
+      <structfield>state</structfield> is <literal>active</literal> this field
+      shows the identifier of the currently executing query. In all other
+      states, it shows the identifier of the last query that was executed, unless
+      an error occurred, which will reset this field to 0.  By default, query
+      identifiers are not computed, so this field will always display 0, unless
+      an additional module that computes query identifiers, such as <xref
+      linkend="pgstatstatements"/>, is configured.
+     </entry>
+    </row>
     <row>
      <entry><structfield>query</structfield></entry>
      <entry><type>text</type></entry>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index ea4c85e395..f30098c2cd 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -749,6 +749,7 @@ CREATE VIEW pg_stat_activity AS
             S.state,
             S.backend_xid,
             s.backend_xmin,
+            S.queryid,
             S.query,
             S.backend_type
     FROM pg_stat_get_activity(NULL) AS S
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 27f0345515..44c9525a59 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -143,6 +143,8 @@ static void EvalPlanQualStart(EPQState *epqstate, EState *parentestate,
 void
 ExecutorStart(QueryDesc *queryDesc, int eflags)
 {
+	pg_atomic_write_u64(&MyProc->queryId, queryDesc->plannedstmt->queryId);
+
 	if (ExecutorStart_hook)
 		(*ExecutorStart_hook) (queryDesc, eflags);
 	else
@@ -303,6 +305,8 @@ ExecutorRun(QueryDesc *queryDesc,
 			ScanDirection direction, uint64 count,
 			bool execute_once)
 {
+	pg_atomic_write_u64(&MyProc->queryId, queryDesc->plannedstmt->queryId);
+
 	if (ExecutorRun_hook)
 		(*ExecutorRun_hook) (queryDesc, direction, count, execute_once);
 	else
@@ -402,6 +406,8 @@ standard_ExecutorRun(QueryDesc *queryDesc,
 void
 ExecutorFinish(QueryDesc *queryDesc)
 {
+	pg_atomic_write_u64(&MyProc->queryId, queryDesc->plannedstmt->queryId);
+
 	if (ExecutorFinish_hook)
 		(*ExecutorFinish_hook) (queryDesc);
 	else
@@ -462,6 +468,8 @@ standard_ExecutorFinish(QueryDesc *queryDesc)
 void
 ExecutorEnd(QueryDesc *queryDesc)
 {
+	pg_atomic_write_u64(&MyProc->queryId, queryDesc->plannedstmt->queryId);
+
 	if (ExecutorEnd_hook)
 		(*ExecutorEnd_hook) (queryDesc);
 	else
@@ -541,6 +549,8 @@ ExecutorRewind(QueryDesc *queryDesc)
 	/* It's probably not sensible to rescan updating queries */
 	Assert(queryDesc->operation == CMD_SELECT);
 
+	pg_atomic_write_u64(&MyProc->queryId, queryDesc->plannedstmt->queryId);
+
 	/*
 	 * Switch into per-query memory context
 	 */
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 8eedb613a1..1d8c859a88 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -24,6 +24,7 @@
 #include "executor/executor.h"
 #include "executor/spi_priv.h"
 #include "miscadmin.h"
+#include "storage/proc.h"
 #include "tcop/pquery.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -1940,6 +1941,7 @@ _SPI_prepare_plan(const char *src, SPIPlanPtr plan)
 	List	   *plancache_list;
 	ListCell   *list_item;
 	ErrorContextCallback spierrcontext;
+	uint64		old_queryId = pg_atomic_read_u64(&MyProc->queryId);
 
 	/*
 	 * Setup error traceback support for ereport()
@@ -1996,6 +1998,8 @@ _SPI_prepare_plan(const char *src, SPIPlanPtr plan)
 											   _SPI_current->queryEnv);
 		}
 
+		pg_atomic_write_u64(&MyProc->queryId, old_queryId);
+
 		/* Finish filling in the CachedPlanSource */
 		CompleteCachedPlan(plansource,
 						   stmt_list,
@@ -2107,6 +2111,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 	int			res = 0;
 	bool		pushed_active_snap = false;
 	ErrorContextCallback spierrcontext;
+	uint64		old_queryId = pg_atomic_read_u64(&MyProc->queryId);
 	CachedPlan *cplan = NULL;
 	ListCell   *lc1;
 
@@ -2196,6 +2201,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 												   _SPI_current->queryEnv);
 			}
 
+			pg_atomic_write_u64(&MyProc->queryId, old_queryId);
+
 			/* Finish filling in the CachedPlanSource */
 			CompleteCachedPlan(plansource,
 							   stmt_list,
@@ -2366,6 +2373,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 				}
 			}
 
+			pg_atomic_write_u64(&MyProc->queryId, old_queryId);
+
 			/*
 			 * The last canSetTag query sets the status values returned to the
 			 * caller.  Be careful to free any tuptables not returned, to
@@ -2469,6 +2478,7 @@ static int
 _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
 {
 	int			operation = queryDesc->operation;
+	uint64		old_queryId = pg_atomic_read_u64(&MyProc->queryId);
 	int			eflags;
 	int			res;
 
@@ -2533,6 +2543,8 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
 	ExecutorEnd(queryDesc);
 	/* FreeQueryDesc is done by the caller */
 
+	pg_atomic_write_u64(&MyProc->queryId, old_queryId);
+
 #ifdef SPI_EXECUTOR_STATS
 	if (ShowExecutorStats)
 		ShowUsage("SPI EXECUTOR STATS");
@@ -2580,6 +2592,7 @@ _SPI_cursor_operation(Portal portal, FetchDirection direction, long count,
 					  DestReceiver *dest)
 {
 	uint64		nfetched;
+	uint64		old_queryId = pg_atomic_read_u64(&MyProc->queryId);
 
 	/* Check that the portal is valid */
 	if (!PortalIsValid(portal))
@@ -2614,6 +2627,8 @@ _SPI_cursor_operation(Portal portal, FetchDirection direction, long count,
 	if (dest->mydest == DestSPI && _SPI_checktuples())
 		elog(ERROR, "consistency check on SPI tuple count failed");
 
+	pg_atomic_write_u64(&MyProc->queryId, old_queryId);
+
 	/* Put the result into place for access by caller */
 	SPI_processed = _SPI_current->processed;
 	SPI_tuptable = _SPI_current->tuptable;
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index b13c246183..cca506674c 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -44,6 +44,7 @@
 #include "parser/parse_target.h"
 #include "parser/parsetree.h"
 #include "rewrite/rewriteManip.h"
+#include "storage/proc.h"
 #include "utils/rel.h"
 
 
@@ -118,6 +119,8 @@ parse_analyze(RawStmt *parseTree, const char *sourceText,
 	if (post_parse_analyze_hook)
 		(*post_parse_analyze_hook) (pstate, query);
 
+	pg_atomic_write_u64(&MyProc->queryId, query->queryId);
+
 	free_parsestate(pstate);
 
 	return query;
@@ -151,6 +154,8 @@ parse_analyze_varparams(RawStmt *parseTree, const char *sourceText,
 	if (post_parse_analyze_hook)
 		(*post_parse_analyze_hook) (pstate, query);
 
+	pg_atomic_write_u64(&MyProc->queryId, query->queryId);
+
 	free_parsestate(pstate);
 
 	return query;
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 498373fd0e..b080764165 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -284,6 +284,7 @@ InitProcGlobal(void)
 		 */
 		pg_atomic_init_u32(&(procs[i].procArrayGroupNext), INVALID_PGPROCNO);
 		pg_atomic_init_u32(&(procs[i].clogGroupNext), INVALID_PGPROCNO);
+		pg_atomic_init_u64(&(procs[i].queryId), 0);
 	}
 
 	/*
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 44a59e1d4f..43d4a852f9 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -744,6 +744,8 @@ pg_analyze_and_rewrite_params(RawStmt *parsetree,
 	if (post_parse_analyze_hook)
 		(*post_parse_analyze_hook) (pstate, query);
 
+	pg_atomic_write_u64(&MyProc->queryId, query->queryId);
+
 	free_parsestate(pstate);
 
 	if (log_parser_stats)
@@ -4029,6 +4031,12 @@ PostgresMain(int argc, char *argv[],
 		 */
 		debug_query_string = NULL;
 
+		/*
+		 * Also reset the queryId, as any new error encountered before a
+		 * specific query is executed isn't linked to the last saved value
+		 */
+		pg_atomic_write_u64(&MyProc->queryId, 0);
+
 		/*
 		 * Abort the current transaction in order to recover.
 		 */
@@ -4108,6 +4116,12 @@ PostgresMain(int argc, char *argv[],
 		 */
 		doing_extended_query_message = false;
 
+		/*
+		 * Also reset the queryId, so any error encountered before a specific
+		 * query is executed won't display the last saved value
+		 */
+		pg_atomic_write_u64(&MyProc->queryId, 0);
+
 		/*
 		 * Release storage left over from prior query cycle, and create a new
 		 * query input buffer in the cleared MessageContext.
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 05240bfd14..f6b0c58b4c 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -546,7 +546,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_activity(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_ACTIVITY_COLS	29
+#define PG_STAT_GET_ACTIVITY_COLS	30
 	int			num_backends = pgstat_fetch_stat_numbackends();
 	int			curr_backend;
 	int			pid = PG_ARGISNULL(0) ? -1 : PG_GETARG_INT32(0);
@@ -875,6 +875,7 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
 				values[28] = BoolGetDatum(false);	/* GSS Encryption not in
 													 * use */
 			}
+			values[29] = DatumGetUInt64(pg_atomic_read_u64(&proc->queryId));
 		}
 		else
 		{
@@ -902,6 +903,7 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
 			nulls[26] = true;
 			nulls[27] = true;
 			nulls[28] = true;
+			nulls[29] = true;
 		}
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index 8b4720ef3a..8e611bd239 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -2594,6 +2594,20 @@ log_line_prefix(StringInfo buf, ErrorData *edata)
 				else
 					appendStringInfoString(buf, unpack_sql_state(edata->sqlerrcode));
 				break;
+			case 'Q':
+				if (MyProc != NULL)
+				{
+					if (padding != 0)
+						appendStringInfo(buf, "%*ld", padding,
+								pg_atomic_read_u64(&MyProc->queryId));
+					else
+						appendStringInfo(buf, "%ld",
+								pg_atomic_read_u64(&MyProc->queryId));
+				}
+				else if (padding != 0)
+					appendStringInfoSpaces(buf,
+										   padding > 0 ? padding : -padding);
+				break;
 			default:
 				/* format error - ignore it */
 				break;
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 5ee5e09ddf..d6d31195f3 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -523,6 +523,7 @@
 					#   %t = timestamp without milliseconds
 					#   %m = timestamp with milliseconds
 					#   %n = timestamp with milliseconds (as a Unix epoch)
+					#   %Q = query ID (0 if none or not computed)
 					#   %i = command tag
 					#   %e = SQL state
 					#   %c = session ID
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 87335248a0..115b9c4ad0 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5114,9 +5114,9 @@
   proname => 'pg_stat_get_activity', prorows => '100', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'int4',
-  proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc}',
+  proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool,int8}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,queryid}',
   prosrc => 'pg_stat_get_activity' },
 { oid => '3318',
   descr => 'statistics: information about progress of backends running maintenance command',
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 1cee7db89d..8e3a6ae9ca 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -173,6 +173,7 @@ struct PGPROC
 	 */
 	TransactionId procArrayGroupMemberXid;
 
+	pg_atomic_uint64	queryId;	/* current queryid if any */
 	uint32		wait_event_info;	/* proc's wait information */
 
 	/* Support for group transaction status update. */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 210e9cd146..0cbef52045 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1739,9 +1739,10 @@ pg_stat_activity| SELECT s.datid,
     s.state,
     s.backend_xid,
     s.backend_xmin,
+    s.queryid,
     s.query,
     s.backend_type
-   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc)
+   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, queryid)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_all_indexes| SELECT c.oid AS relid,
@@ -1845,7 +1846,7 @@ pg_stat_gssapi| SELECT s.pid,
     s.gss_auth AS gss_authenticated,
     s.gss_princ AS principal,
     s.gss_enc AS encrypted
-   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc);
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, queryid);
 pg_stat_progress_cluster| SELECT s.pid,
     s.datid,
     d.datname,
@@ -1952,7 +1953,7 @@ pg_stat_replication| SELECT s.pid,
     w.sync_priority,
     w.sync_state,
     w.reply_time
-   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc)
+   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, queryid)
      JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_ssl| SELECT s.pid,
@@ -1964,7 +1965,7 @@ pg_stat_ssl| SELECT s.pid,
     s.ssl_client_dn AS client_dn,
     s.ssl_client_serial AS client_serial,
     s.ssl_issuer_dn AS issuer_dn
-   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc);
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, queryid);
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,
     st.pid,

Reply via email to