Please find attached a patch that makes tuple ids and info about
weather it was plain or HOT update available to logical decoding
callbacks.

Also modified test_decoding to show both tids -
- old tid has format -(pageno, slot)
- new tid has format +(pageno, slot)
if it is a HOT update, it is decoded prefixed with 'HOT '

Sample usage:

hannu=# SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');
pg_create_logical_replication_slot
------------------------------------
(test_slot,0/1BF1B38)
(1 row)
hannu=# CREATE TABLE nokey(data text);
CREATE TABLE
hannu=# insert into nokey (data) values('a');
INSERT 0 1
hannu=# update nokey set data = 'b';
UPDATE 1
hannu=# delete from nokey ;
DELETE 1
hannu=# SELECT lsn, xid, data FROM
pg_logical_slot_get_changes('test_slot', NULL, NULL);
lsn | xid | data
-----------+-----+------------------------------------------------------------
0/1C20538 | 767 | BEGIN 767
0/1C2B1E8 | 767 | COMMIT 767
0/1C2B220 | 768 | BEGIN 768
0/1C2B220 | 768 | table public.nokey: INSERT:+(0,1) data[text]:'a'
0/1C2B290 | 768 | COMMIT 768
0/1C2B300 | 769 | BEGIN 769
0/1C2B300 | 769 | table public.nokey: HOT UPDATE:-(0,1)+(0,2) data[text]:'b'
0/1C2B378 | 769 | COMMIT 769
0/1C2B3B0 | 770 | BEGIN 770
0/1C2B3B0 | 770 | table public.nokey: DELETE:-(0,2) (no-tuple-data)
0/1C2B418 | 770 | COMMIT 770
(11 rows)

My planned use case is for reliable logical replication of tables
without primary key or other declared IDENTITY (as long as there are
no updates on target, or at leas no non-hot updates)

Sending thgis part as an independent patch as there may be other
interesting use cases as well.

--
Hannu
From e2a28c6d4cd0d735fd0ff1c287b0e289c2d29022 Mon Sep 17 00:00:00 2001
From: Hannu Krosing <[email protected]>
Date: Thu, 4 Dec 2025 21:21:04 +0100
Subject: [PATCH] Made tuple ids and info about HOT updates available to
 logical decoding

Modified test_decoding to show both
 old tid has format -(pageno, slot)
 new tid has format +(pageno, slot)
 if it is a HOT update, ith is decoded prefixed with 'HOT '

Sample usage:

hannu=# SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');
 pg_create_logical_replication_slot
------------------------------------
 (test_slot,0/1BF1B38)
(1 row)
hannu=# CREATE TABLE nokey(data text);
CREATE TABLE
hannu=# insert into nokey (data) values('a');
INSERT 0 1
hannu=# update nokey set data = 'b';
UPDATE 1
hannu=# delete from nokey ;
DELETE 1
hannu=# SELECT lsn, xid, data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL);
    lsn    | xid |                            data
-----------+-----+------------------------------------------------------------
 0/1C20538 | 767 | BEGIN 767
 0/1C2B1E8 | 767 | COMMIT 767
 0/1C2B220 | 768 | BEGIN 768
 0/1C2B220 | 768 | table public.nokey: INSERT:+(0,1) data[text]:'a'
 0/1C2B290 | 768 | COMMIT 768
 0/1C2B300 | 769 | BEGIN 769
 0/1C2B300 | 769 | table public.nokey: HOT UPDATE:-(0,1)+(0,2) data[text]:'b'
 0/1C2B378 | 769 | COMMIT 769
 0/1C2B3B0 | 770 | BEGIN 770
 0/1C2B3B0 | 770 | table public.nokey: DELETE:-(0,2) (no-tuple-data)
 0/1C2B418 | 770 | COMMIT 770
(11 rows)
---
 contrib/test_decoding/test_decoding.c    | 33 +++++++++++++++++++-
 src/backend/replication/logical/decode.c | 38 +++++++++++++++++++-----
 src/include/replication/reorderbuffer.h  |  7 +++++
 3 files changed, 70 insertions(+), 8 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index bb495563200..7a1805d5a97 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -596,6 +596,23 @@ tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_
 	}
 }
 
+
+static inline char* _format_tid(char *tidbuf, char prefix, ItemPointer itemPtr)
+{
+	BlockNumber blockNumber;
+	OffsetNumber offsetNumber;
+
+	blockNumber = ItemPointerGetBlockNumberNoCheck(itemPtr);
+	offsetNumber = ItemPointerGetOffsetNumberNoCheck(itemPtr);
+
+	tidbuf[0] = prefix;
+	/* Perhaps someday we should output this as a record. */
+	snprintf(tidbuf+1, 32-1, "(%u,%u)", blockNumber, offsetNumber);
+
+	return tidbuf;
+}
+
+
 /*
  * callback for individual changed tuples
  */
@@ -608,6 +625,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	Form_pg_class class_form;
 	TupleDesc	tupdesc;
 	MemoryContext old;
+	char		tidbuf[32];
 
 	data = ctx->output_plugin_private;
 	txndata = txn->output_plugin_private;
@@ -639,6 +657,9 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
 			appendStringInfoString(ctx->out, " INSERT:");
+			if (change->data.tp.newctid.ip_posid)
+				appendStringInfoString(ctx->out,
+					_format_tid(tidbuf, '+', &(change->data.tp.newctid)));
 			if (change->data.tp.newtuple == NULL)
 				appendStringInfoString(ctx->out, " (no-tuple-data)");
 			else
@@ -647,7 +668,15 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 									false);
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
+			if (change->data.tp.is_hot_update)
+				appendStringInfoString(ctx->out, " HOT");
 			appendStringInfoString(ctx->out, " UPDATE:");
+			if (change->data.tp.oldctid.ip_posid)
+				appendStringInfoString(ctx->out,
+					_format_tid(tidbuf, '-', &(change->data.tp.oldctid)));
+			if (change->data.tp.newctid.ip_posid)
+				appendStringInfoString(ctx->out,
+					_format_tid(tidbuf, '+', &(change->data.tp.newctid)));
 			if (change->data.tp.oldtuple != NULL)
 			{
 				appendStringInfoString(ctx->out, " old-key:");
@@ -666,7 +695,9 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			appendStringInfoString(ctx->out, " DELETE:");
-
+			if (change->data.tp.oldctid.ip_posid)
+				appendStringInfoString(ctx->out,
+					_format_tid(tidbuf, '-', &(change->data.tp.oldctid)));
 			/* if there was no PK, we only know that a delete happened */
 			if (change->data.tp.oldtuple == NULL)
 				appendStringInfoString(ctx->out, " (no-tuple-data)");
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index cc03f0706e9..a04ae2a717a 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -42,7 +42,7 @@
 
 /* individual record(group)'s handlers */
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, bool is_hot);
 static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -502,7 +502,7 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		case XLOG_HEAP_UPDATE:
 			if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
 				!ctx->fast_forward)
-				DecodeUpdate(ctx, buf);
+				DecodeUpdate(ctx, buf, info == XLOG_HEAP_HOT_UPDATE);
 			break;
 
 		case XLOG_HEAP_DELETE:
@@ -909,6 +909,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	xl_heap_insert *xlrec;
 	ReorderBufferChange *change;
 	RelFileLocator target_locator;
+	BlockNumber blkno;
 
 	xlrec = (xl_heap_insert *) XLogRecGetData(r);
 
@@ -920,7 +921,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* only interested in our database */
-	XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
+	XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blkno);
 	if (target_locator.dbOid != ctx->slot->data.database)
 		return;
 
@@ -945,6 +946,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
 
+	ItemPointerSet(&change->data.tp.newctid, blkno, xlrec->offnum);
 	change->data.tp.clear_toast_afterwards = true;
 
 	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
@@ -959,18 +961,20 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * Updates can possibly contain a new tuple and the old primary key.
  */
 static void
-DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, bool is_hot)
 {
 	XLogReaderState *r = buf->record;
 	xl_heap_update *xlrec;
 	ReorderBufferChange *change;
 	char	   *data;
 	RelFileLocator target_locator;
+	BlockNumber blkno;
+	BlockNumber old_blkno = InvalidBlockNumber;
 
 	xlrec = (xl_heap_update *) XLogRecGetData(r);
 
 	/* only interested in our database */
-	XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
+	XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blkno);
 	if (target_locator.dbOid != ctx->slot->data.database)
 		return;
 
@@ -983,6 +987,20 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	change->origin_id = XLogRecGetOrigin(r);
 	memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
 
+	ItemPointerSet(&change->data.tp.newctid, blkno, xlrec->new_offnum);
+
+	/* If block 1 is present, it contains old tuple */
+	if (XLogRecHasBlockRef(r, 1))
+	{
+		RelFileLocator old_locator;
+		XLogRecGetBlockTag(r, 1, &old_locator, NULL, &old_blkno);
+	}
+
+	if (BlockNumberIsValid(old_blkno))
+		ItemPointerSet(&change->data.tp.oldctid, old_blkno, xlrec->old_offnum);
+	else
+		ItemPointerSet(&change->data.tp.oldctid, blkno, xlrec->old_offnum);
+
 	if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
 	{
 		Size		datalen;
@@ -1015,6 +1033,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	}
 
 	change->data.tp.clear_toast_afterwards = true;
+	change->data.tp.is_hot_update = is_hot;
 
 	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
 							 change, false);
@@ -1032,11 +1051,12 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	xl_heap_delete *xlrec;
 	ReorderBufferChange *change;
 	RelFileLocator target_locator;
+	BlockNumber blkno;
 
 	xlrec = (xl_heap_delete *) XLogRecGetData(r);
 
 	/* only interested in our database */
-	XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
+	XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blkno);
 	if (target_locator.dbOid != ctx->slot->data.database)
 		return;
 
@@ -1070,6 +1090,7 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 						datalen, change->data.tp.oldtuple);
 	}
 
+	ItemPointerSet(&change->data.tp.oldctid, blkno, xlrec->offnum);
 	change->data.tp.clear_toast_afterwards = true;
 
 	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
@@ -1127,6 +1148,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	char	   *tupledata;
 	Size		tuplelen;
 	RelFileLocator rlocator;
+	BlockNumber blkno;
 
 	xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
 
@@ -1138,7 +1160,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* only interested in our database */
-	XLogRecGetBlockTag(r, 0, &rlocator, NULL, NULL);
+	XLogRecGetBlockTag(r, 0, &rlocator, NULL, &blkno);
 	if (rlocator.dbOid != ctx->slot->data.database)
 		return;
 
@@ -1168,6 +1190,8 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 		memcpy(&change->data.tp.rlocator, &rlocator, sizeof(RelFileLocator));
 
+		ItemPointerSet(&change->data.tp.newctid, blkno, xlrec->offsets[i]);
+
 		xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
 		data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
 		datalen = xlhdr->datalen;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index fa0745552f8..8c305ec6e41 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -104,6 +104,13 @@ typedef struct ReorderBufferChange
 			HeapTuple	oldtuple;
 			/* valid for INSERT || UPDATE */
 			HeapTuple	newtuple;
+
+			/* ctid for old tuple; valid for DELETE || UPDATE */
+			ItemPointerData oldctid;
+			/* ctid for new tuple; valid for INSERT || UPDATE */
+			ItemPointerData newctid;
+			/* update type - only valid for UPDATE */
+			bool		is_hot_update;
 		}			tp;
 
 		/*
-- 
2.43.0

Reply via email to