On 4/2/22 13:51, Tomas Vondra wrote:
> 
> 
> On 4/2/22 12:35, Amit Kapila wrote:
>> On Fri, Apr 1, 2022 at 8:32 PM Tomas Vondra
>> <tomas.von...@enterprisedb.com> wrote:
>>>
>>> On 3/28/22 07:29, Amit Kapila wrote:
>>>> I thought about changing snapshot dealing of
>>>> non-transactional sequence changes similar to transactional ones but
>>>> that also won't work because it is only at commit we decide whether we
>>>> can send the changes.
>>>>
>>> I wonder if there's some earlier LSN (similar to the consistent point)
>>> which might be useful for this.
>>>
>>> Or maybe we should queue even the non-transactional changes, not
>>> per-transaction but in a global list, and then at each commit either
>>> discard inspect them (at that point we know the lowest LSN for all
>>> transactions and the consistent point). Seems complex, though.
>>>
>>
>> I couldn't follow '..discard inspect them ..'. Do you mean we inspect
>> them and discard whichever are not required? It seems here we are
>> talking about a new global ReorderBufferGlobal instead of
>> ReorderBufferTXN to collect these changes but we don't need only
>> consistent point LSN because we do send if the commit of containing
>> transaction is after consistent point LSN, so we need some transaction
>> information as well. I think it could bring new challenges.
>>
> 
> Sorry for the gibberish. Yes, I meant to discard sequence changes that
> are no longer needed, due to being "obsoleted" by the applied change. We
> must not apply "older" changes (using LSN) because that would make the
> sequence go backwards.
> 
> I'm not entirely sure whether the list of changes should be kept in TXN
> or in the global reorderbuffer object - we need to track which TXN the
> change belongs to (because of transactional changes) but we also need to
> discard the unnecessary changes efficiently (and walking TXN might be
> expensive).
> 
> But yes, I'm sure there will be challenges. One being that tracking just
> the decoded WAL stuff is not enough, because nextval() may not generate
> WAL. But we still need to make sure the increment is replicated.
> 
> What I think we might do is this:
> 
> - add a global list of decoded sequence increments to ReorderBuffer
> 
> - at each commit/abort walk the list and consider all
> increments up to the commit LSN that "match" (non-transactional match
> all TXNs, transactional match only the current TXN)
> 
> - replicate the last "matching" status for each sequence, discard the
> processed ones
> 
> We could probably optimize this by not tracking every single increment,
> but merge them "per transaction", I think.
> 
> I'm sure this description is pretty rough and will need refining, handle
> various corner-cases etc.
> 
I did some experiments over the weekend, exploring how to rework the
sequence decoding in various ways. Let me share some WIP patches,
hopefully that can be useful for trying more stuff and moving this
discussion forward.

I tried two things - (1) accumulating sequence increments in a global
hash table and then doing something with it, and (2) treating all sequence
increments as regular changes (in a TXN) and then doing something
special during the replay. Attached are two patchsets, one for each
approach.

Note: It's important to remember decoding of sequences is not the only
code affected by this. The logical messages have the same issue,
certainly when it comes to transactional vs. non-transactional stuff and
handling of snapshots. Even if the sequence decoding ends up being
reverted, we still need to fix that, somehow. And my feeling is the
solutions ought to be pretty similar in both cases.

Now, regarding the two approaches:

(1) accumulating sequences in global hash table

The main problem with regular sequence increments is that those need to
be non-transactional - a transaction may use a sequence without any
WAL-logging, if the WAL was written by an earlier transaction. The
problem is the earlier transaction might have been rolled back, and thus
simply discarded by the logical decoding. But we still need to apply
that, in order not to lose the sequence increment.
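
To make the "no WAL-logging" case concrete, here is a toy model of how
nextval() can hand out values without writing WAL. It is only a sketch:
ToySequence and toy_nextval() are made-up names, and the model ignores
details such as checkpoints and per-backend caching. The only thing
taken from PostgreSQL is the prefetch size, SEQ_LOG_VALS (32).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define SEQ_LOG_VALS 32		/* prefetch size, as in PostgreSQL's sequence.c */

/* Toy sequence state (hypothetical, not the real on-disk format). */
typedef struct ToySequence
{
	int64_t		last_value;		/* last value handed out */
	int64_t		log_cnt;		/* values still covered by the last WAL record */
	int			wal_records;	/* number of WAL records written so far */
	uint32_t	last_wal_xid;	/* transaction that wrote the last WAL record */
} ToySequence;

/*
 * Hand out the next value on behalf of transaction xid. Sets *wrote_wal
 * to true only if this call had to write a (simulated) WAL record.
 */
int64_t
toy_nextval(ToySequence *seq, uint32_t xid, bool *wrote_wal)
{
	assert(seq != NULL && wrote_wal != NULL);

	*wrote_wal = false;

	if (seq->log_cnt == 0)
	{
		/* log SEQ_LOG_VALS values ahead, attributed to this xid */
		seq->log_cnt = SEQ_LOG_VALS;
		seq->wal_records++;
		seq->last_wal_xid = xid;
		*wrote_wal = true;
	}

	seq->log_cnt--;
	return ++seq->last_value;
}
```

If transaction 100 makes the first call and transaction 200 the second,
only the first call writes WAL. If 100 then aborts, the decoded WAL
belongs to a discarded transaction, yet 200 depends on it.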

The current code just applies those non-transactional increments right
after decoding the increment, but that does not work because we may not
have a snapshot at that point. And we only have the snapshot when within
a transaction (AFAICS), so the patch queues all changes and applies them
later.

The changes need to be shared by all transactions, so queueing them in a
global structure works fairly well - otherwise we'd have to walk all
transactions,
in order to see if there are relevant sequence increments.

But some increments may be transactional, e.g. when the sequence is
created or altered in a transaction. To allow tracking this, the patch
uses a hash table, with relfilenode as a key.
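
A minimal standalone sketch of that lookup, loosely following the logic
of ReorderBufferSequenceIsTransactional() in the attached patch. It is
an illustration under simplifying assumptions, not the patch code: the
key is a plain uint32_t instead of RelFileNode, the table is a
fixed-size open-addressing array instead of dynahash, there is no
removal, and SeqTable, seq_lookup(), seq_note_created() and
seq_is_transactional() are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define TABLE_SIZE 64		/* hypothetical fixed capacity, power of two */

typedef struct SeqEntry
{
	bool		used;
	uint32_t	relfilenode;	/* hash key */
	uint32_t	xid;			/* transaction that created the relfilenode */
	bool		transactional;
} SeqEntry;

typedef struct SeqTable
{
	SeqEntry	slots[TABLE_SIZE];
} SeqTable;

/* Return the slot holding the key, or the free slot where it would go. */
SeqEntry *
seq_lookup(SeqTable *t, uint32_t relfilenode)
{
	uint32_t	i = (relfilenode * 2654435761u) & (TABLE_SIZE - 1);

	assert(t != NULL);

	for (int probes = 0; probes < TABLE_SIZE; probes++)
	{
		SeqEntry   *e = &t->slots[i];

		if (!e->used || e->relfilenode == relfilenode)
			return e;
		i = (i + 1) & (TABLE_SIZE - 1);
	}
	return NULL;				/* table is full */
}

/* Remember that xid created a new relfilenode for a sequence. */
bool
seq_note_created(SeqTable *t, uint32_t relfilenode, uint32_t xid)
{
	SeqEntry   *e = seq_lookup(t, relfilenode);

	if (e == NULL)
		return false;
	e->used = true;
	e->relfilenode = relfilenode;
	e->xid = xid;
	e->transactional = true;
	return true;
}

/* "created" increments are always transactional; otherwise ask the table. */
bool
seq_is_transactional(SeqTable *t, uint32_t relfilenode, bool created)
{
	SeqEntry   *e;

	if (created)
		return true;
	e = seq_lookup(t, relfilenode);
	return e != NULL && e->used && e->transactional;
}
```

An increment for a relfilenode that is not in the table is treated as
non-transactional, which matches the "if not found" branch in the patch.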

There are a couple of issues with this, though. Firstly, because the
changes are stashed outside transactions, they are not included in memory
accounting, not spilled to disk or streamed, etc. I guess fixing this is
possible, but it's certainly not straightforward, because we mix
increments from many different transactions.

A bigger issue is that I'm not sure this actually handles the snapshots
correctly either.

The non-transactional increments affect all transactions, so when
ReorderBufferProcessSequences gets executed, it processes all of them,
no matter the source transaction. Can we be sure the snapshot in the
applying transaction is the same (or "compatible") as the snapshot in
the source transaction?

Transactional increments can be simply processed as regular changes, of
course, but one difference is that we always create the transaction
(while before we just triggered the apply callback). This is necessary
as now we drive all of this from ReorderBufferCommit(), and without the
transaction the increment would not be applied / there would be no snapshot.

It does seem to work, though, although I haven't tested it much so far.
One annoying bit is that we have to always walk all sequences and
increments, for each change in the transaction. Which seems quite
expensive, although the number of in-progress increments should be
pretty low (roughly equal to the number of sequences). Or at least the
part we need to consider for a single change (i.e. between two LSNs).
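
The walk can be sketched as a standalone function. This is a simplified
illustration, not the patch code: it uses a flat array ordered by LSN
instead of per-sequence lists in a hash table, it ignores snapshots and
aborted transactions entirely, and SeqChange and seq_process_commit()
are hypothetical names. The matching rule is the one from earlier in
the thread: non-transactional increments match any transaction,
transactional ones only their own, and only the last matching state per
sequence is reported.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef struct SeqChange
{
	uint32_t	seq_id;			/* which sequence */
	uint64_t	lsn;			/* LSN of the decoded increment */
	uint32_t	xid;			/* transaction that wrote the WAL */
	bool		transactional;
	int64_t		value;			/* sequence state in the WAL record */
	bool		done;			/* already processed */
} SeqChange;

/*
 * Process queued changes (ordered by LSN) up to commit_lsn on behalf of
 * commit_xid. For each sequence with a matching change, store the last
 * matching value in out_seq[]/out_val[]. Returns the number of sequences
 * reported. Matching changes are marked done.
 */
size_t
seq_process_commit(SeqChange *changes, size_t nchanges,
				   uint32_t commit_xid, uint64_t commit_lsn,
				   uint32_t *out_seq, int64_t *out_val, size_t max_out)
{
	size_t		nout = 0;

	assert(changes != NULL || nchanges == 0);

	for (size_t i = 0; i < nchanges; i++)
	{
		SeqChange  *c = &changes[i];
		size_t		j;

		/* skip processed and future changes */
		if (c->done || c->lsn > commit_lsn)
			continue;

		/* transactional changes match only their own transaction */
		if (c->transactional && c->xid != commit_xid)
			continue;

		/* find the output slot for this sequence, or add one */
		for (j = 0; j < nout; j++)
			if (out_seq[j] == c->seq_id)
				break;
		if (j == nout)
		{
			if (nout == max_out)
				continue;		/* no room, leave the change queued */
			out_seq[nout++] = c->seq_id;
		}

		/* later changes overwrite earlier ones, so the last one wins */
		out_val[j] = c->value;
		c->done = true;
	}

	return nout;
}
```

The cost is linear in the number of queued changes per call, which is
why the number of in-progress increments matters.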

So maybe this should work. The one part this does not handle at all is
aborted transactions. At the moment we just discard those, which means
(a) we fail to discard the transactional changes from the hash table,
and (b) we can't apply the non-transactional changes, because with the
changes we also discard the snapshots we need.

I wonder if we could use a different snapshot, though. Non-transactional
changes can't change the relfilenode, after all. Not sure. If not, the
only solution I can think of is processing even aborted transactions,
but skipping changes except those that update snapshots.

There's a serious problem with streaming, though - we don't know which
transaction will commit first, hence we can't decide whether to send the
sequence changes. This seems pretty fatal to me. So we'd have to stream
the sequence changes only at commit, and then do some of this work on
the worker (i.e. merge the sequence changes to the right place). That
seems pretty annoying.


(2) treating sequence changes as regular changes

This adopts a different approach - instead of accumulating the sequence
increments in a global hash table, it treats them as regular changes.
Which solves the snapshot issue, and issues with spilling to disk,
streaming and so on.

But it has various other issues with handling concurrent transactions,
unfortunately, which probably make this approach infeasible:

* The non-transactional stuff has to be applied in the first transaction
that commits, not in the transaction that generated the WAL. That does
not work too well with this approach, because we have to walk changes in
all other transactions.

* Another serious issue seems to be streaming - if we already streamed
some of the changes, we can't iterate through them anymore.

Also, having to walk the transactions over and over for each change, to
apply relevant sequence increments, that's mighty expensive. The other
approach needs to do that too, but walking the global hash table seems
much cheaper.

The other issue is the handling of aborted transactions - we need to apply
sequence increments even from those transactions, of course. The other
approach has this issue too, though.


(3) tracking sequences touched by transaction

This is the approach proposed by Hannu Krosing. I haven't explored this
again yet, but I recall I wrote a PoC patch a couple months back.

It seems to me most of the problems stem from trying to derive sequence
state from decoded WAL changes, which is problematic because of the
non-transactional nature of sequences (i.e. WAL for one transaction
affects other transactions in non-obvious ways). And this approach
simply works around that entirely - instead of trying to deduce the
sequence state from WAL, we'd make sure to write the current sequence
state (or maybe just ID of the sequence) at commit time. Which should
eliminate most of the complexity / problems, I think.
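
A rough sketch of the idea, assuming the transaction simply remembers
which sequences it touched and the current state is looked up at commit.
This is not the PoC patch mentioned above; ToyTxn, SeqState,
txn_touch_sequence() and txn_commit_sequence_states() are hypothetical
names, and the current_value array stands in for reading the actual
sequence state.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_TOUCHED 8		/* hypothetical per-transaction limit */

typedef struct ToyTxn
{
	uint32_t	xid;
	uint32_t	touched[MAX_TOUCHED];	/* IDs of sequences used by the xact */
	size_t		ntouched;
} ToyTxn;

typedef struct SeqState
{
	uint32_t	seq_id;
	int64_t		last_value;
} SeqState;

/* Remember that the transaction used seq_id (deduplicated). */
bool
txn_touch_sequence(ToyTxn *txn, uint32_t seq_id)
{
	assert(txn != NULL);

	for (size_t i = 0; i < txn->ntouched; i++)
		if (txn->touched[i] == seq_id)
			return true;

	if (txn->ntouched == MAX_TOUCHED)
		return false;			/* no room left */

	txn->touched[txn->ntouched++] = seq_id;
	return true;
}

/*
 * At commit, record the current state of every touched sequence. The
 * out array must have room for MAX_TOUCHED entries. Returns the number
 * of entries written.
 */
size_t
txn_commit_sequence_states(const ToyTxn *txn,
						   const int64_t *current_value, size_t nseqs,
						   SeqState *out)
{
	size_t		n = 0;

	for (size_t i = 0; i < txn->ntouched; i++)
	{
		uint32_t	id = txn->touched[i];

		if (id >= nseqs)
			continue;			/* unknown sequence, skip */
		out[n].seq_id = id;
		out[n].last_value = current_value[id];
		n++;
	}
	return n;
}
```

The state written at commit reflects increments made by concurrent
transactions too, so nothing has to be reconstructed from their WAL.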


I'm not really sure what to do about this. All of those reworks seem
like an extensive redesign of the patch, and considering the last CF is
already over ... not great.

However, even if we end up reverting this, we'll still have the same
problem with snapshots for logical messages.


regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From 1c81414bc3b118b8b26489ec8731e8590e3b4806 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Sat, 2 Apr 2022 20:44:15 +0200
Subject: [PATCH vglobal 1/2] WIP: sequence reworks

---
 src/backend/replication/logical/decode.c      |  34 +-
 .../replication/logical/reorderbuffer.c       | 475 +++++++-----------
 src/include/replication/reorderbuffer.h       |  36 +-
 3 files changed, 223 insertions(+), 322 deletions(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 77bc7aea7a0..77db984f594 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -1313,9 +1313,7 @@ sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	TransactionId xid = XLogRecGetXid(r);
 	uint8		info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
 	xl_seq_rec *xlrec;
-	Snapshot	snapshot;
 	RepOriginId origin_id = XLogRecGetOrigin(r);
-	bool		transactional;
 
 	/* only decode changes flagged with XLOG_SEQ_LOG */
 	if (info != XLOG_SEQ_LOG)
@@ -1351,33 +1349,15 @@ sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if(!datalen || !tupledata)
 		return;
 
+	/* sets the snapshot etc. */
+	if (!SnapBuildProcessChange(builder, xid, buf->origptr))
+		return;
+
 	tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
 	DecodeSeqTuple(tupledata, datalen, tuplebuf);
 
-	/*
-	 * Should we handle the sequence increment as transactional or not?
-	 *
-	 * If the sequence was created in a still-running transaction, treat
-	 * it as transactional and queue the increments. Otherwise it needs
-	 * to be treated as non-transactional, in which case we send it to
-	 * the plugin right away.
-	 */
-	transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
-														 target_node,
-														 xlrec->created);
-
-	/* Skip the change if already processed (per the snapshot). */
-	if (transactional &&
-		!SnapBuildProcessChange(builder, xid, buf->origptr))
-		return;
-	else if (!transactional &&
-			 (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
-			  SnapBuildXactNeedsSkip(builder, buf->origptr)))
-		return;
-
-	/* Queue the increment (or send immediately if not transactional). */
-	snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
-	ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr,
-							   origin_id, target_node, transactional,
+	/* queue the sequence increment */
+	ReorderBufferQueueSequence(ctx->reorder, xid, buf->endptr,
+							   origin_id, target_node,
 							   xlrec->created, tuplebuf);
 }
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c2d9be81fae..3113076a690 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -156,6 +156,8 @@ typedef struct ReorderBufferSequenceEnt
 {
 	RelFileNode		rnode;
 	TransactionId	xid;
+	bool			transactional;
+	dlist_head		changes;
 } ReorderBufferSequenceEnt;
 
 /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
@@ -365,6 +367,11 @@ ReorderBufferAllocate(void)
 											   SLAB_DEFAULT_BLOCK_SIZE,
 											   sizeof(ReorderBufferChange));
 
+	buffer->sequence_context = SlabContextCreate(new_ctx,
+												 "Sequence",
+												 SLAB_DEFAULT_BLOCK_SIZE,
+												 sizeof(ReorderBufferSequenceChange));
+
 	buffer->txn_context = SlabContextCreate(new_ctx,
 											"TXN",
 											SLAB_DEFAULT_BLOCK_SIZE,
@@ -516,6 +523,21 @@ ReorderBufferGetChange(ReorderBuffer *rb)
 	return change;
 }
 
+/*
+ * Get a fresh ReorderBufferSequenceChange.
+ */
+static ReorderBufferSequenceChange *
+ReorderBufferGetSequenceChange(ReorderBuffer *rb)
+{
+	ReorderBufferSequenceChange *change;
+
+	change = (ReorderBufferSequenceChange *)
+		MemoryContextAlloc(rb->sequence_context, sizeof(ReorderBufferSequenceChange));
+
+	memset(change, 0, sizeof(ReorderBufferSequenceChange));
+	return change;
+}
+
 /*
  * Free a ReorderBufferChange and update memory accounting, if requested.
  */
@@ -575,13 +597,6 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
 				change->data.truncate.relids = NULL;
 			}
 			break;
-		case REORDER_BUFFER_CHANGE_SEQUENCE:
-			if (change->data.sequence.tuple)
-			{
-				ReorderBufferReturnTupleBuf(rb, change->data.sequence.tuple);
-				change->data.sequence.tuple = NULL;
-			}
-			break;
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
@@ -923,48 +938,26 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
  * so we simply do a lookup (the sequence is identified by relfilende). If
  * we find a match, the increment should be handled as transactional.
  */
-bool
+static bool
 ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
 									 RelFileNode rnode, bool created)
 {
 	bool	found = false;
+	ReorderBufferSequenceEnt *ent;
 
 	if (created)
 		return true;
 
-	hash_search(rb->sequences,
-				(void *) &rnode,
-				HASH_FIND,
-				&found);
-
-	return found;
-}
+	ent = hash_search(rb->sequences,
+					  (void *) &rnode,
+					  HASH_FIND,
+					  &found);
 
-/*
- * Cleanup sequences created in in-progress transactions.
- *
- * There's no way to search by XID, so we simply do a seqscan of all
- * the entries in the hash table. Hopefully there are only a couple
- * entries in most cases - people generally don't create many new
- * sequences over and over.
- */
-static void
-ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid)
-{
-	HASH_SEQ_STATUS scan_status;
-	ReorderBufferSequenceEnt *ent;
-
-	hash_seq_init(&scan_status, rb->sequences);
-	while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
-	{
-		/* skip sequences not from this transaction */
-		if (ent->xid != xid)
-			continue;
+	/* if not found, it's definitely non-transactional */
+	if (!found)
+		return false;
 
-		(void) hash_search(rb->sequences,
-					   (void *) &(ent->rnode),
-					   HASH_REMOVE, NULL);
-	}
+	return ent->transactional;
 }
 
 /*
@@ -978,166 +971,97 @@ ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid)
  */
 void
 ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
-						   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
-						   RelFileNode rnode, bool transactional, bool created,
+						   XLogRecPtr lsn, RepOriginId origin_id,
+						   RelFileNode rnode, bool created,
 						   ReorderBufferTupleBuf *tuplebuf)
 {
-	/*
-	 * Change needs to be handled as transactional, because the sequence was
-	 * created in a transaction that is still running. In that case all the
-	 * changes need to be queued in that transaction, we must not send them
-	 * to the downstream until the transaction commits.
-	 *
-	 * There's a bit of a trouble with subtransactions - we can't queue it
-	 * into the subxact, because it might be rolled back and we'd lose the
-	 * increment. We need to queue it into the same (sub)xact that created
-	 * the sequence, which is why we track the XID in the hash table.
-	 */
-	if (transactional)
-	{
-		MemoryContext oldcontext;
-		ReorderBufferChange *change;
-
-		/* lookup sequence by relfilenode */
-		ReorderBufferSequenceEnt   *ent;
-		bool						found;
-
-		/* transactional changes require a transaction */
-		Assert(xid != InvalidTransactionId);
-
-		/* search the lookup table (we ignore the return value, found is enough) */
-		ent = hash_search(rb->sequences,
-						  (void *) &rnode,
-						  created ? HASH_ENTER : HASH_FIND,
-						  &found);
-
-		/*
-		 * If this is the "create" increment, we must not have found any
-		 * pre-existing entry in the hash table (i.e. there must not be
-		 * any conflicting sequence).
-		 */
-		Assert(!(created && found));
-
-		/* But we must have either created or found an existing entry. */
-		Assert(created || found);
-
-		/*
-		 * When creating the sequence, remember the XID of the transaction
-		 * that created id.
-		 */
-		if (created)
-			ent->xid = xid;
+	bool transactional;
+	MemoryContext oldcontext;
+	ReorderBufferSequenceChange *change;
 
-		/* XXX Maybe check that we're still in the same top-level xact? */
+	/* lookup sequence by relfilenode */
+	ReorderBufferSequenceEnt   *ent;
+	bool						found;
 
-		/* OK, allocate and queue the change */
-		oldcontext = MemoryContextSwitchTo(rb->context);
+	transactional = ReorderBufferSequenceIsTransactional(rb, rnode, created);
 
-		change = ReorderBufferGetChange(rb);
+	/*
+	 * Maintenance of the sequences hash, so that we can distinguish which
+	 * additional increments are transactional and non-transactional.
+	 */
 
-		change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
-		change->origin_id = origin_id;
+	/* transactional changes require a transaction */
+	Assert(!(transactional && (xid == InvalidTransactionId)));
 
-		memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode));
+	/* search the lookup table */
+	ent = hash_search(rb->sequences,
+					  (void *) &rnode,
+					  HASH_ENTER,
+					  &found);
 
-		change->data.sequence.tuple = tuplebuf;
+	/*
+	 * If this is the "create" increment, we must not have found any entry in
+	 * the hash table (i.e. there must not be any conflicting sequence with the
+	 * same relfilenode).
+	 */
+	Assert(!(created && found));
 
-		/* add it to the same subxact that created the sequence */
-		ReorderBufferQueueChange(rb, ent->xid, lsn, change, false);
+	/*
+	 * It's either a non-transactional change, creation of a relfilenode, or we
+	 * should have found an existing entry.
+	 */
+	Assert(!transactional || created || found);
 
-		MemoryContextSwitchTo(oldcontext);
-	}
-	else
+	if (!found)
 	{
-		/*
-		 * This increment is for a sequence that was not created in any
-		 * running transaction, so we treat it as non-transactional and
-		 * just send it to the output plugin directly.
-		 */
-		ReorderBufferTXN *txn = NULL;
-		volatile Snapshot snapshot_now = snapshot;
-		bool	using_subtxn;
-
-#ifdef USE_ASSERT_CHECKING
-		/* All "creates" have to be handled as transactional. */
-		Assert(!created);
-
-		/* Make sure the sequence is not in the hash table. */
-		{
-			bool	found;
-			hash_search(rb->sequences,
-						(void *) &rnode,
-						HASH_FIND, &found);
-			Assert(!found);
-		}
-#endif
-
-		if (xid != InvalidTransactionId)
-			txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
-
-		/* setup snapshot to allow catalog access */
-		SetupHistoricSnapshot(snapshot_now, NULL);
-
-		/*
-		 * Decoding needs access to syscaches et al., which in turn use
-		 * heavyweight locks and such. Thus we need to have enough state around to
-		 * keep track of those.  The easiest way is to simply use a transaction
-		 * internally.  That also allows us to easily enforce that nothing writes
-		 * to the database by checking for xid assignments.
-		 *
-		 * When we're called via the SQL SRF there's already a transaction
-		 * started, so start an explicit subtransaction there.
-		 */
-		using_subtxn = IsTransactionOrTransactionBlock();
-
-		PG_TRY();
-		{
-			Relation	relation;
-			HeapTuple	tuple;
-			Form_pg_sequence_data seq;
-			Oid			reloid;
-
-			if (using_subtxn)
-				BeginInternalSubTransaction("sequence");
-			else
-				StartTransactionCommand();
-
-			reloid = RelidByRelfilenode(rnode.spcNode, rnode.relNode);
+		ent->transactional = transactional;
+		dlist_init(&ent->changes);
+	}
 
-			if (reloid == InvalidOid)
-				elog(ERROR, "could not map filenode \"%s\" to relation OID",
-					 relpathperm(rnode,
-								 MAIN_FORKNUM));
+	/*
+	 * When creating the sequence, remember the XID of the transaction
+	 * that created it.
+	 */
+	if (created)
+		ent->xid = xid;
 
-			relation = RelationIdGetRelation(reloid);
-			tuple = &tuplebuf->tuple;
-			seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
+	/*
+	 * Queue the sequence change, into a global list.
+	 *
+	 * There's a bit of a trouble with subtransactions - we can't queue it
+	 * into the subxact, because it might be rolled back and we'd lose the
+	 * increment. We need to queue it into the same (sub)xact that created
+	 * the sequence, which is why we track the XID in the hash table.
+	 */
+	oldcontext = MemoryContextSwitchTo(rb->context);
 
-			rb->sequence(rb, txn, lsn, relation, transactional,
-						 seq->last_value, seq->log_cnt, seq->is_called);
+	change = ReorderBufferGetSequenceChange(rb);
 
-			RelationClose(relation);
+	change->origin_id = origin_id;
 
-			TeardownHistoricSnapshot(false);
+	memcpy(&change->relnode, &rnode, sizeof(RelFileNode));
 
-			AbortCurrentTransaction();
+	change->tuple = tuplebuf;
+	change->transactional = transactional;
 
-			if (using_subtxn)
-				RollbackAndReleaseCurrentSubTransaction();
-		}
-		PG_CATCH();
-		{
-			TeardownHistoricSnapshot(true);
+	Assert(InvalidXLogRecPtr != lsn);
+	change->lsn = lsn;
 
-			AbortCurrentTransaction();
+	if (xid != InvalidTransactionId)
+		change->txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
-			if (using_subtxn)
-				RollbackAndReleaseCurrentSubTransaction();
+	/*
+	 * FIXME What to do about concurrent_abort for the transaction? Maybe
+	 * should depend on transactional vs. non-transactional change.
+	 *
+	 * FIXME Also what to do about memory accounting? If not treated as
+	 * regular changes, it's not part of memory accounting.
+	 */
+	/* add it to the list in the global hash table (to the tail, to keep
+	 * the right ordering of changes) */
+	dlist_push_tail(&ent->changes, &change->node);
 
-			PG_RE_THROW();
-		}
-		PG_END_TRY();
-	}
+	MemoryContextSwitchTo(oldcontext);
 }
 
 /*
@@ -1816,9 +1740,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 				&found);
 	Assert(found);
 
-	/* Remove sequences created in this transaction (if any). */
-	ReorderBufferSequenceCleanup(rb, txn->xid);
-
 	/* remove entries spilled to disk */
 	if (rbtxn_is_serialized(txn))
 		ReorderBufferRestoreCleanup(rb, txn);
@@ -2239,21 +2160,21 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
  */
 static inline void
 ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn,
-						   Relation relation, ReorderBufferChange *change,
+						   Relation relation, ReorderBufferSequenceChange *change,
 						   bool streaming)
 {
 	HeapTuple	tuple;
 	Form_pg_sequence_data seq;
 
-	tuple = &change->data.sequence.tuple->tuple;
+	tuple = &change->tuple->tuple;
 	seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
 
 	/* Only ever called from ReorderBufferApplySequence, so transational. */
 	if (streaming)
-		rb->stream_sequence(rb, txn, change->lsn, relation, true,
+		rb->stream_sequence(rb, txn, change->lsn, relation, change->transactional,
 							seq->last_value, seq->log_cnt, seq->is_called);
 	else
-		rb->sequence(rb, txn, change->lsn, relation, true,
+		rb->sequence(rb, txn, change->lsn, relation, change->transactional,
 					 seq->last_value, seq->log_cnt, seq->is_called);
 }
 
@@ -2313,6 +2234,83 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	}
 }
 
+/*
+ * Walk all the accumulated sequence increments, and apply all changes between
+ * the start/end LSNs (e.g. between two decoded changes).
+ */
+static void
+ReorderBufferProcessSequences(ReorderBuffer *rb, ReorderBufferTXN *txn,
+							  Snapshot snapshot,
+							  XLogRecPtr start_lsn, XLogRecPtr end_lsn,
+							  bool streaming)
+{
+	HASH_SEQ_STATUS scan_status;
+	ReorderBufferSequenceEnt *ent;
+
+	/* walk changes for all sequences, apply relevant ones */
+	hash_seq_init(&scan_status, rb->sequences);
+	while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
+	{
+		dlist_mutable_iter iter;
+
+		/*
+		 * Walk changes for the sequence, apply those that are non-transactional
+		 * ones or transactional for this XID. Stop on the first transactional
+		 * one for a different XID (XXX that should get a different relnode, so
+		 * not sure it can even happen mid-list).
+		 */
+		dlist_foreach_modify(iter, &ent->changes)
+		{
+			Oid			reloid;
+			Relation	relation;
+			ReorderBufferSequenceChange *change;
+
+			change = dlist_container(ReorderBufferSequenceChange, node, iter.cur);
+
+			/*
+			 * ignore past/future changes
+			 *
+			 * FIXME not sure this is really correct
+			 */
+			if (change->lsn < start_lsn || change->lsn > end_lsn)
+				break;
+
+			/* stop if transactional for a different XID */
+			if (ent->transactional && change->txn != txn)
+				break;
+
+			/* gonna apply the change, so remove from list */
+			dlist_delete(&change->node);
+
+			Assert(snapshot);
+
+			reloid = RelidByRelfilenode(change->relnode.spcNode,
+										change->relnode.relNode);
+
+			if (reloid == InvalidOid)
+				elog(ERROR, "could not map filenode \"%s\" to relation OID",
+					 relpathperm(change->relnode, MAIN_FORKNUM));
+
+			relation = RelationIdGetRelation(reloid);
+
+			if (!RelationIsValid(relation))
+				elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
+					 reloid,
+					 relpathperm(change->relnode, MAIN_FORKNUM));
+
+			if (RelationIsLogicallyLogged(relation))
+				ReorderBufferApplySequence(rb, change->txn, relation, change, streaming);
+
+			RelationClose(relation);
+		}
+
+		if (dlist_is_empty(&ent->changes))
+			(void) hash_search(rb->sequences,
+							   (void *) &(ent->rnode),
+							   HASH_REMOVE, NULL);
+	}
+}
+
 /*
  * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
  *
@@ -2419,6 +2417,13 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				SetupCheckXidLive(curtxn->xid);
 			}
 
+			/*
+			 * Emit sequences up to this change (might change snapshot, and
+			 * we need to do lookups by relfilenode)
+			 */
+			ReorderBufferProcessSequences(rb, txn, snapshot_now,
+										  txn->first_lsn, change->lsn, streaming);
+
 			switch (change->action)
 			{
 				case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -2700,30 +2705,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					elog(ERROR, "tuplecid value in changequeue");
 					break;
 
-				case REORDER_BUFFER_CHANGE_SEQUENCE:
-					Assert(snapshot_now);
-
-					reloid = RelidByRelfilenode(change->data.sequence.relnode.spcNode,
-												change->data.sequence.relnode.relNode);
-
-					if (reloid == InvalidOid)
-						elog(ERROR, "could not map filenode \"%s\" to relation OID",
-							 relpathperm(change->data.sequence.relnode,
-										 MAIN_FORKNUM));
-
-					relation = RelationIdGetRelation(reloid);
-
-					if (!RelationIsValid(relation))
-						elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
-							 reloid,
-							 relpathperm(change->data.sequence.relnode,
-										 MAIN_FORKNUM));
-
-					if (RelationIsLogicallyLogged(relation))
-						ReorderBufferApplySequence(rb, txn, relation, change, streaming);
-
-					RelationClose(relation);
-					break;
 			}
 		}
 
@@ -2734,6 +2715,10 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
 
+		/* Replicate relevant changes for sequences. */
+		ReorderBufferProcessSequences(rb, txn, snapshot_now,
+									  txn->first_lsn, commit_lsn, streaming);
+
 		/*
 		 * Update total transaction count and total bytes processed by the
 		 * transaction and its subtransactions. Ensure to not count the
@@ -4108,39 +4093,6 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				memcpy(data, change->data.truncate.relids, size);
 				data += size;
 
-				break;
-			}
-		case REORDER_BUFFER_CHANGE_SEQUENCE:
-			{
-				char	   *data;
-				ReorderBufferTupleBuf *tup;
-				Size		len = 0;
-
-				tup = change->data.sequence.tuple;
-
-				if (tup)
-				{
-					sz += sizeof(HeapTupleData);
-					len = tup->tuple.t_len;
-					sz += len;
-				}
-
-				/* make sure we have enough space */
-				ReorderBufferSerializeReserve(rb, sz);
-
-				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
-				/* might have been reallocated above */
-				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
-
-				if (len)
-				{
-					memcpy(data, &tup->tuple, sizeof(HeapTupleData));
-					data += sizeof(HeapTupleData);
-
-					memcpy(data, tup->tuple.t_data, len);
-					data += len;
-				}
-
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -4405,22 +4357,6 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
 			{
 				sz += sizeof(Oid) * change->data.truncate.nrelids;
 
-				break;
-			}
-		case REORDER_BUFFER_CHANGE_SEQUENCE:
-			{
-				ReorderBufferTupleBuf *tup;
-				Size		len = 0;
-
-				tup = change->data.sequence.tuple;
-
-				if (tup)
-				{
-					sz += sizeof(HeapTupleData);
-					len = tup->tuple.t_len;
-					sz += len;
-				}
-
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -4723,29 +4659,6 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				break;
 			}
 
-		case REORDER_BUFFER_CHANGE_SEQUENCE:
-			if (change->data.sequence.tuple)
-			{
-				uint32		tuplelen = ((HeapTuple) data)->t_len;
-
-				change->data.sequence.tuple =
-					ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
-
-				/* restore ->tuple */
-				memcpy(&change->data.sequence.tuple->tuple, data,
-					   sizeof(HeapTupleData));
-				data += sizeof(HeapTupleData);
-
-				/* reset t_data pointer into the new tuplebuf */
-				change->data.sequence.tuple->tuple.t_data =
-					ReorderBufferTupleBufData(change->data.sequence.tuple);
-
-				/* restore tuple data itself */
-				memcpy(change->data.sequence.tuple->tuple.t_data, data, tuplelen);
-				data += tuplelen;
-			}
-			break;
-
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0bcc150b331..d94bce8d39f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -64,8 +64,7 @@ typedef enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT,
-	REORDER_BUFFER_CHANGE_TRUNCATE,
-	REORDER_BUFFER_CHANGE_SEQUENCE
+	REORDER_BUFFER_CHANGE_TRUNCATE
 } ReorderBufferChangeType;
 
 /* forward declaration */
@@ -159,13 +158,6 @@ typedef struct ReorderBufferChange
 			uint32		ninvalidations; /* Number of messages */
 			SharedInvalidationMessage *invalidations;	/* invalidation message */
 		}			inval;
-
-		/* Context data for Sequence changes */
-		struct
-		{
-			RelFileNode relnode;
-			ReorderBufferTupleBuf *tuple;
-		}			sequence;
 	}			data;
 
 	/*
@@ -175,6 +167,24 @@ typedef struct ReorderBufferChange
 	dlist_node	node;
 } ReorderBufferChange;
 
+typedef struct ReorderBufferSequenceChange
+{
+	XLogRecPtr	lsn;
+
+	/* Transaction this change belongs to. */
+	struct ReorderBufferTXN *txn;
+
+	RepOriginId origin_id;
+
+	/* sequence data */
+	bool	transactional;
+	RelFileNode relnode;
+	ReorderBufferTupleBuf *tuple;
+
+	/* list of sequence changes */
+	dlist_node	node;
+} ReorderBufferSequenceChange;
+
 /* ReorderBufferTXN txn_flags */
 #define RBTXN_HAS_CATALOG_CHANGES 0x0001
 #define RBTXN_IS_SUBXACT          0x0002
@@ -615,6 +625,7 @@ struct ReorderBuffer
 	 * Memory contexts for specific types objects
 	 */
 	MemoryContext change_context;
+	MemoryContext sequence_context;
 	MemoryContext txn_context;
 	MemoryContext tup_context;
 
@@ -670,8 +681,8 @@ void		ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapsho
 									  bool transactional, const char *prefix,
 									  Size message_size, const char *message);
 void		ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
-									   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
-									   RelFileNode rnode, bool transactional, bool created,
+									   XLogRecPtr lsn, RepOriginId origin_id,
+									   RelFileNode rnode, bool created,
 									   ReorderBufferTupleBuf *tuplebuf);
 void		ReorderBufferCommit(ReorderBuffer *, TransactionId,
 								XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
@@ -720,7 +731,4 @@ void		ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
 void		StartupReorderBuffer(void);
 
-bool		ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
-												 RelFileNode rnode, bool created);
-
 #endif
-- 
2.34.1

From 294faa547cf8347881831d2137b22b6c25ffa445 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Sat, 2 Apr 2022 20:44:15 +0200
Subject: [PATCH vchanges 1/2] WIP: sequence reworks

---
 src/backend/replication/logical/decode.c      |  32 +--
 .../replication/logical/reorderbuffer.c       | 236 ++++++------------
 src/include/replication/reorderbuffer.h       |   8 +-
 3 files changed, 92 insertions(+), 184 deletions(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 77bc7aea7a0..bd78a122b03 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -1313,9 +1313,7 @@ sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	TransactionId xid = XLogRecGetXid(r);
 	uint8		info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
 	xl_seq_rec *xlrec;
-	Snapshot	snapshot;
 	RepOriginId origin_id = XLogRecGetOrigin(r);
-	bool		transactional;
 
 	/* only decode changes flagged with XLOG_SEQ_LOG */
 	if (info != XLOG_SEQ_LOG)
@@ -1352,32 +1350,14 @@ sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
-	DecodeSeqTuple(tupledata, datalen, tuplebuf);
 
-	/*
-	 * Should we handle the sequence increment as transactional or not?
-	 *
-	 * If the sequence was created in a still-running transaction, treat
-	 * it as transactional and queue the increments. Otherwise it needs
-	 * to be treated as non-transactional, in which case we send it to
-	 * the plugin right away.
-	 */
-	transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
-														 target_node,
-														 xlrec->created);
-
-	/* Skip the change if already processed (per the snapshot). */
-	if (transactional &&
-		!SnapBuildProcessChange(builder, xid, buf->origptr))
-		return;
-	else if (!transactional &&
-			 (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
-			  SnapBuildXactNeedsSkip(builder, buf->origptr)))
+	if (!SnapBuildProcessChange(builder, xid, buf->origptr))
 		return;
 
-	/* Queue the increment (or send immediately if not transactional). */
-	snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
-	ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr,
-							   origin_id, target_node, transactional,
+	DecodeSeqTuple(tupledata, datalen, tuplebuf);
+
+	/* queue the sequence increment */
+	ReorderBufferQueueSequence(ctx->reorder, xid, buf->endptr,
+							   origin_id, target_node,
 							   xlrec->created, tuplebuf);
 }
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c2d9be81fae..0c4fb9cb561 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -155,6 +155,7 @@ typedef struct ReorderBufferTXNByIdEnt
 typedef struct ReorderBufferSequenceEnt
 {
 	RelFileNode		rnode;
+	XLogRecPtr		lsn;
 	TransactionId	xid;
 } ReorderBufferSequenceEnt;
 
@@ -923,21 +924,26 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
  * so we simply do a lookup (the sequence is identified by relfilende). If
  * we find a match, the increment should be handled as transactional.
  */
-bool
+static bool
 ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
 									 RelFileNode rnode, bool created)
 {
 	bool	found = false;
+	ReorderBufferSequenceEnt *ent;
 
 	if (created)
 		return true;
 
-	hash_search(rb->sequences,
-				(void *) &rnode,
-				HASH_FIND,
-				&found);
+	ent = hash_search(rb->sequences,
+					  (void *) &rnode,
+					  HASH_FIND,
+					  &found);
+
+	/* if not found, it's definitely non-transactional */
+	if (!found)
+		return false;
 
-	return found;
+	return (ent->xid != InvalidTransactionId);
 }
 
 /*
@@ -958,12 +964,10 @@ ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid)
 	while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
 	{
 		/* skip sequences not from this transaction */
-		if (ent->xid != xid)
-			continue;
+		if (ent->xid == xid)
+			ent->xid = InvalidTransactionId;
 
-		(void) hash_search(rb->sequences,
-					   (void *) &(ent->rnode),
-					   HASH_REMOVE, NULL);
+		/* FIXME maybe we could remove the entries at some point */
 	}
 }
 
@@ -978,166 +982,90 @@ ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid)
  */
 void
 ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
-						   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
-						   RelFileNode rnode, bool transactional, bool created,
+						   XLogRecPtr lsn, RepOriginId origin_id,
+						   RelFileNode rnode, bool created,
 						   ReorderBufferTupleBuf *tuplebuf)
 {
-	/*
-	 * Change needs to be handled as transactional, because the sequence was
-	 * created in a transaction that is still running. In that case all the
-	 * changes need to be queued in that transaction, we must not send them
-	 * to the downstream until the transaction commits.
-	 *
-	 * There's a bit of a trouble with subtransactions - we can't queue it
-	 * into the subxact, because it might be rolled back and we'd lose the
-	 * increment. We need to queue it into the same (sub)xact that created
-	 * the sequence, which is why we track the XID in the hash table.
-	 */
-	if (transactional)
-	{
-		MemoryContext oldcontext;
-		ReorderBufferChange *change;
-
-		/* lookup sequence by relfilenode */
-		ReorderBufferSequenceEnt   *ent;
-		bool						found;
-
-		/* transactional changes require a transaction */
-		Assert(xid != InvalidTransactionId);
-
-		/* search the lookup table (we ignore the return value, found is enough) */
-		ent = hash_search(rb->sequences,
-						  (void *) &rnode,
-						  created ? HASH_ENTER : HASH_FIND,
-						  &found);
-
-		/*
-		 * If this is the "create" increment, we must not have found any
-		 * pre-existing entry in the hash table (i.e. there must not be
-		 * any conflicting sequence).
-		 */
-		Assert(!(created && found));
+	bool transactional;
+	MemoryContext oldcontext;
+	ReorderBufferChange *change;
 
-		/* But we must have either created or found an existing entry. */
-		Assert(created || found);
+	/* lookup sequence by relfilenode */
+	ReorderBufferSequenceEnt   *ent;
+	bool						found;
 
-		/*
-		 * When creating the sequence, remember the XID of the transaction
-		 * that created id.
-		 */
-		if (created)
-			ent->xid = xid;
-
-		/* XXX Maybe check that we're still in the same top-level xact? */
+	transactional = ReorderBufferSequenceIsTransactional(rb, rnode, created);
 
-		/* OK, allocate and queue the change */
-		oldcontext = MemoryContextSwitchTo(rb->context);
-
-		change = ReorderBufferGetChange(rb);
+	/*
+	 * Maintenance of the sequences hash, so that we can distinguish which
+	 * additional increments are transactional and non-transactional.
+	 */
 
-		change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
-		change->origin_id = origin_id;
+	/* transactional changes require a transaction */
+	Assert(!(transactional && (xid == InvalidTransactionId)));
 
-		memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode));
+	/* search the lookup table */
+	ent = hash_search(rb->sequences,
+					  (void *) &rnode,
+					  HASH_ENTER,
+					  &found);
 
-		change->data.sequence.tuple = tuplebuf;
+	/*
+	 * If this is the "create" increment, we must not have found any entry in
+	 * the hash table (i.e. there must not be any conflicting sequence with the
+	 * same relfilenode).
+	 */
+	Assert(!(created && found));
 
-		/* add it to the same subxact that created the sequence */
-		ReorderBufferQueueChange(rb, ent->xid, lsn, change, false);
+	/*
+	 * It's either a non-transactional change, creation of a relfilenode, or we
+	 * should have found an existing entry.
+	 */
+	Assert(!transactional || created || found);
 
-		MemoryContextSwitchTo(oldcontext);
-	}
+	/*
+	 * When creating the sequence, remember the XID of the transaction
+	 * that created it.
+	 */
+	if (transactional)
+		ent->xid = xid;
 	else
-	{
-		/*
-		 * This increment is for a sequence that was not created in any
-		 * running transaction, so we treat it as non-transactional and
-		 * just send it to the output plugin directly.
-		 */
-		ReorderBufferTXN *txn = NULL;
-		volatile Snapshot snapshot_now = snapshot;
-		bool	using_subtxn;
-
-#ifdef USE_ASSERT_CHECKING
-		/* All "creates" have to be handled as transactional. */
-		Assert(!created);
-
-		/* Make sure the sequence is not in the hash table. */
-		{
-			bool	found;
-			hash_search(rb->sequences,
-						(void *) &rnode,
-						HASH_FIND, &found);
-			Assert(!found);
-		}
-#endif
-
-		if (xid != InvalidTransactionId)
-			txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
-
-		/* setup snapshot to allow catalog access */
-		SetupHistoricSnapshot(snapshot_now, NULL);
-
-		/*
-		 * Decoding needs access to syscaches et al., which in turn use
-		 * heavyweight locks and such. Thus we need to have enough state around to
-		 * keep track of those.  The easiest way is to simply use a transaction
-		 * internally.  That also allows us to easily enforce that nothing writes
-		 * to the database by checking for xid assignments.
-		 *
-		 * When we're called via the SQL SRF there's already a transaction
-		 * started, so start an explicit subtransaction there.
-		 */
-		using_subtxn = IsTransactionOrTransactionBlock();
-
-		PG_TRY();
-		{
-			Relation	relation;
-			HeapTuple	tuple;
-			Form_pg_sequence_data seq;
-			Oid			reloid;
-
-			if (using_subtxn)
-				BeginInternalSubTransaction("sequence");
-			else
-				StartTransactionCommand();
-
-			reloid = RelidByRelfilenode(rnode.spcNode, rnode.relNode);
-
-			if (reloid == InvalidOid)
-				elog(ERROR, "could not map filenode \"%s\" to relation OID",
-					 relpathperm(rnode,
-								 MAIN_FORKNUM));
+		ent->xid = InvalidTransactionId;
 
-			relation = RelationIdGetRelation(reloid);
-			tuple = &tuplebuf->tuple;
-			seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
+	/*
+	 * Queue the sequence change, into a global list.
+	 *
+	 * There's a bit of trouble with subtransactions - we can't queue it
+	 * into the subxact, because it might be rolled back and we'd lose the
+	 * increment. We need to queue it into the same (sub)xact that created
+	 * the sequence, which is why we track the XID in the hash table.
+	 */
+	oldcontext = MemoryContextSwitchTo(rb->context);
 
-			rb->sequence(rb, txn, lsn, relation, transactional,
-						 seq->last_value, seq->log_cnt, seq->is_called);
+	change = ReorderBufferGetChange(rb);
 
-			RelationClose(relation);
+	change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
+	change->origin_id = origin_id;
 
-			TeardownHistoricSnapshot(false);
+	memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode));
 
-			AbortCurrentTransaction();
+	change->data.sequence.tuple = tuplebuf;
+	change->data.sequence.transactional = transactional;
 
-			if (using_subtxn)
-				RollbackAndReleaseCurrentSubTransaction();
-		}
-		PG_CATCH();
-		{
-			TeardownHistoricSnapshot(true);
+	Assert(InvalidXLogRecPtr != lsn);
+	change->lsn = lsn;
 
-			AbortCurrentTransaction();
+	ReorderBufferQueueChange(rb, xid, lsn, change, false);
 
-			if (using_subtxn)
-				RollbackAndReleaseCurrentSubTransaction();
+	/*
+	 * FIXME What to do about concurrent_abort for the transaction? Maybe it
+	 * should depend on whether the change is transactional or not.
+	 *
+	 * FIXME Also what to do about memory accounting? If not treated as
+	 * regular changes, it's not part of memory accounting.
+	 */
 
-			PG_RE_THROW();
-		}
-		PG_END_TRY();
-	}
+	MemoryContextSwitchTo(oldcontext);
 }
 
 /*
@@ -2250,10 +2178,12 @@ ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 	/* Only ever called from ReorderBufferApplySequence, so transational. */
 	if (streaming)
-		rb->stream_sequence(rb, txn, change->lsn, relation, true,
+		rb->stream_sequence(rb, txn, change->lsn, relation,
+							change->data.sequence.transactional,
 							seq->last_value, seq->log_cnt, seq->is_called);
 	else
-		rb->sequence(rb, txn, change->lsn, relation, true,
+		rb->sequence(rb, txn, change->lsn, relation,
+					 change->data.sequence.transactional,
 					 seq->last_value, seq->log_cnt, seq->is_called);
 }
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0bcc150b331..6e939e27a15 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -165,6 +165,7 @@ typedef struct ReorderBufferChange
 		{
 			RelFileNode relnode;
 			ReorderBufferTupleBuf *tuple;
+			bool		transactional;
 		}			sequence;
 	}			data;
 
@@ -670,8 +671,8 @@ void		ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapsho
 									  bool transactional, const char *prefix,
 									  Size message_size, const char *message);
 void		ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
-									   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
-									   RelFileNode rnode, bool transactional, bool created,
+									   XLogRecPtr lsn, RepOriginId origin_id,
+									   RelFileNode rnode, bool created,
 									   ReorderBufferTupleBuf *tuplebuf);
 void		ReorderBufferCommit(ReorderBuffer *, TransactionId,
 								XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
@@ -720,7 +721,4 @@ void		ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
 void		StartupReorderBuffer(void);
 
-bool		ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
-												 RelFileNode rnode, bool created);
-
 #endif
-- 
2.34.1
