This adds a new wal_level value 'logical'

Missing cases:
- heap_multi_insert
- primary key changes for updates
- no primary key
- LOG_NEWPAGE
---
 src/backend/access/heap/heapam.c        | 135 +++++++++++++++++++++++++++++---
 src/backend/access/transam/xlog.c       |   1 +
 src/backend/catalog/index.c             |  74 +++++++++++++++++
 src/bin/pg_controldata/pg_controldata.c |   2 +
 src/include/access/xlog.h               |   3 +-
 src/include/catalog/index.h             |   4 +
 6 files changed, 207 insertions(+), 12 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index f56b577..190ae03 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -53,6 +53,7 @@
 #include "access/xact.h"
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
+#include "catalog/index.h"
 #include "catalog/namespace.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -1938,10 +1939,19 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 		xl_heap_insert xlrec;
 		xl_heap_header xlhdr;
 		XLogRecPtr	recptr;
-		XLogRecData rdata[3];
+		XLogRecData rdata[4];
 		Page		page = BufferGetPage(buffer);
 		uint8		info = XLOG_HEAP_INSERT;
 
+		/*
+		 * For the logical replication case we need the tuple even if we're
+		 * doing a full page write. We could alternatively store a pointer into
+		 * the fpw though.
+		 * For that to work we add another rdata entry for the buffer in that
+		 * case.
+		 */
+		bool        need_tuple = wal_level == WAL_LEVEL_LOGICAL;
+
 		xlrec.all_visible_cleared = all_visible_cleared;
 		xlrec.target.node = relation->rd_node;
 		xlrec.target.tid = heaptup->t_self;
@@ -1961,18 +1971,32 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 		 */
 		rdata[1].data = (char *) &xlhdr;
 		rdata[1].len = SizeOfHeapHeader;
-		rdata[1].buffer = buffer;
+		rdata[1].buffer = need_tuple ? InvalidBuffer : buffer;
 		rdata[1].buffer_std = true;
 		rdata[1].next = &(rdata[2]);
 
 		/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
 		rdata[2].data = (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits);
 		rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);
-		rdata[2].buffer = buffer;
+		rdata[2].buffer = need_tuple ? InvalidBuffer : buffer;
 		rdata[2].buffer_std = true;
 		rdata[2].next = NULL;
 
 		/*
+		 * add an entry for the buffer without actual content; that entry is
+		 * removed if a fpw is done for that buffer
+		 */
+		if(need_tuple){
+			rdata[2].next = &(rdata[3]);
+
+			rdata[3].data = NULL;
+			rdata[3].len = 0;
+			rdata[3].buffer = buffer;
+			rdata[3].buffer_std = true;
+			rdata[3].next = NULL;
+		}
+
+		/*
 		 * If this is the single and first tuple on page, we can reinit the
 		 * page instead of restoring the whole thing.  Set flag, and hide
 		 * buffer references from XLogInsert.
@@ -1981,7 +2005,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 			PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
 		{
 			info |= XLOG_HEAP_INIT_PAGE;
-			rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
+			rdata[1].buffer = rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
 		}
 
 		recptr = XLogInsert(RM_HEAP_ID, info, rdata);
@@ -2569,7 +2593,9 @@ l1:
 	{
 		xl_heap_delete xlrec;
 		XLogRecPtr	recptr;
-		XLogRecData rdata[2];
+		XLogRecData rdata[4];
+
+		bool need_tuple = wal_level == WAL_LEVEL_LOGICAL && relation->rd_id  >= FirstNormalObjectId;
 
 		xlrec.all_visible_cleared = all_visible_cleared;
 		xlrec.target.node = relation->rd_node;
@@ -2585,6 +2611,73 @@ l1:
 		rdata[1].buffer_std = true;
 		rdata[1].next = NULL;
 
+		/*
+		 * XXX: We could decide not to log changes when the origin is not the
+		 * local node, that should reduce redundant logging.
+		 */
+		if(need_tuple){
+			xl_heap_header xlhdr;
+
+			Oid indexoid = InvalidOid;
+			int16 pknratts;
+			int16 pkattnum[INDEX_MAX_KEYS];
+			Oid pktypoid[INDEX_MAX_KEYS];
+			Oid pkopclass[INDEX_MAX_KEYS];
+			TupleDesc desc = RelationGetDescr(relation);
+			Relation index_rel;
+			TupleDesc indexdesc;
+			int natt;
+
+			Datum idxvals[INDEX_MAX_KEYS];
+			bool idxisnull[INDEX_MAX_KEYS];
+			HeapTuple idxtuple;
+
+			MemSet(pkattnum, 0, sizeof(pkattnum));
+			MemSet(pktypoid, 0, sizeof(pktypoid));
+			MemSet(pkopclass, 0, sizeof(pkopclass));
+			MemSet(idxvals, 0, sizeof(idxvals));
+			MemSet(idxisnull, 0, sizeof(idxisnull));
+			relationFindPrimaryKey(relation, &indexoid, &pknratts, pkattnum, pktypoid, pkopclass);
+
+			if(!indexoid){
+				elog(WARNING, "Could not find primary key for table with oid %u",
+				     relation->rd_id);
+				goto no_index_found;
+			}
+
+			index_rel = index_open(indexoid, AccessShareLock);
+
+			indexdesc = RelationGetDescr(index_rel);
+
+			for(natt = 0; natt < indexdesc->natts; natt++){
+				idxvals[natt] =
+					fastgetattr(&tp, pkattnum[natt], desc, &idxisnull[natt]);
+				Assert(!idxisnull[natt]);
+			}
+
+			idxtuple = heap_form_tuple(indexdesc, idxvals, idxisnull);
+
+			xlhdr.t_infomask2 = idxtuple->t_data->t_infomask2;
+			xlhdr.t_infomask = idxtuple->t_data->t_infomask;
+			xlhdr.t_hoff = idxtuple->t_data->t_hoff;
+
+			rdata[1].next = &(rdata[2]);
+			rdata[2].data = (char*)&xlhdr;
+			rdata[2].len = SizeOfHeapHeader;
+			rdata[2].buffer = InvalidBuffer;
+			rdata[2].next = NULL;
+
+			rdata[2].next = &(rdata[3]);
+			rdata[3].data = (char *) idxtuple->t_data + offsetof(HeapTupleHeaderData, t_bits);
+			rdata[3].len = idxtuple->t_len - offsetof(HeapTupleHeaderData, t_bits);
+			rdata[3].buffer = InvalidBuffer;
+			rdata[3].next = NULL;
+
+			heap_close(index_rel, NoLock);
+		no_index_found:
+			;
+		}
+
 		recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata);
 
 		PageSetLSN(page, recptr);
@@ -4414,9 +4507,14 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
 	xl_heap_header xlhdr;
 	uint8		info;
 	XLogRecPtr	recptr;
-	XLogRecData rdata[4];
+	XLogRecData rdata[5];
 	Page		page = BufferGetPage(newbuf);
 
+	/*
+	 * Just as for XLOG_HEAP_INSERT, we need the tuple even if a fpw is done.
+	 */
+	bool        need_tuple = wal_level == WAL_LEVEL_LOGICAL;
+
 	/* Caller should not call me on a non-WAL-logged relation */
 	Assert(RelationNeedsWAL(reln));
 
@@ -4447,28 +4545,43 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
 	xlhdr.t_hoff = newtup->t_data->t_hoff;
 
 	/*
-	 * As with insert records, we need not store the rdata[2] segment if we
-	 * decide to store the whole buffer instead.
+	 * As with insert's logging, the tuple data need not be stored if the
+	 * whole buffer is stored instead -- unless we do logical replication,
+	 * in which case it is kept separate from the buffer.
 	 */
 	rdata[2].data = (char *) &xlhdr;
 	rdata[2].len = SizeOfHeapHeader;
-	rdata[2].buffer = newbuf;
+	rdata[2].buffer = need_tuple ? InvalidBuffer : newbuf;
 	rdata[2].buffer_std = true;
 	rdata[2].next = &(rdata[3]);
 
 	/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
 	rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
 	rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
-	rdata[3].buffer = newbuf;
+	rdata[3].buffer = need_tuple ? InvalidBuffer : newbuf;
 	rdata[3].buffer_std = true;
 	rdata[3].next = NULL;
 
+	/*
+	 * separate storage for the buffer reference of the new page in the
+	 * wal_level=logical case
+	*/
+	if(need_tuple){
+		rdata[3].next = &(rdata[4]);
+
+		rdata[4].data = NULL,
+		rdata[4].len = 0;
+		rdata[4].buffer = newbuf;
+		rdata[4].buffer_std = true;
+		rdata[4].next = NULL;
+	}
+
 	/* If new tuple is the single and first tuple on page... */
 	if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
 		PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
 	{
 		info |= XLOG_HEAP_INIT_PAGE;
-		rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
+		rdata[2].buffer = rdata[3].buffer = rdata[4].buffer = InvalidBuffer;
 	}
 
 	recptr = XLogInsert(RM_HEAP_ID, info, rdata);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index ff56c26..53a0bc8 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -107,6 +107,7 @@ const struct config_enum_entry wal_level_options[] = {
 	{"minimal", WAL_LEVEL_MINIMAL, false},
 	{"archive", WAL_LEVEL_ARCHIVE, false},
 	{"hot_standby", WAL_LEVEL_HOT_STANDBY, false},
+	{"logical", WAL_LEVEL_LOGICAL, false},
 	{NULL, 0, false}
 };
 
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 464950b..8145997 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -49,6 +49,7 @@
 #include "nodes/nodeFuncs.h"
 #include "optimizer/clauses.h"
 #include "parser/parser.h"
+#include "parser/parse_relation.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
@@ -3322,3 +3323,76 @@ ResetReindexPending(void)
 {
 	pendingReindexedIndexes = NIL;
 }
+
+/*
+ * relationFindPrimaryKey
+ *		Find primary key for a relation if it exists.
+ *
+ * On success *indexOid is the index's OID, *nratts is its indnatts, and
+ * attnums/atttypids/opclasses hold one entry per index column.  If no primary
+ * key is found, *indexOid is InvalidOid and the other outputs are untouched.
+ * This is quite similar to tablecmds.c's transformFkeyGetPrimaryKey.
+ * XXX: It might be a good idea to change pg_class.relhaspkey into an bool to
+ * make this more efficient.
+ */
+void
+relationFindPrimaryKey(Relation pkrel, Oid *indexOid,
+                       int16 *nratts, int16 *attnums, Oid *atttypids,
+                       Oid *opclasses){
+	List *indexoidlist;
+	ListCell *indexoidscan;
+	HeapTuple indexTuple = NULL;
+	Datum indclassDatum;
+	bool isnull;
+	oidvector  *indclass;
+	int i;
+	Form_pg_index indexStruct = NULL;
+
+	*indexOid = InvalidOid;		/* stays invalid unless a match is found */
+
+	indexoidlist = RelationGetIndexList(pkrel);
+	/* Look for an index flagged both indisprimary and indimmediate. */
+	foreach(indexoidscan, indexoidlist)
+	{
+		Oid indexoid = lfirst_oid(indexoidscan);
+
+		indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
+		if(!HeapTupleIsValid(indexTuple))
+			elog(ERROR, "cache lookup failed for index %u", indexoid);
+
+		indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
+		if(indexStruct->indisprimary && indexStruct->indimmediate)
+		{
+			*indexOid = indexoid;
+			break;		/* indexTuple is still needed below; released at the end */
+		}
+		ReleaseSysCache(indexTuple);
+
+	}
+	list_free(indexoidlist);
+
+	if (!OidIsValid(*indexOid))
+		return;
+
+	/* Must get indclass the hard way (through SysCacheGetAttr) */
+	indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
+									Anum_pg_index_indclass, &isnull);
+	Assert(!isnull);
+	indclass = (oidvector *) DatumGetPointer(indclassDatum);
+
+	*nratts = indexStruct->indnatts;
+	/*
+	 * Now build the list of PK attributes from the indkey definition (we
+	 * assume a primary key cannot have expressional elements)
+	 */
+	for (i = 0; i < indexStruct->indnatts; i++)
+	{
+		int			pkattno = indexStruct->indkey.values[i];
+
+		attnums[i] = pkattno;
+		atttypids[i] = attnumTypeId(pkrel, pkattno);
+		opclasses[i] = indclass->values[i];
+	}
+
+	ReleaseSysCache(indexTuple);	/* drops the tuple kept by the loop's break */
+}
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index 129c4d0..10080d0 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -77,6 +77,8 @@ wal_level_str(WalLevel wal_level)
 			return "archive";
 		case WAL_LEVEL_HOT_STANDBY:
 			return "hot_standby";
+		case WAL_LEVEL_LOGICAL:
+			return "logical";
 	}
 	return _("unrecognized wal_level");
 }
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 2893f3b..7d90416 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -200,7 +200,8 @@ typedef enum WalLevel
 {
 	WAL_LEVEL_MINIMAL = 0,
 	WAL_LEVEL_ARCHIVE,
-	WAL_LEVEL_HOT_STANDBY
+	WAL_LEVEL_HOT_STANDBY,
+	WAL_LEVEL_LOGICAL
 } WalLevel;
 extern int	wal_level;
 
diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h
index eb417ce..3de0a29 100644
--- a/src/include/catalog/index.h
+++ b/src/include/catalog/index.h
@@ -102,4 +102,8 @@ extern bool ReindexIsProcessingHeap(Oid heapOid);
 extern bool ReindexIsProcessingIndex(Oid indexOid);
 extern Oid	IndexGetRelation(Oid indexId, bool missing_ok);
 
+extern void relationFindPrimaryKey(Relation pkrel, Oid *indexOid,
+                                   int16 *nratts, int16 *attnums, Oid *atttypids,
+                                   Oid *opclasses);
+
 #endif   /* INDEX_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to