On Fri, Feb 20, 2015 at 3:44 PM, Peter Geoghegan <p...@heroku.com> wrote:
>>> they'd only see a
>>> REORDER_BUFFER_CHANGE_INSERT when that was the definitive outcome of
>>> an UPSERT, or alternatively a REORDER_BUFFER_CHANGE_UPDATE when that
>>> was the definitive outcome. No need for output plugins to consider
>>> REORDER_BUFFER_CHANGE_INTERNAL_SUPERDELETE at all.
>>
>> Yes. It'd be easiest if only the final insert/update were actually
>> WAL logged as full actions.
>
> Well, that implies that we'd actually know that we'd succeed when WAL
> logging the speculative heap tuple's insertion. We literally have no
> way of knowing if that's the case at that point, though - that's just
> the nature of value locking scheme #2's optimistic approach.

The attached patch does this. It is just a sketch, to provoke a quick
discussion (which is hopefully all this needs). I did things this way
because it seemed more efficient than rolling this into a whole new
revision of the ON CONFLICT patch series.

Ostensibly, this does the right thing: The test_decoding output plugin
only ever reports either an INSERT or an UPDATE within a transaction
(that contains an INSERT ... ON CONFLICT UPDATE). With jjanes_upsert
running, I have not found any problematic cases where this fails to be
true (although I have certainly been less than thorough so far -
please take that into consideration).

I have two basic concerns:

* Do you think that what I've sketched here is roughly the right
approach? Consolidating super deletions and insertions at transaction
reassembly seemed like the right thing to do, but obviously I'm not
the best qualified to judge that. Clearly, it would be possible to
optimize this so that the REORDER_BUFFER_CHANGE_INSERT "peek ahead"
happens much more selectively, which I haven't bothered with yet.

* What are the hazards implied by my abuse of the
ReorderBufferIterTXNNext() interface? It looks like it should be okay
to "peek ahead" like this, because it has a slightly odd convention
around memory management that accidentally works. But honestly, I'm
feeling too lazy to make myself grok the code within
ReorderBufferIterTXNNext() at the moment, and so have not put concerns
about the code being totally broken to rest yet. Could the "current"
ReorderBufferChange be freed before it is sent to a logical decoding
plugin by way of a call to the rb->apply_change() callback (when the
xact is spooled to disk, for example)?
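
To make the control flow easier to discuss, the peek-ahead consolidation
in ReorderBufferCommit() can be modeled outside of Postgres roughly as
follows. This is only a minimal sketch under stated assumptions: the
ChangeType enum, ChangeIter, iter_next() and replay() are hypothetical
stand-ins and not reorderbuffer.c APIs, and the array-backed iterator
never frees anything, so it says nothing about the memory management
question above.

```c
#include <stddef.h>

/* Hypothetical stand-ins for the ReorderBufferChangeType values. */
typedef enum
{
	CHANGE_INSERT,
	CHANGE_UPDATE,
	CHANGE_DELETE,
	CHANGE_INTERNAL_DELETE		/* super deletion of a speculative insert */
} ChangeType;

/* Hypothetical iterator over an in-memory array of changes. */
typedef struct
{
	const ChangeType *changes;
	size_t		n;
	size_t		pos;
} ChangeIter;

/* Return the next change, or NULL once the array is exhausted. */
static const ChangeType *
iter_next(ChangeIter *it)
{
	return it->pos < it->n ? &it->changes[it->pos++] : NULL;
}

/*
 * Replay changes into out[], dropping every super deletion together with
 * the insertion immediately before it. Returns the number of changes
 * emitted. out[] must have room for n entries.
 */
static size_t
replay(const ChangeType *changes, size_t n, ChangeType *out)
{
	ChangeIter	it = {changes, n, 0};
	const ChangeType *change;
	const ChangeType *nextchange = NULL;
	size_t		emitted = 0;

	while ((change = nextchange ? nextchange : iter_next(&it)) != NULL)
	{
		/* Forget about previous peek ahead */
		nextchange = NULL;

		/* Only an insertion can be cancelled by a following super delete */
		if (*change == CHANGE_INSERT)
			nextchange = iter_next(&it);

		if (*change != CHANGE_INTERNAL_DELETE &&
			(!nextchange || *nextchange != CHANGE_INTERNAL_DELETE))
			out[emitted++] = *change;
	}

	return emitted;
}
```

Calling replay() on the sequence INSERT, INTERNAL_DELETE, UPDATE emits
only the UPDATE, while two consecutive plain INSERTs are both emitted.
The peeked change is carried into the next loop iteration rather than
being dropped, which is the part that leans on the iterator keeping the
current change valid across a second call.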

Thanks
-- 
Peter Geoghegan
From 53784c818b39d39c6a1f3588f7823a9010074869 Mon Sep 17 00:00:00 2001
From: Peter Geoghegan <peter.geoghega...@gmail.com>
Date: Sun, 22 Feb 2015 17:02:14 -0800
Subject: [PATCH 7/7] Logical decoding support for ON CONFLICT UPDATE

---
 src/backend/replication/logical/decode.c        |  5 +++-
 src/backend/replication/logical/reorderbuffer.c | 35 +++++++++++++++++++++++--
 src/include/replication/reorderbuffer.h         |  3 ++-
 3 files changed, 39 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index e7614bd..a9cd0df 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -682,7 +682,10 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
-	change->action = REORDER_BUFFER_CHANGE_DELETE;
+	if (!(xlrec->flags & XLOG_HEAP_KILLED_SPECULATIVE_TUPLE))
+		change->action = REORDER_BUFFER_CHANGE_DELETE;
+	else
+		change->action = REORDER_BUFFER_CHANGE_INTERNAL_DELETE;
 
 	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 20bb3b7..8f9101e 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -401,6 +401,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
 		case REORDER_BUFFER_CHANGE_INSERT:
 		case REORDER_BUFFER_CHANGE_UPDATE:
 		case REORDER_BUFFER_CHANGE_DELETE:
+		case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
 			if (change->data.tp.newtuple)
 			{
 				ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
@@ -1313,7 +1314,12 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 
 	PG_TRY();
 	{
+		/*
+		 * To merge super delete and insert records, peek-ahead is occasionally
+		 * required
+		 */
 		ReorderBufferChange *change;
+		ReorderBufferChange *nextchange = NULL;
 
 		if (using_subtxn)
 			BeginInternalSubTransaction("replay");
@@ -1323,16 +1329,21 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		rb->begin(rb, txn);
 
 		iterstate = ReorderBufferIterTXNInit(rb, txn);
-		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
+		while ((change = nextchange ? nextchange :
+				ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
 		{
 			Relation	relation = NULL;
 			Oid			reloid;
 
+			/* Forget about previous peek ahead */
+			nextchange = NULL;
+
 			switch (change->action)
 			{
 				case REORDER_BUFFER_CHANGE_INSERT:
 				case REORDER_BUFFER_CHANGE_UPDATE:
 				case REORDER_BUFFER_CHANGE_DELETE:
+				case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
 					Assert(snapshot_now);
 
 					reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
@@ -1374,7 +1385,23 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 						else if (!IsToastRelation(relation))
 						{
 							ReorderBufferToastReplace(rb, txn, relation, change);
-							rb->apply_change(rb, txn, relation, change);
+
+							/*
+							 * Speculative insertion occasionally makes use of
+							 * "super deletion" -- an implementation defined
+							 * delete of a speculatively inserted tuple.
+							 * Neither the super deletion, nor the insertion
+							 * (which must be the prior record type) are
+							 * included in the final assembly.
+							 */
+							if (change->action == REORDER_BUFFER_CHANGE_INSERT)
+								nextchange = ReorderBufferIterTXNNext(rb, iterstate);
+
+							if (change->action !=
+								REORDER_BUFFER_CHANGE_INTERNAL_DELETE &&
+								(!nextchange || nextchange->action !=
+								 REORDER_BUFFER_CHANGE_INTERNAL_DELETE))
+								rb->apply_change(rb, txn, relation, change);
 
 							/*
 							 * Only clear reassembled toast chunks if we're
@@ -2003,6 +2030,8 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			/* fall through */
 		case REORDER_BUFFER_CHANGE_DELETE:
+			/* fall through */
+		case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
 			{
 				char	   *data;
 				ReorderBufferTupleBuf *oldtup,
@@ -2258,6 +2287,8 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			/* fall through */
 		case REORDER_BUFFER_CHANGE_DELETE:
+			/* fall through */
+		case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
 			if (change->data.tp.newtuple)
 			{
 				Size		len = offsetof(ReorderBufferTupleBuf, t_data) +
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index f1e0f57..c1a8819 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -51,7 +51,8 @@ enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_DELETE,
 	REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
 	REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
-	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
+	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
+	REORDER_BUFFER_CHANGE_INTERNAL_DELETE
 };
 
 /*
-- 
1.9.1

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
