From 99ffc05998a157c2632711aaa8b5d7c4e22ebd70 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Wed, 25 Dec 2024 08:02:33 +0000
Subject: [PATCH] WIP: track wal segments

---
 .../replication/logical/reorderbuffer.c       | 84 +++++++++++--------
 src/include/replication/reorderbuffer.h       |  7 ++
 2 files changed, 58 insertions(+), 33 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9c742e96eb..b2bb4423b8 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -149,7 +149,7 @@ typedef struct ReorderBufferIterTXNEntry
 	ReorderBufferChange *change;
 	ReorderBufferTXN *txn;
 	TXNEntryFile file;
-	XLogSegNo	segno;
+	int	restore_from;
 } ReorderBufferIterTXNEntry;
 
 typedef struct ReorderBufferIterTXNState
@@ -215,6 +215,11 @@ static const Size max_changes_in_memory = 4096; /* XXX for restore only */
 /* GUC variable */
 int			debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED;
 
+typedef struct WalSgmtsEntry
+{
+	XLogSegNo segno;
+} WalSgmtsEntry;
+
 /* ---------------------------------------
  * primary reorderbuffer support routines
  * ---------------------------------------
@@ -254,7 +259,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 										 int fd, ReorderBufferChange *change);
 static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
-										TXNEntryFile *file, XLogSegNo *segno);
+										TXNEntryFile *file, int *restore_from);
 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 									   char *data);
 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
@@ -432,6 +437,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
 	/* InvalidCommandId is not zero, so set it explicitly */
 	txn->command_id = InvalidCommandId;
 	txn->output_plugin_private = NULL;
+	txn->walsgmts = NIL;
 
 	return txn;
 }
@@ -1305,7 +1311,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	for (off = 0; off < state->nr_txns; off++)
 	{
 		state->entries[off].file.vfd = -1;
-		state->entries[off].segno = 0;
+		state->entries[off].restore_from = 0;
 	}
 
 	/* allocate heap */
@@ -1333,7 +1339,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			/* serialize remaining changes */
 			ReorderBufferSerializeTXN(rb, txn);
 			ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
-										&state->entries[off].segno);
+										&state->entries[off].restore_from);
 		}
 
 		cur_change = dlist_head_element(ReorderBufferChange, node,
@@ -1363,7 +1369,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				ReorderBufferSerializeTXN(rb, cur_txn);
 				ReorderBufferRestoreChanges(rb, cur_txn,
 											&state->entries[off].file,
-											&state->entries[off].segno);
+											&state->entries[off].restore_from);
 			}
 			cur_change = dlist_head_element(ReorderBufferChange, node,
 											&cur_txn->changes);
@@ -1448,7 +1454,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 		 */
 		rb->totalBytes += entry->txn->size;
 		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
-										&state->entries[off].segno))
+										&state->entries[off].restore_from))
 		{
 			/* successfully restored changes from disk */
 			ReorderBufferChange *next_change =
@@ -3715,6 +3721,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	XLogSegNo	curOpenSegNo = 0;
 	Size		spilled = 0;
 	Size		size = txn->size;
+	MemoryContext oldcontext;
 
 	elog(DEBUG2, "spill %u changes in XID %u to disk",
 		 (uint32) txn->nentries_mem, txn->xid);
@@ -3758,7 +3765,23 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 			/* open segment, create it if necessary */
 			fd = OpenTransientFile(path,
-								   O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
+								   O_CREAT | O_EXCL | O_WRONLY | O_APPEND | PG_BINARY);
+
+			if (fd < 0)
+				fd = OpenTransientFile(path,
+									   O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
+			else
+			{
+				WalSgmtsEntry *entry;
+
+				oldcontext = MemoryContextSwitchTo(rb->context);
+
+				entry = palloc(sizeof(WalSgmtsEntry));
+				entry->segno = curOpenSegNo;
+
+				txn->walsgmts = lappend(txn->walsgmts, entry);
+				MemoryContextSwitchTo(oldcontext);
+			}
 
 			if (fd < 0)
 				ereport(ERROR,
@@ -4255,16 +4278,12 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
  */
 static Size
 ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
-							TXNEntryFile *file, XLogSegNo *segno)
+							TXNEntryFile *file,	int *restore_from)
 {
 	Size		restored = 0;
-	XLogSegNo	last_segno;
 	dlist_mutable_iter cleanup_iter;
 	File	   *fd = &file->vfd;
 
-	Assert(txn->first_lsn != InvalidXLogRecPtr);
-	Assert(txn->final_lsn != InvalidXLogRecPtr);
-
 	/* free current entries, so we have memory for more */
 	dlist_foreach_modify(cleanup_iter, &txn->changes)
 	{
@@ -4277,9 +4296,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	txn->nentries_mem = 0;
 	Assert(dlist_is_empty(&txn->changes));
 
-	XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
-
-	while (restored < max_changes_in_memory && *segno <= last_segno)
+	while (restored < max_changes_in_memory &&
+		   (*restore_from) < txn->walsgmts->length)
 	{
 		int			readBytes;
 		ReorderBufferDiskChange *ondisk;
@@ -4289,19 +4307,23 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		if (*fd == -1)
 		{
 			char		path[MAXPGPATH];
+			ListCell *lc;
+			WalSgmtsEntry *entry;
+			XLogSegNo segno;
 
-			/* first time in */
-			if (*segno == 0)
-				XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
+			/* Next wal segment for the transaction */
+			lc = list_nth_cell(txn->walsgmts, *restore_from);
+			entry = (WalSgmtsEntry *) lfirst(lc);
+			segno = entry->segno;
 
-			Assert(*segno != 0 || dlist_is_empty(&txn->changes));
+			Assert(segno != 0 || dlist_is_empty(&txn->changes));
 
 			/*
 			 * No need to care about TLIs here, only used during a single run,
 			 * so each LSN only maps to a specific WAL record.
 			 */
 			ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
-										*segno);
+										segno);
 
 			*fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
 
@@ -4311,7 +4333,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			if (*fd < 0 && errno == ENOENT)
 			{
 				*fd = -1;
-				(*segno)++;
+				(*restore_from)++;
 				continue;
 			}
 			else if (*fd < 0)
@@ -4336,7 +4358,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		{
 			FileClose(*fd);
 			*fd = -1;
-			(*segno)++;
+			(*restore_from)++;
 			continue;
 		}
 		else if (readBytes < 0)
@@ -4567,26 +4589,22 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 static void
 ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
-	XLogSegNo	first;
-	XLogSegNo	cur;
-	XLogSegNo	last;
-
-	Assert(txn->first_lsn != InvalidXLogRecPtr);
-	Assert(txn->final_lsn != InvalidXLogRecPtr);
-
-	XLByteToSeg(txn->first_lsn, first, wal_segment_size);
-	XLByteToSeg(txn->final_lsn, last, wal_segment_size);
+	ListCell *cell;
 
 	/* iterate over all possible filenames, and delete them */
-	for (cur = first; cur <= last; cur++)
+	foreach(cell, txn->walsgmts)
 	{
+		WalSgmtsEntry *entry = (WalSgmtsEntry *)lfirst(cell);
+		XLogSegNo curr_segno = entry->segno;
 		char		path[MAXPGPATH];
 
-		ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
+		ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, curr_segno);
 		if (unlink(path) != 0 && errno != ENOENT)
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not remove file \"%s\": %m", path)));
+
+		txn->walsgmts = foreach_delete_current(txn->walsgmts, cell);
 	}
 }
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 7de50462dc..b2ef2640aa 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -422,6 +422,13 @@ typedef struct ReorderBufferTXN
 	 * Private data pointer of the output plugin.
 	 */
 	void	   *output_plugin_private;
+
+	/*
+	 * List of wal segments this txn is part of.
+	 *
+	 * XXX: check whether the attribute doesn't break ABI 
+	 */
+	List	*walsgmts;
 } ReorderBufferTXN;
 
 /* so we can define the callbacks used inside struct ReorderBuffer itself */
-- 
2.43.0

