From: Andres Freund <and...@anarazel.de>

This requires an up-to-date catalog and can thus only be run on a replica.

Missing:
- HEAP_NEWPAGE support
- HEAP2_MULTI_INSERT support
- DDL integration. *No* DDL, including TRUNCATE, is possible at the moment
---
 src/backend/replication/logical/Makefile |    2 +-
 src/backend/replication/logical/decode.c |  439 ++++++++++++++++++++++++++++++
 src/include/replication/decode.h         |   23 ++
 3 files changed, 463 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/replication/logical/decode.c
 create mode 100644 src/include/replication/decode.h

diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 2eadab8..7dd9663 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,6 +14,6 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
-OBJS = applycache.o
+OBJS = applycache.o decode.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
new file mode 100644
index 0000000..7e07d50
--- /dev/null
+++ b/src/backend/replication/logical/decode.c
@@ -0,0 +1,439 @@
+/*-------------------------------------------------------------------------
+ *
+ * decode.c
+ *
+ * Decodes wal records from an xlogreader.h callback into an applycache
+ *
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *       src/backend/replication/logical/decode.c
+ *
+ */
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/transam.h"
+#include "access/xlog_internal.h"
+#include "access/xact.h"
+
+#include "replication/applycache.h"
+#include "replication/decode.h"
+
+#include "utils/memutils.h"
+#include "utils/syscache.h"
+#include "utils/lsyscache.h"
+
+/*
+ * Per-record heap decoders.  Insert, update and delete records are turned
+ * into ApplyCache changes; NEWPAGE and MULTI_INSERT are only stubs so far.
+ */
+static void DecodeInsert(ApplyCache *cache, XLogRecordBuffer* buf);
+static void DecodeUpdate(ApplyCache *cache, XLogRecordBuffer* buf);
+static void DecodeDelete(ApplyCache *cache, XLogRecordBuffer* buf);
+static void DecodeNewpage(ApplyCache *cache, XLogRecordBuffer* buf);
+static void DecodeMultiInsert(ApplyCache *cache, XLogRecordBuffer* buf);
+
+/* rebuild a HeapTuple from the xl_heap_header + tuple data stored in WAL */
+static void DecodeXLogTuple(char* data, Size len,
+                            HeapTuple table, ApplyCacheTupleBuf* tuple);
+
+/* hand a committed top-level xid and its subtransaction xids to the cache */
+static void DecodeCommit(ApplyCache* cache, XLogRecordBuffer* buf,
+                         TransactionId xid,
+                         TransactionId *sub_xids, int nsubxacts);
+
+
+/*
+ * DecodeAbort
+ *
+ * Abort every subtransaction listed in 'xlrec', then the top-level
+ * transaction 'xid', in the ApplyCache.
+ *
+ * 'xlrec' points to an xl_xact_abort.  For XLOG_XACT_ABORT_PREPARED records
+ * that is the embedded 'arec' member, and 'xid' must be the prepared
+ * transaction's xid rather than the record's xl_xid.
+ */
+static void
+DecodeAbort(ApplyCache *cache, XLogRecordBuffer *buf, TransactionId xid,
+            xl_xact_abort *xlrec)
+{
+       TransactionId *sub_xids;
+       int i;
+
+       /*
+        * The aborted subtransaction xids are stored right after the
+        * relfilenode array.
+        * FIXME: this is not really allowed if there are no subtransactions
+        */
+       sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+
+       for (i = 0; i < xlrec->nsubxacts; i++)
+               ApplyCacheAbort(cache, sub_xids[i], buf->origptr);
+
+       /* TODO: check that this also contains not-yet-aborted subtxns */
+       ApplyCacheAbort(cache, xid, buf->origptr);
+
+       elog(WARNING, "ABORT %u", xid);
+}
+
+/*
+ * DecodeRecordIntoApplyCache
+ *
+ * Dispatch one WAL record to the decoder for its resource manager and
+ * operation.  Heap inserts, updates and deletes are added to the ApplyCache
+ * as changes; transaction commit and abort records are forwarded to
+ * ApplyCacheCommit/ApplyCacheAbort.  All other records are ignored.
+ */
+void
+DecodeRecordIntoApplyCache(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+       XLogRecord* r = &buf->record;
+       uint8 info = r->xl_info & ~XLR_INFO_MASK;
+
+       switch (r->xl_rmid)
+       {
+               case RM_HEAP_ID:
+               {
+                       info &= XLOG_HEAP_OPMASK;
+                       switch (info)
+                       {
+                               case XLOG_HEAP_INSERT:
+                                       DecodeInsert(cache, buf);
+                                       break;
+
+                               /*
+                                * No guarantee that we get a HOT update again,
+                                * so handle it as a normal update.
+                                */
+                               case XLOG_HEAP_HOT_UPDATE:
+                               case XLOG_HEAP_UPDATE:
+                                       DecodeUpdate(cache, buf);
+                                       break;
+
+                               case XLOG_HEAP_NEWPAGE:
+                                       DecodeNewpage(cache, buf);
+                                       break;
+
+                               case XLOG_HEAP_DELETE:
+                                       DecodeDelete(cache, buf);
+                                       break;
+                               default:
+                                       break;
+                       }
+                       break;
+               }
+               case RM_HEAP2_ID:
+               {
+                       info &= XLOG_HEAP_OPMASK;
+                       switch (info)
+                       {
+                               case XLOG_HEAP2_MULTI_INSERT:
+                                       /* this also handles the XLOG_HEAP_INIT_PAGE case */
+                                       DecodeMultiInsert(cache, buf);
+                                       break;
+                               default:
+                                       /*
+                                        * everything else here is just physical stuff
+                                        * we're not interested in
+                                        */
+                                       break;
+                       }
+                       break;
+               }
+
+               case RM_XACT_ID:
+               {
+                       switch (info)
+                       {
+                               case XLOG_XACT_COMMIT:
+                               {
+                                       TransactionId *sub_xids;
+                                       xl_xact_commit *xlrec =
+                                               (xl_xact_commit*)buf->record_data;
+
+                                       /*
+                                        * FIXME: this is not really allowed if there
+                                        * are no subtransactions
+                                        */
+                                       sub_xids = (TransactionId *)
+                                               &(xlrec->xnodes[xlrec->nrels]);
+                                       DecodeCommit(cache, buf, r->xl_xid,
+                                                    sub_xids, xlrec->nsubxacts);
+
+                                       break;
+                               }
+                               case XLOG_XACT_COMMIT_PREPARED:
+                               {
+                                       TransactionId *sub_xids;
+                                       xl_xact_commit_prepared *xlrec =
+                                               (xl_xact_commit_prepared*)buf->record_data;
+
+                                       sub_xids = (TransactionId *)
+                                               &(xlrec->crec.xnodes[xlrec->crec.nrels]);
+
+                                       /*
+                                        * The transaction being committed is the
+                                        * prepared one recorded in xlrec->xid; the
+                                        * record's xl_xid belongs to the session that
+                                        * issued COMMIT PREPARED.
+                                        */
+                                       DecodeCommit(cache, buf, xlrec->xid, sub_xids,
+                                                    xlrec->crec.nsubxacts);
+
+                                       break;
+                               }
+                               case XLOG_XACT_COMMIT_COMPACT:
+                               {
+                                       xl_xact_commit_compact *xlrec =
+                                               (xl_xact_commit_compact*)buf->record_data;
+
+                                       DecodeCommit(cache, buf, r->xl_xid, xlrec->subxacts,
+                                                    xlrec->nsubxacts);
+                                       break;
+                               }
+                               case XLOG_XACT_ABORT:
+                               {
+                                       xl_xact_abort *xlrec =
+                                               (xl_xact_abort*)buf->record_data;
+
+                                       DecodeAbort(cache, buf, r->xl_xid, xlrec);
+                                       break;
+                               }
+                               case XLOG_XACT_ABORT_PREPARED:
+                               {
+                                       /*
+                                        * Unlike a plain abort, this record starts with
+                                        * the prepared transaction's xid followed by the
+                                        * abort record proper, so it must not be read
+                                        * as an xl_xact_abort directly.
+                                        */
+                                       xl_xact_abort_prepared *xlrec =
+                                               (xl_xact_abort_prepared*)buf->record_data;
+
+                                       DecodeAbort(cache, buf, xlrec->xid, &xlrec->arec);
+                                       break;
+                               }
+                               case XLOG_XACT_ASSIGNMENT:
+                                       /*
+                                        * XXX: We could reassign transactions to the
+                                        * parent here to save space and effort when
+                                        * merging transactions at commit.
+                                        */
+                                       break;
+                               case XLOG_XACT_PREPARE:
+                                       /*
+                                        * FIXME: we should replay the transaction and
+                                        * prepare it as well.
+                                        */
+                                       break;
+                               default:
+                                       break;
+                       }
+                       break;
+               }
+               default:
+                       break;
+       }
+}
+
+/*
+ * DecodeCommit
+ *
+ * Report each of the 'nsubxacts' xids in 'sub_xids' to the ApplyCache as a
+ * committed child of 'xid' (ApplyCacheCommitChild), then commit 'xid'
+ * itself via ApplyCacheCommit.  All calls use the record's origptr.
+ */
+static void
+DecodeCommit(ApplyCache* cache, XLogRecordBuffer* buf, TransactionId xid,
+             TransactionId *sub_xids, int nsubxacts)
+{
+       int child;
+
+       /* children first, so the top-level commit sees all of them */
+       for (child = 0; child < nsubxacts; child++)
+               ApplyCacheCommitChild(cache, xid, sub_xids[child], buf->origptr);
+
+       /* replay actions of the transaction and its subtransactions in order */
+       ApplyCacheCommit(cache, xid, buf->origptr);
+}
+
+/*
+ * DecodeInsert
+ *
+ * Turn an XLOG_HEAP_INSERT record into an APPLY_CACHE_CHANGE_INSERT change
+ * carrying the new tuple and add it to the record's transaction in the
+ * ApplyCache.  The change is dropped when the relfilenode cannot be mapped
+ * to a pg_class entry, or when that entry's oid is below FirstNormalObjectId.
+ */
+static void
+DecodeInsert(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+       XLogRecord* r = &buf->record;
+       xl_heap_insert *xlrec = (xl_heap_insert *) buf->record_data;
+
+       Oid relfilenode = xlrec->target.node.relNode;
+
+       ApplyCacheChange* change;
+
+       /*
+        * With a backup block the record may lack the tuple data.  Compare
+        * against the *insert* header size: the larger SizeOfHeapUpdate would
+        * make this fire for legitimate inserts of small tuples.
+        */
+       if (r->xl_info & XLR_BKP_BLOCK_1
+           && r->xl_len < (SizeOfHeapInsert + SizeOfHeapHeader))
+       {
+               elog(FATAL, "huh, no tuple data on wal_level = logical?");
+       }
+
+       if(relfilenode == 0)
+       {
+               elog(ERROR, "nailed catalog changed");
+       }
+
+       change = ApplyCacheGetChange(cache);
+       change->action = APPLY_CACHE_CHANGE_INSERT;
+
+       /*
+        * Lookup the pg_class entry for the relfilenode to get the real oid.
+        * The copy is made in TopMemoryContext because the change keeps it.
+        */
+       {
+               MemoryContext curctx = MemoryContextSwitchTo(TopMemoryContext);
+               change->table = SearchSysCacheCopy1(RELFILENODE,
+                                                   relfilenode);
+               MemoryContextSwitchTo(curctx);
+       }
+
+       if (!HeapTupleIsValid(change->table))
+       {
+#ifdef SHOULD_BE_HANDLED_BETTER
+               elog(WARNING, "cache lookup failed for relfilenode %u, systable?",
+                        relfilenode);
+#endif
+               ApplyCacheReturnChange(cache, change);
+               return;
+       }
+
+       /* skip relations with oids in the system range */
+       if (HeapTupleGetOid(change->table) < FirstNormalObjectId)
+       {
+#ifdef VERBOSE_DEBUG
+               elog(LOG, "skipping change to systable");
+#endif
+               ApplyCacheReturnChange(cache, change);
+               return;
+       }
+
+#ifdef VERBOSE_DEBUG
+       {
+               /*for accessing the cache */
+               Form_pg_class class_form;
+               class_form = (Form_pg_class) GETSTRUCT(change->table);
+               elog(WARNING, "INSERT INTO \"%s\"", NameStr(class_form->relname));
+       }
+#endif
+
+       change->newtuple = ApplyCacheGetTupleBuf(cache);
+
+       /* the tuple header and data follow the fixed-size xl_heap_insert */
+       DecodeXLogTuple((char*)xlrec + SizeOfHeapInsert,
+                       r->xl_len - SizeOfHeapInsert,
+                       change->table, change->newtuple);
+
+       ApplyCacheAddChange(cache, r->xl_xid, buf->origptr, change);
+}
+
+/*
+ * DecodeUpdate
+ *
+ * Turn an XLOG_HEAP_UPDATE or XLOG_HEAP_HOT_UPDATE record into an
+ * APPLY_CACHE_CHANGE_UPDATE change carrying the new tuple version and add it
+ * to the record's transaction in the ApplyCache.  The change is dropped when
+ * the relfilenode cannot be mapped to a pg_class entry, or when that entry's
+ * oid is below FirstNormalObjectId.  The old tuple is not recorded.
+ */
+static void
+DecodeUpdate(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+       XLogRecord* r = &buf->record;
+       xl_heap_update *xlrec = (xl_heap_update *) buf->record_data;
+
+       Oid relfilenode = xlrec->target.node.relNode;
+
+       ApplyCacheChange* change;
+
+       /* with a backup block of either page the record may lack the new tuple */
+       if ((r->xl_info & XLR_BKP_BLOCK_1 || r->xl_info & XLR_BKP_BLOCK_2) &&
+           (r->xl_len < (SizeOfHeapUpdate + SizeOfHeapHeader)))
+       {
+               elog(FATAL, "huh, no tuple data on wal_level = logical?");
+       }
+
+       change = ApplyCacheGetChange(cache);
+       change->action = APPLY_CACHE_CHANGE_UPDATE;
+
+       /*
+        * Lookup the pg_class entry for the relfilenode to get the real oid.
+        * The copy is made in TopMemoryContext because the change keeps it.
+        */
+       {
+               MemoryContext curctx = MemoryContextSwitchTo(TopMemoryContext);
+               change->table = SearchSysCacheCopy1(RELFILENODE,
+                                                   relfilenode);
+               MemoryContextSwitchTo(curctx);
+       }
+
+       if (!HeapTupleIsValid(change->table))
+       {
+#ifdef SHOULD_BE_HANDLED_BETTER
+               elog(WARNING, "cache lookup failed for relfilenode %u, systable?",
+                        relfilenode);
+#endif
+               ApplyCacheReturnChange(cache, change);
+               return;
+       }
+
+       /* skip relations with oids in the system range */
+       if (HeapTupleGetOid(change->table) < FirstNormalObjectId)
+       {
+#ifdef VERBOSE_DEBUG
+               elog(LOG, "skipping change to systable");
+#endif
+               ApplyCacheReturnChange(cache, change);
+               return;
+       }
+
+#ifdef VERBOSE_DEBUG
+       {
+               /*for accessing the cache */
+               Form_pg_class class_form;
+               class_form = (Form_pg_class) GETSTRUCT(change->table);
+               elog(WARNING, "UPDATE \"%s\"", NameStr(class_form->relname));
+       }
+#endif
+
+       /*
+        * FIXME: need to save the old tuple as well if we want primary key
+        * updates to work.
+        */
+       change->newtuple = ApplyCacheGetTupleBuf(cache);
+
+       /* the new tuple's header and data follow the fixed-size xl_heap_update */
+       DecodeXLogTuple((char*)xlrec + SizeOfHeapUpdate,
+                       r->xl_len - SizeOfHeapUpdate,
+                       change->table, change->newtuple);
+
+       ApplyCacheAddChange(cache, r->xl_xid, buf->origptr, change);
+}
+
+/*
+ * DecodeDelete
+ *
+ * Turn an XLOG_HEAP_DELETE record into an APPLY_CACHE_CHANGE_DELETE change
+ * carrying the logged old tuple data and add it to the record's transaction
+ * in the ApplyCache.  The change is dropped when the relfilenode cannot be
+ * mapped to a pg_class entry, or when that entry's oid is below
+ * FirstNormalObjectId.
+ */
+static void
+DecodeDelete(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+       XLogRecord* r = &buf->record;
+
+       xl_heap_delete *xlrec = (xl_heap_delete *) buf->record_data;
+
+       Oid relfilenode = xlrec->target.node.relNode;
+
+       ApplyCacheChange* change;
+
+       /*
+        * On wal_level = logical the delete record is expected to carry the
+        * old tuple's key after the xl_heap_delete header.  Check this before
+        * taking a change from the cache.
+        */
+       if (r->xl_len <= (SizeOfHeapDelete + SizeOfHeapHeader))
+       {
+               elog(FATAL, "huh, no primary key for a delete on wal_level = logical?");
+       }
+
+       change = ApplyCacheGetChange(cache);
+       change->action = APPLY_CACHE_CHANGE_DELETE;
+
+       /*
+        * Lookup the pg_class entry for the relfilenode to get the real oid.
+        * The copy is made in TopMemoryContext because the change keeps it.
+        */
+       {
+               MemoryContext curctx = MemoryContextSwitchTo(TopMemoryContext);
+               change->table = SearchSysCacheCopy1(RELFILENODE,
+                                                   relfilenode);
+               MemoryContextSwitchTo(curctx);
+       }
+
+       if (!HeapTupleIsValid(change->table))
+       {
+#ifdef SHOULD_BE_HANDLED_BETTER
+               elog(WARNING, "cache lookup failed for relfilenode %u, systable?",
+                        relfilenode);
+#endif
+               ApplyCacheReturnChange(cache, change);
+               return;
+       }
+
+       /* skip relations with oids in the system range */
+       if (HeapTupleGetOid(change->table) < FirstNormalObjectId)
+       {
+#ifdef VERBOSE_DEBUG
+               elog(LOG, "skipping change to systable");
+#endif
+               ApplyCacheReturnChange(cache, change);
+               return;
+       }
+
+#ifdef VERBOSE_DEBUG
+       {
+               /*for accessing the cache */
+               Form_pg_class class_form;
+               class_form = (Form_pg_class) GETSTRUCT(change->table);
+               elog(WARNING, "DELETE FROM \"%s\"", NameStr(class_form->relname));
+       }
+#endif
+
+       change->oldtuple = ApplyCacheGetTupleBuf(cache);
+
+       /* the old tuple's header and data follow the fixed-size xl_heap_delete */
+       DecodeXLogTuple((char*)xlrec + SizeOfHeapDelete,
+                       r->xl_len - SizeOfHeapDelete,
+                       change->table, change->oldtuple);
+
+       ApplyCacheAddChange(cache, r->xl_xid, buf->origptr, change);
+}
+
+
+/*
+ * DecodeNewpage
+ *
+ * Placeholder: XLOG_HEAP_NEWPAGE records are not turned into any ApplyCache
+ * change yet; only a WARNING is emitted.  'cache' and 'buf' are unused.
+ */
+static void
+DecodeNewpage(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+       elog(WARNING, "skipping XLOG_HEAP_NEWPAGE record because we are too dumb");
+}
+
+/*
+ * DecodeMultiInsert
+ *
+ * Placeholder: XLOG_HEAP2_MULTI_INSERT records are not turned into any
+ * ApplyCache change yet; only a WARNING is emitted.  'cache' and 'buf' are
+ * unused.
+ */
+static void
+DecodeMultiInsert(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+       elog(WARNING, "skipping XLOG_HEAP2_MULTI_INSERT record because we are too dumb");
+}
+
+
+/*
+ * DecodeXLogTuple
+ *
+ * Rebuild a heap tuple into 'tuple' from the 'len' bytes at 'data'.  Those
+ * bytes hold an unaligned xl_heap_header followed by the tuple body, which is
+ * copied to the t_bits offset of a zeroed HeapTupleHeaderData.  'table' is
+ * the relation's pg_class tuple; its oid becomes t_tableOid.  The result has
+ * no on-disk location, so t_self is set invalid.
+ *
+ * Raises ERROR when 'len' cannot hold the header or the body exceeds
+ * MaxHeapTupleSize.  Assert() alone is compiled out of production builds and
+ * would let the memcpy below run past 'tuple'.
+ * NOTE(review): assumes ApplyCacheTupleBuf has room for MaxHeapTupleSize
+ * bytes after t_bits (the bound the original Assert used) -- confirm.
+ */
+static void
+DecodeXLogTuple(char* data, Size len,
+                HeapTuple table, ApplyCacheTupleBuf* tuple)
+{
+       xl_heap_header xlhdr;
+       Size datalen;
+
+       /*
+        * 'len' is unsigned and callers compute it by subtraction, so check
+        * before subtracting instead of letting it wrap around.
+        */
+       if (len < SizeOfHeapHeader)
+               elog(ERROR, "WAL tuple data too short: %lu bytes",
+                    (unsigned long) len);
+
+       datalen = len - SizeOfHeapHeader;
+       if (datalen > MaxHeapTupleSize)
+               elog(ERROR, "WAL tuple data too long: %lu bytes",
+                    (unsigned long) datalen);
+
+       tuple->tuple.t_len = datalen + offsetof(HeapTupleHeaderData, t_bits);
+
+       /* not a disk based tuple */
+       ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+       /* probably not needed, but ... (is it actually valid to set it?) */
+       tuple->tuple.t_tableOid = HeapTupleGetOid(table);
+       tuple->tuple.t_data = &tuple->header;
+
+       /* data is not stored aligned, so copy the header out before using it */
+       memcpy((char *) &xlhdr,
+              data,
+              SizeOfHeapHeader);
+
+       memset(&tuple->header, 0, sizeof(HeapTupleHeaderData));
+
+       /* the logged tuple body starts at the t_bits offset */
+       memcpy((char *) &tuple->header + offsetof(HeapTupleHeaderData, t_bits),
+              data + SizeOfHeapHeader,
+              datalen);
+
+       tuple->header.t_infomask = xlhdr.t_infomask;
+       tuple->header.t_infomask2 = xlhdr.t_infomask2;
+       tuple->header.t_hoff = xlhdr.t_hoff;
+}
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
new file mode 100644
index 0000000..53088e2
--- /dev/null
+++ b/src/include/replication/decode.h
@@ -0,0 +1,23 @@
+/*-------------------------------------------------------------------------
+ * decode.h
+ *     PostgreSQL WAL to logical transformation
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef DECODE_H
+#define DECODE_H
+
+#include "access/xlogreader.h"
+#include "replication/applycache.h"
+
+/*
+ * Per-reader state for decoding WAL into an ApplyCache.
+ * NOTE(review): not referenced by decode.c; presumably used by the
+ * xlogreader callbacks -- confirm.
+ */
+typedef struct ReaderApplyState
+{
+       ApplyCache *apply_cache;        /* cache receiving decoded changes */
+} ReaderApplyState;
+
+/*
+ * Decode one WAL record from 'buf' and feed any resulting changes, commits
+ * and aborts into 'cache'.
+ */
+extern void DecodeRecordIntoApplyCache(ApplyCache *cache, XLogRecordBuffer *buf);
+
+#endif
-- 
1.7.10.rc3.3.g19a6c.dirty


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to