[ deferring responses to some points until a later time ]

On Thu, Feb 19, 2015 at 1:19 AM, Noah Misch <n...@leadboat.com> wrote:
>> This seems backwards to me.  If some hypothetical selectivity
>> estimator were PROPARALLEL_UNSAFE, then any operator that uses that
>> function would also need to be PROPARALLEL_UNSAFE.
>
> It's fair to have a PROPARALLEL_UNSAFE estimator for a PROPARALLEL_SAFE
> function, because the planning of a parallel query is often not itself done in
> parallel mode.  In that case, "SELECT * FROM tablename WHERE colname OP 0"
> might use a parallel seqscan but fail completely if called from inside a
> function running in parallel mode.  That is to say, an affected query can
> itself use parallelism, but placing the query in a function makes the function
> PROPARALLEL_UNSAFE.  Surprising, but not wrong.
>
> Rereading my previous message, I failed to make the bottom line clear: I
> recommend marking eqsel etc. PROPARALLEL_UNSAFE but *not* checking an
> estimator's proparallel before calling it in the planner.

But what do these functions do that is actually unsafe?

>> > - Assuming you don't want to propagate XactLastRecEnd from the slave back
>> >   to the master, restrict XLogInsert() during parallel mode.  Restrict
>> >   functions that call it, including pg_create_restore_point,
>> >   pg_switch_xlog and pg_stop_backup.
>>
>> Hmm.  Altogether prohibiting XLogInsert() in parallel mode seems
>> unwise, because it would foreclose heap_page_prune_opt() in workers.
>> I realize there's separate conversation about whether pruning during
>> SELECT queries is good policy, but in the interest of separating
>> mechanism from policy, and in the sure knowledge that allowing at
>> least some writes in parallel mode is certainly going to be something
>> people will want, it seems better to look into propagating
>> XactLastRecEnd.
>
> Good points; that works for me.

The key design decision here seems to be this: How eagerly do we need
to synchronize XactLastRecEnd?  What exactly goes wrong if it isn't
synchronized?  For example, if the value were absolutely critical in
all circumstances, one could imagine storing a shared XactLastRecEnd
in shared memory.  This doesn't appear to be the case: the main
requirement is that we definitely need an up-to-date value at commit
time.  Also, at abort time, we don't really need the value for anything
critical, but it's worth kicking the WAL writer so that any
accumulated WAL gets flushed.
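
To make the commit-time requirement concrete, here is a minimal
stand-alone sketch of the pattern, not the patch itself: each worker
folds its final WAL end position into a shared maximum under a lock,
and the leader merges that maximum with its own value once every worker
has exited.  Everything in it (wal_pos_t, SharedState,
worker_report_wal_end, leader_merge_wal_end, and the atomic_flag
spinlock standing in for slock_t) is a hypothetical name chosen for
illustration, not a PostgreSQL API.

```c
/* Illustrative model only; none of these names exist in PostgreSQL. */
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

typedef uint64_t wal_pos_t;		/* stand-in for XLogRecPtr */

typedef struct SharedState
{
	atomic_flag	mutex;			/* toy spinlock; protects last_wal_end */
	wal_pos_t	last_wal_end;	/* maximum end position reported so far */
} SharedState;

static void
shared_state_init(SharedState *st)
{
	atomic_flag_clear(&st->mutex);
	st->last_wal_end = 0;
}

static void
shared_state_lock(SharedState *st)
{
	/* Spin until the flag was previously clear. */
	while (atomic_flag_test_and_set_explicit(&st->mutex,
											 memory_order_acquire))
		;
}

static void
shared_state_unlock(SharedState *st)
{
	atomic_flag_clear_explicit(&st->mutex, memory_order_release);
}

/* Worker side: report once, at worker commit.  Keeps only the maximum. */
static void
worker_report_wal_end(SharedState *st, wal_pos_t local_end)
{
	shared_state_lock(st);
	if (st->last_wal_end < local_end)
		st->last_wal_end = local_end;
	shared_state_unlock(st);
}

/*
 * Leader side: call after all workers have exited.  Returns the larger of
 * the leader's own position and the maximum reported by any worker.
 */
static wal_pos_t
leader_merge_wal_end(SharedState *st, wal_pos_t leader_end)
{
	wal_pos_t	shared_end;

	shared_state_lock(st);
	shared_end = st->last_wal_end;
	shared_state_unlock(st);

	return (shared_end > leader_end) ? shared_end : leader_end;
}
```

Because the shared value is only read after the workers are done,
nothing has to be synchronized on each WAL insert; one report per worker
at commit is enough under this model.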

Here's an incremental patch - which I will incorporate into the
parallel mode patch if it seems about right to you - that tries to
tackle all this.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
commit 26bd8f366415906a67abd49824fe3a980d5d4555
Author: Robert Haas <rh...@postgresql.org>
Date:   Thu Mar 12 11:09:19 2015 -0400

    Synchronize XactLastRecEnd.

diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 225ec89..a97c899 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -15,6 +15,7 @@
 #include "postgres.h"
 
 #include "access/xact.h"
+#include "access/xlog.h"
 #include "access/parallel.h"
 #include "commands/async.h"
 #include "libpq/libpq.h"
@@ -73,10 +74,15 @@ typedef struct FixedParallelState
 	/* Entrypoint for parallel workers. */
 	parallel_worker_main_type	entrypoint;
 
-	/* Track whether workers have attached. */
+	/* Mutex protects remaining fields. */
 	slock_t		mutex;
+
+	/* Track whether workers have attached. */
 	int			workers_expected;
 	int			workers_attached;
+
+	/* Maximum XactLastRecEnd of any worker. */
+	XLogRecPtr	last_xlog_end;
 } FixedParallelState;
 
 /*
@@ -90,6 +96,9 @@ int ParallelWorkerNumber = -1;
 /* Is there a parallel message pending which we need to receive? */
 bool ParallelMessagePending = false;
 
+/* Pointer to our fixed parallel state. */
+static FixedParallelState *MyFixedParallelState;
+
 /* List of active parallel contexts. */
 static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
 
@@ -257,6 +266,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
 	SpinLockInit(&fps->mutex);
 	fps->workers_expected = pcxt->nworkers;
 	fps->workers_attached = 0;
+	fps->last_xlog_end = 0;
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
 
 	/* Serialize GUC state to dynamic shared memory. */
@@ -398,6 +408,9 @@ LaunchParallelWorkers(ParallelContext *pcxt)
  * important to call this function afterwards.  We must not miss any errors
  * the workers may have thrown during the parallel operation, or any that they
  * may yet throw while shutting down.
+ *
+ * Also, we want to update our notion of XactLastRecEnd based on worker
+ * feedback.
  */
 void
 WaitForParallelWorkersToFinish(ParallelContext *pcxt)
@@ -431,6 +444,15 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
 		WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
 		ResetLatch(&MyProc->procLatch);
 	}
+
+	if (pcxt->toc != NULL)
+	{
+		FixedParallelState *fps;
+
+		fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
+		if (fps->last_xlog_end > XactLastRecEnd)
+			XactLastRecEnd = fps->last_xlog_end;
+	}
 }
 
 /*
@@ -777,6 +799,7 @@ ParallelWorkerMain(Datum main_arg)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("too many parallel workers already attached")));
+	MyFixedParallelState = fps;
 
 	/*
 	 * Now that we have a worker number, we can find and attach to the error
@@ -921,3 +944,19 @@ ParallelErrorContext(void *arg)
 {
 	errcontext("parallel worker, pid %d", * (int32 *) arg);
 }
+
+/*
+ * Update shared memory with the ending location of the last WAL record we
+ * wrote, if it's greater than the value already stored there.
+ */
+void
+ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
+{
+	FixedParallelState *fps = MyFixedParallelState;
+
+	Assert(fps != NULL);
+	SpinLockAcquire(&fps->mutex);
+	if (fps->last_xlog_end < last_xlog_end)
+		fps->last_xlog_end = last_xlog_end;
+	SpinLockRelease(&fps->mutex);
+}
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 7b3a691..e192f57 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2066,14 +2066,28 @@ CommitTransaction(void)
 	 */
 	s->state = TRANS_COMMIT;
 
-	/*
-	 * Unless we're in parallel mode, we need to mark our XIDs as committed
-	 * in pg_clog.  This is where durably commit.
-	 */
-	if (parallel)
-		latestXid = InvalidTransactionId;
-	else
+	if (!parallel)
+	{
+		/*
+		 * We need to mark our XIDs as committed in pg_clog.  This is where we
+		 * durably commit.
+		 */
 		latestXid = RecordTransactionCommit();
+	}
+	else
+	{
+		/*
+		 * We must not mark our XID committed; the parallel master is
+		 * responsible for that.
+		 */
+		latestXid = InvalidTransactionId;
+
+		/*
+		 * Make sure the master will know about any WAL we wrote before it
+		 * commits.
+		 */
+		ParallelWorkerReportLastRecEnd(XactLastRecEnd);
+	}
 
 	TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid);
 
@@ -2564,10 +2578,19 @@ AbortTransaction(void)
 	 * worker, skip this; the user backend must be the one to write the abort
 	 * record.
 	 */
-	if (parallel)
-		latestXid = InvalidTransactionId;
-	else
+	if (!parallel)
 		latestXid = RecordTransactionAbort(false);
+	else
+	{
+		latestXid = InvalidTransactionId;
+
+		/*
+		 * Since the parallel master won't get our value of XactLastRecEnd in
+		 * this case, we nudge the WAL writer ourselves.  See related comments
+		 * in RecordTransactionAbort for why this matters.
+		 */
+		XLogSetAsyncXactLSN(XactLastRecEnd);
+	}
 
 	TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->lxid);
 
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 0685e64..9c68907 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -14,6 +14,7 @@
 #ifndef PARALLEL_H
 #define PARALLEL_H
 
+#include "access/xlogdefs.h"
 #include "lib/ilist.h"
 #include "postmaster/bgworker.h"
 #include "storage/shm_mq.h"
@@ -59,5 +60,6 @@ extern void HandleParallelMessageInterrupt(void);
 extern void HandleParallelMessages(void);
 extern void AtEOXact_Parallel(bool isCommit);
 extern void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId);
+extern void ParallelWorkerReportLastRecEnd(XLogRecPtr);
 
 #endif   /* PARALLEL_H */