On Tue, Sep 15, 2015 at 11:00 AM, Shulgin, Oleksandr <
oleksandr.shul...@zalando.de> wrote:

> On Mon, Sep 14, 2015 at 7:27 PM, Pavel Stehule <pavel.steh...@gmail.com>
> wrote:
>
>>
>> 2015-09-14 18:46 GMT+02:00 Shulgin, Oleksandr <
>> oleksandr.shul...@zalando.de>:
>>
>>>
>>> ... This way the receiver only writes to the slot and the sender only
>>> reads from it.
>>>
>>> By the way, is it safe to assume atomic read/writes of dsm_handle
>>> (uint32)?  I would be surprised if not.
>>>
>>
>> I don't see any reason why it shouldn't work - only a few processes
>> will wait for data, so the wasted attach/detach shm operations will not
>> cost much.
>>
>
> Please see attached for an implementation of this approach.  The most
> surprising thing is that it actually works :)
>
> One problem still remains: the requesting process waits forever if the
> target process exits before it has a chance to handle the signal, because
> then nobody attaches to or detaches from the queue.  We've discussed this
> before, and it looks like we need to introduce a timeout parameter to
> pg_cmdstatus()...
>

I've added the timeout parameter to the pg_cmdstatus call, and more
importantly to the sending side of the queue, so that one can limit the
potential effect of handling the interrupt in case something goes really
wrong.
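A minimal sketch of the sender-side timeout, written as standalone C rather than backend code: the sender retries a non-blocking send and gives up once an absolute deadline has passed. send_with_deadline, demo_send and demo_wait are hypothetical names; the SEND_* codes only loosely mirror SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK and SHM_MQ_DETACHED, and demo_wait stands in for WaitLatch(), which can return early when the receiver sets the latch.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdint.h>
#include <time.h>

/* Hypothetical result codes, loosely mirroring SHM_MQ_SUCCESS etc. */
typedef enum { SEND_SUCCESS, SEND_WOULD_BLOCK, SEND_DETACHED } send_result;
typedef enum { OUTCOME_SENT, OUTCOME_DETACHED, OUTCOME_TIMEOUT } send_outcome;

/* Current monotonic time in milliseconds. */
static int64_t
now_ms(void)
{
	struct timespec ts;

	clock_gettime(CLOCK_MONOTONIC, &ts);
	return (int64_t) ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

/*
 * Retry try_send(arg) until it succeeds, the receiver detaches, or
 * timeout_ms elapses.  wait_ms(ms) must block for at most ms milliseconds.
 */
static send_outcome
send_with_deadline(send_result (*try_send)(void *), void *arg,
				   void (*wait_ms)(long), long timeout_ms)
{
	const int64_t deadline = now_ms() + timeout_ms;

	assert(timeout_ms >= 0);
	for (;;)
	{
		send_result res = try_send(arg);
		int64_t		left;

		if (res == SEND_SUCCESS)
			return OUTCOME_SENT;
		if (res == SEND_DETACHED)
			return OUTCOME_DETACHED;

		/* Queue is full: wait, but never past the overall deadline. */
		left = deadline - now_ms();
		if (left <= 0)
			return OUTCOME_TIMEOUT;
		wait_ms((long) left);
	}
}

/* Demo sender: reports WOULD_BLOCK until *blocks_left drops to zero. */
static send_result
demo_send(void *arg)
{
	int		   *blocks_left = arg;

	if (*blocks_left > 0)
	{
		(*blocks_left)--;
		return SEND_WOULD_BLOCK;
	}
	return SEND_SUCCESS;
}

/*
 * Demo wait: sleep 1 ms, as if the receiver set our latch after reading.
 * max_ms is always >= 1 here, so this never exceeds the allowed wait.
 */
static void
demo_wait(long max_ms)
{
	struct timespec ts = {0, 1000000L};

	(void) max_ms;
	nanosleep(&ts, NULL);
}
```

Deriving the remaining time from one monotonic deadline is an alternative to the timeradd()/timersub() bookkeeping in the patch; it also copes with timeouts of a second or more without having to keep a struct timeval normalized.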

I've tested a number of scenarios with artificial delays in the reply and
with a cancel request or SIGTERM sent to the receiving side, and it all
seems to work nicely thanks to proper message queue detach event
notification.  Still, I think it helps to have a timeout in case the
receiving side is really slow to read the message.

I've also decided that we really ought to suppress any ERROR-level messages
generated while processing the status request, so as not to prevent the
originally running transaction from completing.  In this new version of the
patch, errors caught this way are just logged at LOG level and otherwise
ignored.
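PG_TRY()/PG_CATCH() are built on sigsetjmp()/siglongjmp(), so the "catch, log and carry on" behaviour described above can be illustrated with a standalone setjmp/longjmp analogue. This is a sketch under that simplification, not backend code: raise_error, run_and_demote_errors and the demo handlers are hypothetical, and a real backend implementation also has to reset the error state (for instance with FlushErrorState()) before continuing.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-ins for the backend's error-recovery machinery. */
static jmp_buf *error_jmp = NULL;	/* innermost active handler */
static char last_error[128];

/* Like elog(ERROR, ...) in spirit: record the message, jump to the handler. */
static void
raise_error(const char *msg)
{
	assert(error_jmp != NULL);	/* must only be called under a handler */
	snprintf(last_error, sizeof(last_error), "%s", msg);
	longjmp(*error_jmp, 1);
}

/*
 * Run handler(); if it raises an error, report it as a LOG line and return
 * false instead of letting the error escape into the interrupted query.
 */
static bool
run_and_demote_errors(void (*handler)(void))
{
	jmp_buf		local;
	jmp_buf    *saved = error_jmp;

	error_jmp = &local;
	if (setjmp(local) != 0)
	{
		error_jmp = saved;		/* restore the outer handler */
		fprintf(stderr, "LOG:  %s\n", last_error);
		return false;
	}
	handler();
	error_jmp = saved;
	return true;
}

/* Demo handlers: one succeeds, one fails part-way. */
static int	completed = 0;

static void
good_handler(void)
{
	completed++;
}

static void
failing_handler(void)
{
	raise_error("could not build status reply");
	completed++;				/* never reached */
}
```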

I'm also submitting the instrumentation support as a separate patch on top
of this.  I'm not really fond of adding a bool parameter to InstrEndLoop,
but so far I haven't found a better way.

What I'm thinking about now is that we could probably extend this backend
communication mechanism to query the GUC values in effect in a different
backend, or even to set them.  The obvious candidate to check when you see
some query misbehaving would be work_mem, for example.  Or we could provide
a report of all settings that were overridden in the backend's session, to
the effect of running something like this:

select * from pg_settings where context = 'user' and setting != reset_val;

The obvious candidates to be set externally are the cmdstatus_analyze and
cmdstatus_instrument_* settings: once you decide to turn them on, you'd
rather do that carefully for a single backend than globally for the whole
cluster.  One can still modify postgresql.conf and then send SIGHUP to just
a single backend, but I think a more direct way to alter the settings would
be better.

In this light, should we rename the API to something like "backend control"
instead of "command status"?  The SHOW/SET syntax could be extended to
support remote retrieval and update of settings.

--
Alex
diff --git a/contrib/auto_explain/auto_explain.c b/contrib/auto_explain/auto_explain.c
index 2a184ed..d2f46bb 100644
--- a/contrib/auto_explain/auto_explain.c
+++ b/contrib/auto_explain/auto_explain.c
@@ -288,7 +288,7 @@ explain_ExecutorEnd(QueryDesc *queryDesc)
 		 * Make sure stats accumulation is done.  (Note: it's okay if several
 		 * levels of hook all do this.)
 		 */
-		InstrEndLoop(queryDesc->totaltime);
+		InstrEndLoop(queryDesc->totaltime, false);
 
 		/* Log plan if duration is exceeded. */
 		msec = queryDesc->totaltime->total * 1000.0;
diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c
index 59b8a2e..d90fc6d 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.c
+++ b/contrib/pg_stat_statements/pg_stat_statements.c
@@ -926,7 +926,7 @@ pgss_ExecutorEnd(QueryDesc *queryDesc)
 		 * Make sure stats accumulation is done.  (Note: it's okay if several
 		 * levels of hook all do this.)
 		 */
-		InstrEndLoop(queryDesc->totaltime);
+		InstrEndLoop(queryDesc->totaltime, false);
 
 		pgss_store(queryDesc->sourceText,
 				   queryId,
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 5d06fa4..e5a28a9 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -654,7 +654,7 @@ report_triggers(ResultRelInfo *rInfo, bool show_relname, ExplainState *es)
 		char	   *conname = NULL;
 
 		/* Must clean up instrumentation state */
-		InstrEndLoop(instr);
+		InstrEndLoop(instr, false);
 
 		/*
 		 * We ignore triggers that were never invoked; they likely aren't
@@ -1233,62 +1233,75 @@ ExplainNode(PlanState *planstate, List *ancestors,
 		}
 	}
 
-	/*
-	 * We have to forcibly clean up the instrumentation state because we
-	 * haven't done ExecutorEnd yet.  This is pretty grotty ...
-	 *
-	 * Note: contrib/auto_explain could cause instrumentation to be set up
-	 * even though we didn't ask for it here.  Be careful not to print any
-	 * instrumentation results the user didn't ask for.  But we do the
-	 * InstrEndLoop call anyway, if possible, to reduce the number of cases
-	 * auto_explain has to contend with.
-	 */
 	if (planstate->instrument)
-		InstrEndLoop(planstate->instrument);
-
-	if (es->analyze &&
-		planstate->instrument && planstate->instrument->nloops > 0)
 	{
-		double		nloops = planstate->instrument->nloops;
-		double		startup_sec = 1000.0 * planstate->instrument->startup / nloops;
-		double		total_sec = 1000.0 * planstate->instrument->total / nloops;
-		double		rows = planstate->instrument->ntuples / nloops;
+		/*
+		 * We have to forcibly clean up the instrumentation state because we
+		 * haven't done ExecutorEnd yet.  This is pretty grotty ...
+		 *
+		 * Note: contrib/auto_explain could cause instrumentation to be set up
+		 * even though we didn't ask for it here.  Be careful not to print any
+		 * instrumentation results the user didn't ask for.  But we do the
+		 * InstrEndLoop call anyway, if possible, to reduce the number of cases
+		 * auto_explain has to contend with.
+		 */
+		InstrEndLoop(planstate->instrument, es->running);
 
-		if (es->format == EXPLAIN_FORMAT_TEXT)
-		{
-			if (es->timing)
-				appendStringInfo(es->str,
-							" (actual time=%.3f..%.3f rows=%.0f loops=%.0f)",
-								 startup_sec, total_sec, rows, nloops);
-			else
-				appendStringInfo(es->str,
-								 " (actual rows=%.0f loops=%.0f)",
-								 rows, nloops);
-		}
-		else
+		if (es->analyze)
 		{
-			if (es->timing)
+			double		nloops = planstate->instrument->nloops;
+
+			if (es->running && planstate->instrument->running)
+				nloops += 1; /* so we can display something */
+
+			if (nloops > 0)
 			{
-				ExplainPropertyFloat("Actual Startup Time", startup_sec, 3, es);
-				ExplainPropertyFloat("Actual Total Time", total_sec, 3, es);
+				double		startup_sec = 1000.0 * planstate->instrument->startup / nloops;
+				double		total_sec = 1000.0 * planstate->instrument->total / nloops;
+				double		rows = planstate->instrument->ntuples / nloops;
+				double		percent_done = 100.0 * rows / plan->plan_rows;
+
+				if (es->format == EXPLAIN_FORMAT_TEXT)
+				{
+					if (es->timing)
+						appendStringInfo(es->str,
+										 " (actual time=%.3f..%.3f rows=%.0f loops=%.0f)",
+										 startup_sec, total_sec, rows,
+										 planstate->instrument->nloops);
+					else
+						appendStringInfo(es->str,
+										 " (actual rows=%.0f loops=%.0f)",
+										 rows, planstate->instrument->nloops);
+
+					if (es->running)
+						appendStringInfo(es->str, " %.1f%%", percent_done);
+				}
+				else
+				{
+					if (es->timing)
+					{
+						ExplainPropertyFloat("Actual Startup Time", startup_sec, 3, es);
+						ExplainPropertyFloat("Actual Total Time", total_sec, 3, es);
+					}
+					ExplainPropertyFloat("Actual Rows", rows, 0, es);
+					ExplainPropertyFloat("Actual Loops", planstate->instrument->nloops, 0, es);
+				}
 			}
-			ExplainPropertyFloat("Actual Rows", rows, 0, es);
-			ExplainPropertyFloat("Actual Loops", nloops, 0, es);
-		}
-	}
-	else if (es->analyze)
-	{
-		if (es->format == EXPLAIN_FORMAT_TEXT)
-			appendStringInfoString(es->str, " (never executed)");
-		else
-		{
-			if (es->timing)
+			else
 			{
-				ExplainPropertyFloat("Actual Startup Time", 0.0, 3, es);
-				ExplainPropertyFloat("Actual Total Time", 0.0, 3, es);
+				if (es->format == EXPLAIN_FORMAT_TEXT)
+					appendStringInfoString(es->str, " (never executed)");
+				else
+				{
+					if (es->timing)
+					{
+						ExplainPropertyFloat("Actual Startup Time", 0.0, 3, es);
+						ExplainPropertyFloat("Actual Total Time", 0.0, 3, es);
+					}
+					ExplainPropertyFloat("Actual Rows", 0.0, 0, es);
+					ExplainPropertyFloat("Actual Loops", 0.0, 0, es);
+				}
 			}
-			ExplainPropertyFloat("Actual Rows", 0.0, 0, es);
-			ExplainPropertyFloat("Actual Loops", 0.0, 0, es);
 		}
 	}
 
@@ -2650,7 +2663,7 @@ show_modifytable_info(ModifyTableState *mtstate, List *ancestors,
 			double		insert_path;
 			double		other_path;
 
-			InstrEndLoop(mtstate->mt_plans[0]->instrument);
+			InstrEndLoop(mtstate->mt_plans[0]->instrument, es->running);
 
 			/* count the number of source rows */
 			total = mtstate->mt_plans[0]->instrument->ntuples;
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index 93e1e9a..33162f2 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -72,7 +72,7 @@ ExecReScan(PlanState *node)
 {
 	/* If collecting timing stats, update them */
 	if (node->instrument)
-		InstrEndLoop(node->instrument);
+		InstrEndLoop(node->instrument, false);
 
 	/*
 	 * If we have changed parameters, propagate that info.
diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c
index f5351eb..132153e 100644
--- a/src/backend/executor/instrument.c
+++ b/src/backend/executor/instrument.c
@@ -100,7 +100,7 @@ InstrStopNode(Instrumentation *instr, double nTuples)
 
 /* Finish a run cycle for a plan node */
 void
-InstrEndLoop(Instrumentation *instr)
+InstrEndLoop(Instrumentation *instr, bool partial_run)
 {
 	double		totaltime;
 
@@ -108,7 +108,7 @@ InstrEndLoop(Instrumentation *instr)
 	if (!instr->running)
 		return;
 
-	if (!INSTR_TIME_IS_ZERO(instr->starttime))
+	if (!INSTR_TIME_IS_ZERO(instr->starttime) && !partial_run)
 		elog(ERROR, "InstrEndLoop called on running node");
 
 	/* Accumulate per-cycle statistics into totals */
@@ -117,11 +117,15 @@ InstrEndLoop(Instrumentation *instr)
 	instr->startup += instr->firsttuple;
 	instr->total += totaltime;
 	instr->ntuples += instr->tuplecount;
-	instr->nloops += 1;
 
 	/* Reset for next cycle (if any) */
-	instr->running = false;
-	INSTR_TIME_SET_ZERO(instr->starttime);
+	if (!partial_run)
+	{
+		instr->nloops += 1;
+
+		instr->running = false;
+		INSTR_TIME_SET_ZERO(instr->starttime);
+	}
 	INSTR_TIME_SET_ZERO(instr->counter);
 	instr->firsttuple = 0;
 	instr->tuplecount = 0;
diff --git a/src/backend/utils/adt/cmdstatus.c b/src/backend/utils/adt/cmdstatus.c
index 5078100..b969907 100644
--- a/src/backend/utils/adt/cmdstatus.c
+++ b/src/backend/utils/adt/cmdstatus.c
@@ -95,7 +95,12 @@ typedef struct CmdInfoStack {
 } CmdInfoStack;
 
 
+/* GUCs */
 int cmdstatus_interrupt_timeout = CMD_STATUS_DEFAULT_INTERRUPT_TIMEOUT;
+bool cmdstatus_analyze = false;
+bool cmdstatus_instrument_timer = false;
+bool cmdstatus_instrument_rows = false;
+bool cmdstatus_instrument_buffers = false;
 
 /* The array of slots pre-allocated on shared memory. */
 static volatile dsm_handle *CmdStatusSlots = NULL;
@@ -106,8 +111,10 @@ static volatile dsm_handle *MyCmdStatusSlot = NULL;
 static CmdInfoStack	   *current_query_stack = NULL;
 static int				query_stack_size = 0;  /* XXX not really used */
 
+static ExecutorStart_hook_type	prev_ExecutorStart = NULL;
 static ExecutorRun_hook_type	prev_ExecutorRun = NULL;
 
+static void cmdstatus_ExecutorStart(QueryDesc *queryDesc, int eflags);
 static void cmdstatus_ExecutorRun(QueryDesc *queryDesc,
 								  ScanDirection direction, long count);
 
@@ -162,12 +169,42 @@ CmdStatusInit(int css_idx)
 	on_shmem_exit(CleanupCmdStatusSlot, (Datum) 0);
 
 	/* also install executor hooks */
+	prev_ExecutorStart = ExecutorStart_hook;
+	ExecutorStart_hook = cmdstatus_ExecutorStart;
+
 	prev_ExecutorRun = ExecutorRun_hook;
 	ExecutorRun_hook = cmdstatus_ExecutorRun;
 }
 
+
+/*
+ * The ExecutorStart hook.
+ *
+ * Turns on query instrumentation options based on configuration settings.
+ */
+static void
+cmdstatus_ExecutorStart(QueryDesc *queryDesc, int eflags)
+{
+	if (cmdstatus_analyze)
+	{
+		if (cmdstatus_instrument_timer)
+			queryDesc->instrument_options |= INSTRUMENT_TIMER;
+
+		if (cmdstatus_instrument_rows)
+			queryDesc->instrument_options |= INSTRUMENT_ROWS;
+
+		if (cmdstatus_instrument_buffers)
+			queryDesc->instrument_options |= INSTRUMENT_BUFFERS;
+	}
+
+	if (prev_ExecutorStart)
+		prev_ExecutorStart(queryDesc, eflags);
+	else
+		standard_ExecutorStart(queryDesc, eflags);
+}
+
 /*
- * The executor hook.
+ * The ExecutorRun hook.
  *
  * Accumulates the query descriptors on the program stack and takes care of
  * popping the current frame when leaving the function ab-/normally.
@@ -217,9 +254,11 @@ explain_query(QueryDesc *queryDesc)
 	ExplainState   *es;
 
 	es = NewExplainState();
-	es->analyze = false;
+	es->running = true;
+	es->analyze = cmdstatus_analyze;
+	es->timing = cmdstatus_instrument_timer;
 	es->verbose = false;
-	es->buffers = false;
+	es->buffers = cmdstatus_instrument_buffers;
 	es->format = EXPLAIN_FORMAT_TEXT;
 
 	ExplainBeginOutput(es);
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 7cac9d3..21054de 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1619,6 +1619,46 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"cmdstatus_analyze", PGC_USERSET, STATS,
+			gettext_noop("Automatically instrument all queries for extraction of EXPLAIN ANALYZE plans with the pg_cmdstatus() function."),
+			NULL,
+		},
+		&cmdstatus_analyze,
+		false,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"cmdstatus_instrument_timer", PGC_USERSET, STATS,
+			gettext_noop("Automatically instrument all queries for timing, requires cmdstatus_analyze=on."),
+			NULL,
+		},
+		&cmdstatus_instrument_timer,
+		false,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"cmdstatus_instrument_rows", PGC_USERSET, STATS,
+			gettext_noop("Automatically instrument all queries for tracking row counts, requires cmdstatus_analyze=on."),
+			NULL,
+		},
+		&cmdstatus_instrument_rows,
+		false,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"cmdstatus_instrument_buffers", PGC_USERSET, STATS,
+			gettext_noop("Automatically instrument all queries for tracking buffer usage, requires cmdstatus_analyze=on."),
+			NULL,
+		},
+		&cmdstatus_instrument_buffers,
+		false,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index c33e585..a275f51 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -471,6 +471,13 @@
 #log_statement_stats = off
 
 
+# - Automatic Query Instrumentation -
+
+#cmdstatus_analyze = off
+#cmdstatus_instrument_timer = off
+#cmdstatus_instrument_rows = off
+#cmdstatus_instrument_buffers = off
+
 #------------------------------------------------------------------------------
 # AUTOVACUUM PARAMETERS
 #------------------------------------------------------------------------------
diff --git a/src/include/commands/explain.h b/src/include/commands/explain.h
index 26fcc5b..3334f41 100644
--- a/src/include/commands/explain.h
+++ b/src/include/commands/explain.h
@@ -35,6 +35,7 @@ typedef struct ExplainState
 	bool		timing;			/* print detailed node timing */
 	bool		summary;		/* print total planning and execution timing */
 	ExplainFormat format;		/* output format */
+	bool		running;		/* explain on a running statement */
 	/* other states */
 	PlannedStmt *pstmt;			/* top of plan */
 	List	   *rtable;			/* range table */
diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h
index c9a2129..a18b089 100644
--- a/src/include/executor/instrument.h
+++ b/src/include/executor/instrument.h
@@ -68,6 +68,6 @@ extern PGDLLIMPORT BufferUsage pgBufferUsage;
 extern Instrumentation *InstrAlloc(int n, int instrument_options);
 extern void InstrStartNode(Instrumentation *instr);
 extern void InstrStopNode(Instrumentation *instr, double nTuples);
-extern void InstrEndLoop(Instrumentation *instr);
+extern void InstrEndLoop(Instrumentation *instr, bool partial_run);
 
 #endif   /* INSTRUMENT_H */
diff --git a/src/include/utils/cmdstatus.h b/src/include/utils/cmdstatus.h
index d6379f0..a9a508a 100644
--- a/src/include/utils/cmdstatus.h
+++ b/src/include/utils/cmdstatus.h
@@ -16,6 +16,10 @@
 #define CMD_STATUS_DEFAULT_INTERRUPT_TIMEOUT		10	/* milliseconds */
 
 extern int cmdstatus_interrupt_timeout;
+extern bool cmdstatus_analyze;
+extern bool cmdstatus_instrument_timer;
+extern bool cmdstatus_instrument_rows;
+extern bool cmdstatus_instrument_buffers;
 
 extern Size CmdStatusShmemSize(void);
 extern void CmdStatusShmemInit(void);
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 32ac58f..2e3beaf 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -43,6 +43,7 @@
 #include "storage/procsignal.h"
 #include "storage/sinvaladt.h"
 #include "storage/spin.h"
+#include "utils/cmdstatus.h"
 
 
 shmem_startup_hook_type shmem_startup_hook = NULL;
@@ -139,6 +140,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
 		size = add_size(size, AsyncShmemSize());
+		size = add_size(size, CmdStatusShmemSize());
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -243,6 +245,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 	ReplicationOriginShmemInit();
 	WalSndShmemInit();
 	WalRcvShmemInit();
+	CmdStatusShmemInit();
 
 	/*
 	 * Set up other modules that need some shared memory space
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 0abde43..e637be1 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -26,6 +26,7 @@
 #include "storage/shmem.h"
 #include "storage/sinval.h"
 #include "tcop/tcopprot.h"
+#include "utils/cmdstatus.h"
 
 
 /*
@@ -296,6 +297,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
 
+	if (CheckProcSignal(PROCSIG_CMD_STATUS_INFO))
+		HandleCmdStatusInfoInterrupt();
+
 	if (set_latch_on_sigusr1)
 		SetLatch(MyLatch);
 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index d917af3..1a5b03c 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -67,6 +67,7 @@
 #include "tcop/pquery.h"
 #include "tcop/tcopprot.h"
 #include "tcop/utility.h"
+#include "utils/cmdstatus.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
@@ -2991,6 +2992,9 @@ ProcessInterrupts(void)
 
 	if (ParallelMessagePending)
 		HandleParallelMessages();
+
+	if (CmdStatusInfoPending)
+		ProcessCmdStatusInfoRequest();
 }
 
 
diff --git a/src/backend/utils/adt/Makefile b/src/backend/utils/adt/Makefile
index 3ed0b44..2c8687c 100644
--- a/src/backend/utils/adt/Makefile
+++ b/src/backend/utils/adt/Makefile
@@ -18,7 +18,7 @@ endif
 # keep this list arranged alphabetically or it gets to be a mess
 OBJS = acl.o arrayfuncs.o array_expanded.o array_selfuncs.o \
 	array_typanalyze.o array_userfuncs.o arrayutils.o ascii.o \
-	bool.o cash.o char.o date.o datetime.o datum.o dbsize.o domains.o \
+	bool.o cash.o char.o cmdstatus.o date.o datetime.o datum.o dbsize.o domains.o \
 	encode.o enum.o expandeddatum.o \
 	float.o format_type.o formatting.o genfile.o \
 	geo_ops.o geo_selfuncs.o inet_cidr_ntop.o inet_net_pton.o int.o \
diff --git a/src/backend/utils/adt/cmdstatus.c b/src/backend/utils/adt/cmdstatus.c
new file mode 100644
index 0000000..5078100
--- /dev/null
+++ b/src/backend/utils/adt/cmdstatus.c
@@ -0,0 +1,791 @@
+/*-------------------------------------------------------------------------
+ *
+ * cmdstatus.c
+ *	  Definitions for pg_cmdstatus function.
+ *
+ * Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/utils/adt/cmdstatus.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "funcapi.h"
+#include "miscadmin.h"
+
+#include "access/htup_details.h"
+#include "commands/explain.h"
+#include "lib/stringinfo.h"
+#include "storage/dsm.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/procsignal.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "tcop/dest.h"
+#include "tcop/pquery.h"
+#include "utils/builtins.h"
+#include "utils/cmdstatus.h"
+
+
+typedef enum {
+	CMD_STATUS_REQUEST_EXPLAIN = 1,
+	CMD_STATUS_REQUEST_QUERY_TEXT = 2,
+	CMD_STATUS_REQUEST_PROGRESS_TAG = 3,
+	CMD_STATUS_REQUEST_EXPLAIN_BACKTRACE = 4
+} CmdStatusInfoRequestType;
+
+#define CMD_STATUS_MAX_REQUEST CMD_STATUS_REQUEST_EXPLAIN_BACKTRACE
+
+typedef enum {
+	CMD_STATUS_RESULT_FAILURE = -1,
+	CMD_STATUS_RESULT_SUCCESS = 0,
+	CMD_STATUS_RESULT_BACKEND_IDLE,
+	CMD_STATUS_RESULT_NO_DATA
+} CmdStatusInfoResultCode;
+
+/*
+ * We allocate an array of NumCmdStatusSlots DSM handles in shared memory.
+ * This slots array is indexed by backend ID, so each slot corresponds to at
+ * most one backend at any given time.
+ *
+ * In order to actually query some backend for its status, the interested
+ * process will create a DSM segment, initialize a table of contents in it,
+ * fill the target process id and request type and create a shared memory
+ * queue in the segment.
+ *
+ * The interested process will then put the handle of the DSM segment thus
+ * initialized in its own slot and send SIGUSR1 with the reason
+ * PROCSIG_CMD_STATUS_INFO to the process being queried.  The interested
+ * process will wait for the response to be delivered via the shared memory
+ * queue and the result code via a field in the DSM segment.
+ *
+ * The layout of a DSM segment is given by the following TOC keys enumeration:
+ */
+typedef enum {
+	CmdStatusSlotKeyTargetProcessID,
+	CmdStatusSlotKeyIsProcessedFlag,
+	CmdStatusSlotKeyRequestType,
+	CmdStatusSlotKeyResultCode,
+	CmdStatusSlotKeyMemoryQueue,
+
+	CmdStatusTotalSlotKeys
+} CmdStatusSlotKey;
+
+#define NumCmdStatusSlots	MaxBackends
+
+#define PG_CMD_STATUS_INFO_MAGIC	0x79fb2449
+#define CMD_STATUS_BUFFER_SIZE		1024
+
+/*
+ * These structs are allocated on the program stack as local variables in the
+ * ExecutorRun hook.  The top of stack is current_query_stack, see below.
+ *
+ * Capturing the execution stack allows us to inspect the inner-most running
+ * query as well as showing the complete backtrace.
+ */
+typedef struct CmdInfoStack {
+	QueryDesc			   *query_desc;
+	struct CmdInfoStack	   *parent;
+} CmdInfoStack;
+
+
+int cmdstatus_interrupt_timeout = CMD_STATUS_DEFAULT_INTERRUPT_TIMEOUT;
+
+/* The array of slots pre-allocated on shared memory. */
+static volatile dsm_handle *CmdStatusSlots = NULL;
+
+/* Pointer into the slots array, to a slot assigned for this backend. */
+static volatile dsm_handle *MyCmdStatusSlot = NULL;
+
+static CmdInfoStack	   *current_query_stack = NULL;
+static int				query_stack_size = 0;  /* XXX not really used */
+
+static ExecutorRun_hook_type	prev_ExecutorRun = NULL;
+
+static void cmdstatus_ExecutorRun(QueryDesc *queryDesc,
+								  ScanDirection direction, long count);
+
+static void CleanupCmdStatusSlot(int status, Datum arg);
+static StringInfo ProcessCmdStatusSlot(CmdStatusInfoRequestType request_type,
+					 CmdStatusInfoResultCode *result_code);
+
+static StringInfo explain_query(QueryDesc *queryDesc);
+static void report_result_code(CmdStatusInfoResultCode result, pid_t target_pid);
+static void RestoreResourceOwner(ResourceOwner prevResourceOwner);
+
+
+Size
+CmdStatusShmemSize(void)
+{
+	return NumCmdStatusSlots * sizeof(dsm_handle);
+}
+
+void
+CmdStatusShmemInit(void)
+{
+	Size		size = CmdStatusShmemSize();
+	bool		found;
+
+	CmdStatusSlots = (dsm_handle *) ShmemInitStruct("CmdStatusSlots", size, &found);
+
+	if (!found)
+		MemSet(CmdStatusSlots, 0, size);
+}
+
+static void
+CleanupCmdStatusSlot(int status, Datum arg)
+{
+	Assert(MyCmdStatusSlot != NULL);
+
+	*MyCmdStatusSlot = 0;	/* just in case */
+
+	MyCmdStatusSlot = NULL;
+}
+
+/*
+ * CmdStatusInit
+ *		Initialize MyCmdStatusSlot
+ *
+ * The passed index should be MyBackendId.
+ */
+void
+CmdStatusInit(int css_idx)
+{
+	MyCmdStatusSlot = &CmdStatusSlots[css_idx - 1];
+
+	on_shmem_exit(CleanupCmdStatusSlot, (Datum) 0);
+
+	/* also install executor hooks */
+	prev_ExecutorRun = ExecutorRun_hook;
+	ExecutorRun_hook = cmdstatus_ExecutorRun;
+}
+
+/*
+ * The executor hook.
+ *
+ * Accumulates the query descriptors on the program stack and takes care of
+ * popping the current frame when leaving the function ab-/normally.
+ */
+static void
+cmdstatus_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
+{
+	CmdInfoStack	current;
+
+	current.query_desc = queryDesc;
+	current.parent = current_query_stack;
+
+	current_query_stack = &current;
+	query_stack_size++;
+
+	PG_TRY();
+	{
+		if (prev_ExecutorRun)
+			prev_ExecutorRun(queryDesc, direction, count);
+		else
+			standard_ExecutorRun(queryDesc, direction, count);
+
+		Assert(current_query_stack == &current);
+		Assert(query_stack_size > 0);
+
+		query_stack_size--;
+		current_query_stack = current.parent;
+	}
+	PG_CATCH();
+	{
+		Assert(current_query_stack == &current);
+		Assert(query_stack_size > 0);
+
+		query_stack_size--;
+		current_query_stack = current.parent;
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+}
+
+/* Produce an explain plan of a single query descriptor. */
+static StringInfo
+explain_query(QueryDesc *queryDesc)
+{
+	StringInfo		str;
+	ExplainState   *es;
+
+	es = NewExplainState();
+	es->analyze = false;
+	es->verbose = false;
+	es->buffers = false;
+	es->format = EXPLAIN_FORMAT_TEXT;
+
+	ExplainBeginOutput(es);
+	/* XXX: appendStringInfo(es->str, "#%d ", depth); ? */
+	ExplainQueryText(es, queryDesc);
+	ExplainPrintPlan(es, queryDesc);
+	ExplainEndOutput(es);
+
+	str = es->str;
+
+	pfree(es);
+	return str;
+}
+
+
+/* signal handler for PROCSIG_CMD_STATUS_INFO */
+void
+HandleCmdStatusInfoInterrupt(void)
+{
+	CmdStatusInfoPending = true;
+	InterruptPending = true;
+
+	SetLatch(MyLatch);
+}
+
+/* Produce a response and set the result code for a single slot. */
+static StringInfo
+ProcessCmdStatusSlot(CmdStatusInfoRequestType request_type,
+					 CmdStatusInfoResultCode *result_code)
+{
+	StringInfo		result = NULL;
+
+	/* Show some optimism, overwrite with error code later if needed. */
+	*result_code = CMD_STATUS_RESULT_SUCCESS;
+
+	if (ActivePortal)
+	{
+		switch (request_type)
+		{
+			case CMD_STATUS_REQUEST_EXPLAIN:
+				if (ActivePortal->queryDesc != NULL)
+					result = explain_query(ActivePortal->queryDesc);
+				else if (current_query_stack != NULL)
+					result = explain_query(current_query_stack->query_desc);
+				else
+					*result_code = CMD_STATUS_RESULT_NO_DATA;
+				break;
+
+			case CMD_STATUS_REQUEST_EXPLAIN_BACKTRACE:
+				{
+					StringInfo		str;
+					CmdInfoStack   *query;
+
+					result = makeStringInfo();
+
+					if (current_query_stack != NULL)
+					{
+						for (query = current_query_stack;
+							 query != NULL;
+							 query = query->parent)
+						{
+							str = explain_query(query->query_desc);
+							appendBinaryStringInfo(result, str->data, str->len);
+
+							pfree(str->data);
+							pfree(str);
+						}
+					}
+					else
+						*result_code = CMD_STATUS_RESULT_BACKEND_IDLE;
+					break;
+				}
+
+			case CMD_STATUS_REQUEST_QUERY_TEXT:
+				result = makeStringInfo();
+				appendStringInfoString(result, ActivePortal->sourceText);
+				break;
+
+			case CMD_STATUS_REQUEST_PROGRESS_TAG:
+				if (ActivePortal->commandTag != NULL)
+				{
+					result = makeStringInfo();
+
+					if (ActivePortal->queryDesc != NULL &&
+						ActivePortal->queryDesc->estate != NULL)
+					{
+						appendStringInfo(result, "%s %u",
+										 ActivePortal->commandTag,
+										 ActivePortal->queryDesc->estate->es_processed);
+					}
+					else
+					{
+						/* no progress available, at least show the command tag */
+						appendStringInfoString(result, ActivePortal->commandTag);
+					}
+				}
+				else
+					*result_code = CMD_STATUS_RESULT_NO_DATA;
+				break;
+		}
+	}
+	else
+		*result_code = CMD_STATUS_RESULT_BACKEND_IDLE;
+
+	return result;
+}
+
+static void
+RestoreResourceOwner(ResourceOwner prevResourceOwner)
+{
+	if (prevResourceOwner != CurrentResourceOwner)
+	{
+		ResourceOwner	res_owner = CurrentResourceOwner;
+
+		CurrentResourceOwner = prevResourceOwner;
+		ResourceOwnerDelete(res_owner);
+	}
+}
+
+/*
+ * Process all command status requests from other backends by scanning the
+ * whole array of slots looking for the requests targeted at us, which were
+ * not already processed by us during earlier rounds.
+ *
+ * Most of the time we expect the first valid slot we find to be for us, so
+ * there's no penalty for actually attaching to it and looking inside.
+ */
+void
+ProcessCmdStatusInfoRequest(void)
+{
+	MemoryContext	context = CurrentMemoryContext;
+	ResourceOwner	prevResourceOwner = CurrentResourceOwner;
+	struct timeval	tmleft;
+
+	tmleft.tv_sec = 0;
+	tmleft.tv_usec = cmdstatus_interrupt_timeout * 1000L;
+
+	/* Ensure valid resource owner for access to dsm. */
+	if (CurrentResourceOwner == NULL)
+		CurrentResourceOwner = ResourceOwnerCreate(NULL, "ProcessCmdStatusInfoRequest");
+
+	/*
+	 * We might have been signaled again by another process while handling a
+	 * request, so re-check after going through all the slots.
+	 */
+	while (CmdStatusInfoPending)
+	{
+		int		i;
+
+		CmdStatusInfoPending = false;
+
+		for (i = 0; i < NumCmdStatusSlots; i++)
+		{
+			dsm_handle slot = CmdStatusSlots[i];
+
+			if (slot != 0)
+			{
+				dsm_segment	   *seg = NULL;
+				shm_toc		   *toc = NULL;
+				pid_t		   *target_pid_ptr;
+				bool		   *is_processed_flag_ptr;
+				CmdStatusInfoRequestType   *request_type_ptr;
+				CmdStatusInfoResultCode	   *result_code_ptr;
+				shm_mq		   *mq;
+				shm_mq_handle  *output;
+				struct timeval		tmnow;
+				struct timeval		tmthen;
+				struct timeval		tmtemp;
+
+				seg = dsm_attach(slot);
+				if (seg == NULL)
+				{
+					elog(LOG, "unable to map dynamic memory segment for command status");
+					continue;
+				}
+
+				toc = shm_toc_attach(PG_CMD_STATUS_INFO_MAGIC, dsm_segment_address(seg));
+				if (toc == NULL)
+				{
+					elog(LOG, "bad magic in dynamic memory segment for command status");
+
+					dsm_detach(seg);
+					continue;
+				}
+
+				target_pid_ptr = (pid_t *)
+					shm_toc_lookup(toc, CmdStatusSlotKeyTargetProcessID);
+				is_processed_flag_ptr = (bool *)
+					shm_toc_lookup(toc, CmdStatusSlotKeyIsProcessedFlag);
+
+				if (*target_pid_ptr != MyProcPid || *is_processed_flag_ptr)
+				{
+					dsm_detach(seg);
+					continue;
+				}
+
+				/* Set the flag early to avoid re-processing during next round. */
+				*is_processed_flag_ptr = true;
+
+				request_type_ptr = (CmdStatusInfoRequestType *)
+					shm_toc_lookup(toc, CmdStatusSlotKeyRequestType);
+
+				result_code_ptr = (CmdStatusInfoResultCode *)
+					shm_toc_lookup(toc, CmdStatusSlotKeyResultCode);
+
+				mq = shm_toc_lookup(toc, CmdStatusSlotKeyMemoryQueue);
+				output = shm_mq_attach(mq, seg, NULL);
+
+				PG_TRY();
+				{
+					StringInfo		payload;
+					shm_mq_result	res;
+
+					payload = ProcessCmdStatusSlot(*request_type_ptr, result_code_ptr);
+					if (payload != NULL)
+					{
+						for (;;)
+						{
+							/* Send the data, including null terminator. */
+							res = shm_mq_send(output, payload->len + 1, payload->data, true);
+							if (res == SHM_MQ_SUCCESS)
+							{
+								break;
+							}
+							else if (res == SHM_MQ_DETACHED)
+							{
+								elog(LOG, "receiving backend has detached from the shared memory queue prematurely");
+								break;
+							}
+							else if (res == SHM_MQ_WOULD_BLOCK)
+							{
+								int		waitres;
+
+								gettimeofday(&tmthen, NULL);
+
+								waitres = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT,
+													tmleft.tv_sec * 1000L + tmleft.tv_usec / 1000L);
+								if (waitres & WL_TIMEOUT)
+								{
+									elog(LOG, "timeout while sending command status response");
+									break;
+								}
+
+								if (waitres & WL_LATCH_SET)
+									ResetLatch(MyLatch);
+
+								gettimeofday(&tmnow, NULL);
+
+								/* left = left + then - now */
+								timeradd(&tmleft, &tmthen, &tmtemp);
+								timersub(&tmtemp, &tmnow, &tmleft);
+
+								if (tmleft.tv_sec < 0)
+								{
+									tmleft.tv_sec = 0;
+									tmleft.tv_usec = 0;
+								}
+							}
+						}
+
+						pfree(payload->data);
+						pfree(payload);
+					}
+					/*
+					 * If there was no payload to be sent, we just detach from
+					 * the queue and the receiver should wake up.
+					 */
+
+					shm_mq_detach(mq);
+					dsm_detach(seg);
+				}
+				PG_CATCH();
+				{
+					/*
+					 * Detaching also wakes up the receiver: mark failure first.
+					 */
+					*result_code_ptr = CMD_STATUS_RESULT_FAILURE;
+					shm_mq_detach(mq);
+					dsm_detach(seg);
+
+					MemoryContextSwitchTo(context);
+					PG_TRY();
+					{
+						ErrorData	   *edata = CopyErrorData();
+
+						/* XXX is there a way to just report the current ERROR with LOG instead, without making a copy first? */
+						elog(LOG, "an error occurred while processing command status request: %s",
+							 edata->message);
+
+						FreeErrorData(edata);
+						FlushErrorState();
+					}
+					PG_CATCH();
+					{
+						FlushErrorState();
+						MemoryContextSwitchTo(context);
+
+						elog(LOG, "out of memory while processing command status request");
+					}
+					PG_END_TRY();
+					/* go on with the next slot */
+				}
+				PG_END_TRY();
+			}
+		}
+	}
+
+	RestoreResourceOwner(prevResourceOwner);
+}
+
+static void
+report_result_code(CmdStatusInfoResultCode result, pid_t target_pid)
+{
+	switch (result)
+	{
+		case CMD_STATUS_RESULT_SUCCESS:
+			/* nothing to report */
+			break;
+
+		case CMD_STATUS_RESULT_BACKEND_IDLE:
+			elog(INFO, "no command is currently running in backend with PID %d",
+				 target_pid);
+			break;
+
+		case CMD_STATUS_RESULT_NO_DATA:
+			elog(WARNING, "no suitable data found for the query in backend with PID %d",
+				 target_pid);
+			break;
+
+		default:
+			elog(ERROR, "general command status request failure when querying backend with PID %d",
+				 target_pid);
+			break;
+	}
+}
+
+/*
+ * Try to get status of a command running in another process.
+ *
+ * FUNCTION pg_cmdstatus(pid, request_type, timeout)
+ * RETURNS SETOF text
+ */
+Datum
+pg_cmdstatus(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo	   *rsinfo	= (ReturnSetInfo *) fcinfo->resultinfo;
+	Tuplestorestate	   *tupstore;
+	TupleDesc			tupdesc;
+	MemoryContext		per_query_ctx;
+	MemoryContext		oldcontext;
+	pid_t				target_pid = (pid_t) PG_GETARG_INT32(0);
+	int					request_type = PG_GETARG_INT32(1);
+	int					timeout = Max(PG_GETARG_INT32(2), 0);	/* ms, 0 = none */
+	PGPROC			   *target_proc;
+	shm_toc_estimator	estimator;
+	Size				segsize;
+	dsm_segment		   *seg;
+	shm_toc			   *toc;
+	pid_t			   *target_pid_ptr;
+	bool			   *is_processed_flag_ptr;
+	CmdStatusInfoRequestType   *request_type_ptr;
+	CmdStatusInfoResultCode	   *result_code_ptr;
+	shm_mq			   *mq;
+	shm_mq_handle	   *input;
+	struct timeval		tmnow;
+	struct timeval		tmthen;
+	struct timeval		tmtemp;
+	struct timeval		tmleft;
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
+
+	if (request_type < 1 || request_type > CMD_STATUS_MAX_REQUEST)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("unknown command status request type %d", request_type)));
+
+	if (target_pid == MyProcPid)
+		ereport(ERROR,
+				(errmsg("backend cannot query command status of itself")));
+
+	/* verify access to target_pid */
+	target_proc = BackendPidGetProc(target_pid);
+
+	if (target_proc == NULL)
+		ereport(ERROR,
+				(errmsg("PID %d is not a PostgreSQL server process", target_pid)));
+
+	if (!(superuser() || target_proc->roleId == GetUserId()))
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 errmsg("must be superuser or have the same role to explain queries running in other server processes")));
+
+	if (*MyCmdStatusSlot != 0)
+		ereport(ERROR,
+				(errmsg("pg_cmdstatus() call is already in progress")));
+
+	/* need to build tuplestore in query context */
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
+	tupstore = tuplestore_begin_heap(false, false, work_mem);
+	MemoryContextSwitchTo(oldcontext);
+
+	/* prepare shared dsm segment */
+	shm_toc_initialize_estimator(&estimator);
+	shm_toc_estimate_keys(&estimator, CmdStatusTotalSlotKeys);
+	shm_toc_estimate_chunk(&estimator, sizeof(pid_t));
+	shm_toc_estimate_chunk(&estimator, sizeof(bool));
+	shm_toc_estimate_chunk(&estimator, sizeof(CmdStatusInfoRequestType));
+	shm_toc_estimate_chunk(&estimator, sizeof(CmdStatusInfoResultCode));
+	shm_toc_estimate_chunk(&estimator, CMD_STATUS_BUFFER_SIZE);
+	segsize = shm_toc_estimate(&estimator);
+
+	seg = dsm_create(segsize, 0);
+
+	toc = shm_toc_create(PG_CMD_STATUS_INFO_MAGIC, dsm_segment_address(seg), segsize);
+
+	target_pid_ptr = (pid_t *) shm_toc_allocate(toc, sizeof(pid_t));
+	*target_pid_ptr = target_pid;
+	shm_toc_insert(toc, CmdStatusSlotKeyTargetProcessID, target_pid_ptr);
+
+	is_processed_flag_ptr = (bool *) shm_toc_allocate(toc, sizeof(bool));
+	*is_processed_flag_ptr = false;
+	shm_toc_insert(toc, CmdStatusSlotKeyIsProcessedFlag, is_processed_flag_ptr);
+
+	request_type_ptr = (CmdStatusInfoRequestType *)
+		shm_toc_allocate(toc, sizeof(CmdStatusInfoRequestType));
+	*request_type_ptr = request_type;
+	shm_toc_insert(toc, CmdStatusSlotKeyRequestType, request_type_ptr);
+
+	result_code_ptr = (CmdStatusInfoResultCode *)
+		shm_toc_allocate(toc, sizeof(CmdStatusInfoResultCode));
+	*result_code_ptr = CMD_STATUS_RESULT_FAILURE;
+	shm_toc_insert(toc, CmdStatusSlotKeyResultCode, result_code_ptr);
+
+	mq = shm_mq_create(shm_toc_allocate(toc, CMD_STATUS_BUFFER_SIZE),
+					   CMD_STATUS_BUFFER_SIZE);
+	shm_toc_insert(toc, CmdStatusSlotKeyMemoryQueue, mq);
+
+	shm_mq_set_receiver(mq, MyProc);
+	shm_mq_set_sender(mq, target_proc);
+
+	PG_TRY();
+	{
+		shm_mq_result	res;
+		char		   *data = NULL;
+		Size			length;
+
+		input = shm_mq_attach(mq, seg, NULL);
+
+		*MyCmdStatusSlot = dsm_segment_handle(seg);
+
+		if (SendProcSignal(target_pid, PROCSIG_CMD_STATUS_INFO, target_proc->backendId))
+			elog(ERROR, "could not signal backend with PID %d", target_pid);
+
+		tmleft.tv_sec = timeout / 1000L;
+		tmleft.tv_usec = (timeout % 1000L) * 1000L;
+
+		for (;;)
+		{
+			/*
+			 * Read without blocking, otherwise cancel request or timeout
+			 * cannot be handled promptly.
+			 */
+			res = shm_mq_receive(input, &length, (void **) &data, true);
+			if (res == SHM_MQ_SUCCESS)
+			{
+				const char	   *p = data;
+
+				/* break into tuples on newline */
+				while (*p)
+				{
+					const char *q = strchr(p, '\n');
+					Size		len = q ? q - p : strlen(p);
+					Datum		value;
+					HeapTuple	tuple;
+					bool		isnull = false;
+
+					value = PointerGetDatum(cstring_to_text_with_len(p, len));
+
+					tuple = heap_form_tuple(tupdesc, &value, &isnull);
+					tuplestore_puttuple(tupstore, tuple);
+
+					if (!q)
+						break;
+
+					p += len + 1;
+				}
+			}
+			else if (res == SHM_MQ_DETACHED)
+			{
+				report_result_code(*result_code_ptr, target_pid);
+				break;
+			}
+			else if (res == SHM_MQ_WOULD_BLOCK)
+			{
+				int		waitres;
+				long	left_ms;
+
+				if (timeout)
+					gettimeofday(&tmthen, NULL);
+
+				/* Wait up to tmleft in ms (or 1 sec if timeout is not set). */
+				left_ms = timeout ? tmleft.tv_sec * 1000L + tmleft.tv_usec / 1000L : 1000L;
+
+				waitres = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT, left_ms);
+				if (timeout && (waitres & WL_TIMEOUT) != 0)
+				{
+					elog(INFO, "timeout waiting for reply from backend with PID %d",
+						 target_pid);
+					break;
+				}
+
+				CHECK_FOR_INTERRUPTS();
+				if (waitres & WL_LATCH_SET)
+					ResetLatch(MyLatch);
+
+				if (timeout)
+				{
+					gettimeofday(&tmnow, NULL);
+
+					/* left = left + then - now */
+					timeradd(&tmleft, &tmthen, &tmtemp);
+					timersub(&tmtemp, &tmnow, &tmleft);
+
+					if (tmleft.tv_sec < 0)
+					{
+						tmleft.tv_sec = 0;
+						tmleft.tv_usec = 0;
+					}
+				}
+			}
+		}
+
+		/* clean up and return the tuplestore */
+		tuplestore_donestoring(tupstore);
+
+		*MyCmdStatusSlot = 0;
+
+		shm_mq_detach(mq);
+		dsm_detach(seg);
+	}
+	PG_CATCH();
+	{
+		*MyCmdStatusSlot = 0;
+
+		shm_mq_detach(mq);
+		dsm_detach(seg);
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	return (Datum) 0;
+}
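For reviewers, here is a standalone sketch of how pg_cmdstatus() splits the received null-terminated message into one result row per line. It runs outside the backend: split_lines() and the line_callback type are hypothetical names, and the callback merely stands in for the cstring_to_text_with_len() / heap_form_tuple() / tuplestore_puttuple() sequence used in the patch.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical callback standing in for forming and storing one tuple. */
typedef void (*line_callback) (const char *line, size_t len, void *arg);

/*
 * Split a null-terminated buffer on '\n' the same way the receive loop in
 * pg_cmdstatus() does, calling cb (if not NULL) once per line and returning
 * the number of lines.  A trailing newline does not produce an extra empty
 * line; an empty line in the middle does.
 */
static size_t
split_lines(const char *data, line_callback cb, void *arg)
{
	const char *p = data;
	size_t		nlines = 0;

	while (*p)
	{
		const char *q = strchr(p, '\n');
		size_t		len = q ? (size_t) (q - p) : strlen(p);

		if (cb != NULL)
			cb(p, len, arg);
		nlines++;

		if (!q)
			break;				/* last line had no terminating newline */
		p += len + 1;			/* skip the line and its newline */
	}
	return nlines;
}
```

Because the sender transmits payload->len + 1 bytes, the terminating null byte is always part of the message, which is what makes the strchr()/strlen() scan safe on the receiving side.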
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 23e594e..2269853 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -29,6 +29,7 @@ ProtocolVersion FrontendProtocol;
 volatile bool InterruptPending = false;
 volatile bool QueryCancelPending = false;
 volatile bool ProcDiePending = false;
+volatile bool CmdStatusInfoPending = false;
 volatile bool ClientConnectionLost = false;
 volatile uint32 InterruptHoldoffCount = 0;
 volatile uint32 QueryCancelHoldoffCount = 0;
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 7b19714..7e7ba77 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -50,6 +50,7 @@
 #include "storage/smgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/acl.h"
+#include "utils/cmdstatus.h"
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
@@ -588,6 +589,9 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 	/* Now that we have a BackendId, we can participate in ProcSignal */
 	ProcSignalInit(MyBackendId);
 
+	/* Init CmdStatus slot */
+	CmdStatusInit(MyBackendId);
+
 	/*
 	 * Also set up timeout handlers needed for backend operation.  We need
 	 * these in every case except bootstrap.
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index b3dac51..7cac9d3 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -73,6 +73,7 @@
 #include "tsearch/ts_cache.h"
 #include "utils/builtins.h"
 #include "utils/bytea.h"
+#include "utils/cmdstatus.h"
 #include "utils/guc_tables.h"
 #include "utils/memutils.h"
 #include "utils/pg_locale.h"
@@ -2662,6 +2663,17 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"cmdstatus_interrupt_timeout", PGC_USERSET, STATS,
+			gettext_noop("Sets the maximum time spent processing pending command status requests."),
+			NULL,
+			GUC_UNIT_MS
+		},
+		&cmdstatus_interrupt_timeout,
+		CMD_STATUS_DEFAULT_INTERRUPT_TIMEOUT, 1, 1000,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
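Both sides of the queue charge the time spent in each WaitLatch() call against a remaining budget ("left = left + then - now"), using the BSD timeradd()/timersub() macros, which are not part of POSIX. A minimal sketch of the same bookkeeping in integer milliseconds, together with splitting a millisecond value into a normalized struct timeval (tv_usec is conventionally kept below 1000000) and converting it back into a WaitLatch()-style millisecond timeout. It assumes a monotonic clock is acceptable, whereas the patch uses gettimeofday(); all helper names are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L	/* clock_gettime(), CLOCK_MONOTONIC */

#include <sys/time.h>
#include <time.h>

/* Hypothetical helper: milliseconds from a monotonic clock. */
static long long
monotonic_now_ms(void)
{
	struct timespec ts;

	clock_gettime(CLOCK_MONOTONIC, &ts);
	return (long long) ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

/*
 * Hypothetical helper: charge the time elapsed between 'then' and 'now'
 * against the budget 'left_ms' ("left = left + then - now"), clamping at 0.
 */
static long
charge_elapsed_ms(long left_ms, long long then, long long now)
{
	long long	remaining = (long long) left_ms + then - now;

	return remaining > 0 ? (long) remaining : 0;
}

/*
 * Hypothetical helper: split a millisecond value into a normalized
 * struct timeval, e.g. 1000 ms becomes { 1, 0 } rather than { 0, 1000000 }.
 */
static struct timeval
ms_to_timeval(long ms)
{
	struct timeval tv;

	tv.tv_sec = ms / 1000;
	tv.tv_usec = (ms % 1000) * 1000;
	return tv;
}

/* Hypothetical helper: whole milliseconds left, as passed to WaitLatch(). */
static long
timeval_to_ms(const struct timeval *tv)
{
	return (long) tv->tv_sec * 1000L + (long) (tv->tv_usec / 1000);
}
```

A monotonic clock is not affected by discontinuous jumps in the system time, so a wall-clock change cannot stretch or cut short the wait.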
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index ddf7c67..5edbfd0 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -3130,6 +3130,8 @@ DESCR("get OID of current session's temp schema, if any");
 DATA(insert OID = 2855 (  pg_is_other_temp_schema	PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 16 "26" _null_ _null_ _null_ _null_ _null_ pg_is_other_temp_schema _null_ _null_ _null_ ));
 DESCR("is schema another session's temp schema?");
 
+DATA(insert OID = 4099 ( pg_cmdstatus		PGNSP PGUID 12 1 100 0 0 f f f f t t v 3 0 25 "23 23 23" _null_ _null_ _null_ _null_ _null_ pg_cmdstatus _null_ _null_ _null_ ));
+DESCR("report status of the command running in another server process");
 DATA(insert OID = 2171 ( pg_cancel_backend		PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ _null_ pg_cancel_backend _null_ _null_ _null_ ));
 DESCR("cancel a server process' current query");
 DATA(insert OID = 2096 ( pg_terminate_backend		PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ _null_ pg_terminate_backend _null_ _null_ _null_ ));
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index e0cc69f..5f03f63 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -80,6 +80,7 @@
 extern PGDLLIMPORT volatile bool InterruptPending;
 extern PGDLLIMPORT volatile bool QueryCancelPending;
 extern PGDLLIMPORT volatile bool ProcDiePending;
+extern PGDLLIMPORT volatile bool CmdStatusInfoPending;
 
 extern volatile bool ClientConnectionLost;
 
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index af1a0cd..cd856e5 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -41,6 +41,8 @@ typedef enum
 	PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
 	PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
+	PROCSIG_CMD_STATUS_INFO,
+
 	NUM_PROCSIGNALS				/* Must be last! */
 } ProcSignalReason;
 
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index fc1679e..605612e 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -1243,6 +1243,9 @@ extern Datum pg_identify_object_as_address(PG_FUNCTION_ARGS);
 /* catalog/objectaddress.c */
 extern Datum pg_get_object_address(PG_FUNCTION_ARGS);
 
+/* utils/adt/cmdstatus.c */
+extern Datum pg_cmdstatus(PG_FUNCTION_ARGS);
+
 /* commands/constraint.c */
 extern Datum unique_key_recheck(PG_FUNCTION_ARGS);
 
diff --git a/src/include/utils/cmdstatus.h b/src/include/utils/cmdstatus.h
new file mode 100644
index 0000000..d6379f0
--- /dev/null
+++ b/src/include/utils/cmdstatus.h
@@ -0,0 +1,27 @@
+/*-------------------------------------------------------------------------
+ *
+ * cmdstatus.h
+ *	  Declarations for command status interrupt handling.
+ *
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ *
+ * src/include/utils/cmdstatus.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CMDSTATUS_H
+#define CMDSTATUS_H
+
+#define CMD_STATUS_DEFAULT_INTERRUPT_TIMEOUT		10	/* milliseconds */
+
+extern int cmdstatus_interrupt_timeout;
+
+extern Size CmdStatusShmemSize(void);
+extern void CmdStatusShmemInit(void);
+extern void CmdStatusInit(int css_idx);
+
+extern void HandleCmdStatusInfoInterrupt(void);
+extern void ProcessCmdStatusInfoRequest(void);
+
+#endif   /* CMDSTATUS_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)