From: Andres Freund <and...@anarazel.de>

This requires an up-to-date catalog and can thus only be run on a replica.
Missing:
- HEAP_NEWPAGE support
- HEAP2_MULTI_INSERT support
- DDL integration. *No* ddl, including TRUNCATE is possible atm
---
 src/backend/replication/logical/Makefile |    2 +-
 src/backend/replication/logical/decode.c |  507 ++++++++++++++++++++++++++++++
 src/include/replication/decode.h         |   23 ++
 3 files changed, 531 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/replication/logical/decode.c
 create mode 100644 src/include/replication/decode.h

diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 2eadab8..7dd9663 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,6 +14,6 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
-OBJS = applycache.o
+OBJS = applycache.o decode.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
new file mode 100644
index 0000000..7e07d50
--- /dev/null
+++ b/src/backend/replication/logical/decode.c
@@ -0,0 +1,507 @@
+/*-------------------------------------------------------------------------
+ *
+ * decode.c
+ *		Decodes WAL records from an xlogreader.h callback into an applycache
+ *
+ * Heap insert/update/delete records are turned into ApplyCacheChanges that
+ * are queued under the xid of the record; transaction commit and abort
+ * records then pass the affected xids on to the applycache.
+ *
+ * Relations are identified by looking up the record's relfilenode in the
+ * syscache, so this requires an up-to-date catalog and can thus only be run
+ * on a replica.  DDL, including TRUNCATE, is not supported yet.
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/decode.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/transam.h"
+#include "access/xlog_internal.h"
+#include "access/xact.h"
+
+#include "replication/applycache.h"
+#include "replication/decode.h"
+
+#include "utils/memutils.h"
+#include "utils/syscache.h"
+#include "utils/lsyscache.h"
+
+static void DecodeHeapOp(ApplyCache *cache, XLogRecordBuffer *buf, uint8 info);
+static void DecodeHeap2Op(ApplyCache *cache, XLogRecordBuffer *buf, uint8 info);
+static void DecodeXactOp(ApplyCache *cache, XLogRecordBuffer *buf, uint8 info);
+
+static void DecodeInsert(ApplyCache *cache, XLogRecordBuffer *buf);
+static void DecodeUpdate(ApplyCache *cache, XLogRecordBuffer *buf);
+static void DecodeDelete(ApplyCache *cache, XLogRecordBuffer *buf);
+static void DecodeNewpage(ApplyCache *cache, XLogRecordBuffer *buf);
+static void DecodeMultiInsert(ApplyCache *cache, XLogRecordBuffer *buf);
+
+static void DecodeCommit(ApplyCache *cache, XLogRecordBuffer *buf,
+			 TransactionId xid, TransactionId *sub_xids, int nsubxacts);
+static void DecodeAbort(ApplyCache *cache, XLogRecordBuffer *buf,
+			TransactionId xid, TransactionId *sub_xids, int nsubxacts);
+
+static bool DecodeLookupTable(ApplyCache *cache, ApplyCacheChange *change,
+				  Oid relfilenode);
+static void DecodeXLogTuple(char *data, Size len,
+				HeapTuple table, ApplyCacheTupleBuf *tuple);
+
+
+/*
+ * Decode a single WAL record into the applycache.
+ *
+ * Only heap (RM_HEAP_ID, RM_HEAP2_ID) and transaction (RM_XACT_ID) records
+ * are looked at; records of all other resource managers are ignored.
+ */
+void
+DecodeRecordIntoApplyCache(ApplyCache *cache, XLogRecordBuffer *buf)
+{
+	XLogRecord *r = &buf->record;
+	uint8		info = r->xl_info & ~XLR_INFO_MASK;
+
+	switch (r->xl_rmid)
+	{
+		case RM_HEAP_ID:
+			DecodeHeapOp(cache, buf, info & XLOG_HEAP_OPMASK);
+			break;
+
+		case RM_HEAP2_ID:
+			DecodeHeap2Op(cache, buf, info & XLOG_HEAP_OPMASK);
+			break;
+
+		case RM_XACT_ID:
+			DecodeXactOp(cache, buf, info);
+			break;
+
+		default:
+			break;
+	}
+}
+
+/*
+ * Dispatch an RM_HEAP_ID record.  "info" is already masked with
+ * XLOG_HEAP_OPMASK, so the XLOG_HEAP_INIT_PAGE flag does not matter here.
+ */
+static void
+DecodeHeapOp(ApplyCache *cache, XLogRecordBuffer *buf, uint8 info)
+{
+	switch (info)
+	{
+		case XLOG_HEAP_INSERT:
+			DecodeInsert(cache, buf);
+			break;
+
+		/*
+		 * No guarantee that we get a HOT update again, so handle it as a
+		 * normal update.
+		 */
+		case XLOG_HEAP_HOT_UPDATE:
+		case XLOG_HEAP_UPDATE:
+			DecodeUpdate(cache, buf);
+			break;
+
+		case XLOG_HEAP_NEWPAGE:
+			DecodeNewpage(cache, buf);
+			break;
+
+		case XLOG_HEAP_DELETE:
+			DecodeDelete(cache, buf);
+			break;
+
+		default:
+			break;
+	}
+}
+
+/*
+ * Dispatch an RM_HEAP2_ID record.  "info" is already masked with
+ * XLOG_HEAP_OPMASK.
+ */
+static void
+DecodeHeap2Op(ApplyCache *cache, XLogRecordBuffer *buf, uint8 info)
+{
+	switch (info)
+	{
+		case XLOG_HEAP2_MULTI_INSERT:
+			/* this also handles the XLOG_HEAP_INIT_PAGE case */
+			DecodeMultiInsert(cache, buf);
+			break;
+
+		default:
+			/* everything else here is just physical stuff we're not interested in */
+			break;
+	}
+}
+
+/*
+ * Dispatch an RM_XACT_ID record: commits and aborts are forwarded to the
+ * applycache, everything else is ignored for now.
+ *
+ * For COMMIT PREPARED and ABORT PREPARED, xl_xid belongs to the backend
+ * finishing the prepared transaction (and is usually invalid), not to the
+ * prepared transaction whose changes were queued; the prepared xid is
+ * stored in the record body, which is also what xact_redo() uses.
+ */
+static void
+DecodeXactOp(ApplyCache *cache, XLogRecordBuffer *buf, uint8 info)
+{
+	XLogRecord *r = &buf->record;
+
+	switch (info)
+	{
+		case XLOG_XACT_COMMIT:
+			{
+				xl_xact_commit *xlrec = (xl_xact_commit *) buf->record_data;
+
+				/*
+				 * The subxact xids follow the xnodes array.  FIXME: this is
+				 * not really allowed if there are no subtransactions.
+				 */
+				DecodeCommit(cache, buf, r->xl_xid,
+							 (TransactionId *) &(xlrec->xnodes[xlrec->nrels]),
+							 xlrec->nsubxacts);
+				break;
+			}
+		case XLOG_XACT_COMMIT_PREPARED:
+			{
+				xl_xact_commit_prepared *xlrec =
+				(xl_xact_commit_prepared *) buf->record_data;
+
+				DecodeCommit(cache, buf, xlrec->xid,
+							 (TransactionId *) &(xlrec->crec.xnodes[xlrec->crec.nrels]),
+							 xlrec->crec.nsubxacts);
+				break;
+			}
+		case XLOG_XACT_COMMIT_COMPACT:
+			{
+				xl_xact_commit_compact *xlrec =
+				(xl_xact_commit_compact *) buf->record_data;
+
+				DecodeCommit(cache, buf, r->xl_xid, xlrec->subxacts,
+							 xlrec->nsubxacts);
+				break;
+			}
+		case XLOG_XACT_ABORT:
+			{
+				xl_xact_abort *xlrec = (xl_xact_abort *) buf->record_data;
+
+				/* FIXME: this is not really allowed if there are no subtransactions */
+				DecodeAbort(cache, buf, r->xl_xid,
+							(TransactionId *) &(xlrec->xnodes[xlrec->nrels]),
+							xlrec->nsubxacts);
+				break;
+			}
+		case XLOG_XACT_ABORT_PREPARED:
+			{
+				/* unlike XLOG_XACT_ABORT, the abort record follows the xid */
+				xl_xact_abort_prepared *xlrec =
+				(xl_xact_abort_prepared *) buf->record_data;
+
+				DecodeAbort(cache, buf, xlrec->xid,
+							(TransactionId *) &(xlrec->arec.xnodes[xlrec->arec.nrels]),
+							xlrec->arec.nsubxacts);
+				break;
+			}
+		case XLOG_XACT_ASSIGNMENT:
+			/*
+			 * XXX: We could reassign transactions to the parent here to save
+			 * space and effort when merging transactions at commit.
+			 */
+			break;
+		case XLOG_XACT_PREPARE:
+			/*
+			 * FIXME: we should replay the transaction and prepare it as well.
+			 */
+			break;
+		default:
+			break;
+	}
+}
+
+/*
+ * Commit transaction "xid": register each of its subtransactions as a child
+ * of it, then commit it in the applycache.
+ */
+static void
+DecodeCommit(ApplyCache *cache, XLogRecordBuffer *buf, TransactionId xid,
+			 TransactionId *sub_xids, int nsubxacts)
+{
+	int			i;
+
+	for (i = 0; i < nsubxacts; i++)
+		ApplyCacheCommitChild(cache, xid, sub_xids[i], buf->origptr);
+
+	/* replay actions of all transaction + subtransactions in order */
+	ApplyCacheCommit(cache, xid, buf->origptr);
+}
+
+/*
+ * Abort transaction "xid" and each of the listed subtransactions in the
+ * applycache.
+ */
+static void
+DecodeAbort(ApplyCache *cache, XLogRecordBuffer *buf, TransactionId xid,
+			TransactionId *sub_xids, int nsubxacts)
+{
+	int			i;
+
+	for (i = 0; i < nsubxacts; i++)
+		ApplyCacheAbort(cache, sub_xids[i], buf->origptr);
+
+	/* TODO: check that this also contains not-yet-aborted subtxns */
+	ApplyCacheAbort(cache, xid, buf->origptr);
+
+#ifdef VERBOSE_DEBUG
+	elog(WARNING, "ABORT %u", xid);
+#endif
+}
+
+/*
+ * Fill change->table with a copy of the pg_class tuple of the relation
+ * using the given relfilenode.
+ *
+ * Returns false, after handing "change" back via ApplyCacheReturnChange, if
+ * the relation cannot be found or is a system table (oid below
+ * FirstNormalObjectId); the caller must not use "change" afterwards.
+ */
+static bool
+DecodeLookupTable(ApplyCache *cache, ApplyCacheChange *change, Oid relfilenode)
+{
+	MemoryContext oldctx;
+
+	/* allocate the copy in TopMemoryContext so it outlives the current one */
+	oldctx = MemoryContextSwitchTo(TopMemoryContext);
+	change->table = SearchSysCacheCopy1(RELFILENODE,
+										ObjectIdGetDatum(relfilenode));
+	MemoryContextSwitchTo(oldctx);
+
+	if (!HeapTupleIsValid(change->table))
+	{
+#ifdef SHOULD_BE_HANDLED_BETTER
+		elog(WARNING, "cache lookup failed for relfilenode %u, systable?",
+			 relfilenode);
+#endif
+		ApplyCacheReturnChange(cache, change);
+		return false;
+	}
+
+	if (HeapTupleGetOid(change->table) < FirstNormalObjectId)
+	{
+#ifdef VERBOSE_DEBUG
+		elog(LOG, "skipping change to systable");
+#endif
+		/*
+		 * XXX: unless ApplyCacheReturnChange frees change->table, this copy
+		 * leaks in TopMemoryContext.
+		 */
+		ApplyCacheReturnChange(cache, change);
+		return false;
+	}
+
+	return true;
+}
+
+/*
+ * Queue an insert change for an XLOG_HEAP_INSERT record.  The new tuple is
+ * decoded from the record data following the xl_heap_insert struct.
+ */
+static void
+DecodeInsert(ApplyCache *cache, XLogRecordBuffer *buf)
+{
+	XLogRecord *r = &buf->record;
+	xl_heap_insert *xlrec = (xl_heap_insert *) buf->record_data;
+	Oid			relfilenode = xlrec->target.node.relNode;
+	ApplyCacheChange *change;
+
+	/* tuple data (after xl_heap_insert) must be present even with a backup block */
+	if (r->xl_info & XLR_BKP_BLOCK_1
+		&& r->xl_len < (SizeOfHeapInsert + SizeOfHeapHeader))
+	{
+		elog(FATAL, "huh, no tuple data on wal_level = logical?");
+	}
+
+	if (relfilenode == 0)
+	{
+		elog(ERROR, "nailed catalog changed");
+	}
+
+	change = ApplyCacheGetChange(cache);
+	change->action = APPLY_CACHE_CHANGE_INSERT;
+
+	if (!DecodeLookupTable(cache, change, relfilenode))
+		return;
+
+#ifdef VERBOSE_DEBUG
+	{
+		/* for accessing the cache */
+		Form_pg_class class_form;
+
+		class_form = (Form_pg_class) GETSTRUCT(change->table);
+		elog(WARNING, "INSERT INTO \"%s\"", NameStr(class_form->relname));
+	}
+#endif
+
+	change->newtuple = ApplyCacheGetTupleBuf(cache);
+
+	DecodeXLogTuple((char *) xlrec + SizeOfHeapInsert,
+					r->xl_len - SizeOfHeapInsert,
+					change->table, change->newtuple);
+
+	ApplyCacheAddChange(cache, r->xl_xid, buf->origptr, change);
+}
+
+/*
+ * Queue an update change for an XLOG_HEAP_UPDATE or XLOG_HEAP_HOT_UPDATE
+ * record.  Only the new tuple version is decoded.
+ */
+static void
+DecodeUpdate(ApplyCache *cache, XLogRecordBuffer *buf)
+{
+	XLogRecord *r = &buf->record;
+	xl_heap_update *xlrec = (xl_heap_update *) buf->record_data;
+	Oid			relfilenode = xlrec->target.node.relNode;
+	ApplyCacheChange *change;
+
+	if ((r->xl_info & XLR_BKP_BLOCK_1 || r->xl_info & XLR_BKP_BLOCK_2) &&
+		(r->xl_len < (SizeOfHeapUpdate + SizeOfHeapHeader)))
+	{
+		elog(FATAL, "huh, no tuple data on wal_level = logical?");
+	}
+
+	change = ApplyCacheGetChange(cache);
+	change->action = APPLY_CACHE_CHANGE_UPDATE;
+
+	if (!DecodeLookupTable(cache, change, relfilenode))
+		return;
+
+#ifdef VERBOSE_DEBUG
+	{
+		/* for accessing the cache */
+		Form_pg_class class_form;
+
+		class_form = (Form_pg_class) GETSTRUCT(change->table);
+		elog(WARNING, "UPDATE \"%s\"", NameStr(class_form->relname));
+	}
+#endif
+
+	/*
+	 * FIXME: need to save the old tuple as well if we want primary key
+	 * updates to work.
+	 */
+	change->newtuple = ApplyCacheGetTupleBuf(cache);
+
+	DecodeXLogTuple((char *) xlrec + SizeOfHeapUpdate,
+					r->xl_len - SizeOfHeapUpdate,
+					change->table, change->newtuple);
+
+	ApplyCacheAddChange(cache, r->xl_xid, buf->origptr, change);
+}
+
+/*
+ * Queue a delete change for an XLOG_HEAP_DELETE record.  The data following
+ * the xl_heap_delete struct is decoded as the old tuple, which on
+ * wal_level = logical is expected to carry the primary key.
+ */
+static void
+DecodeDelete(ApplyCache *cache, XLogRecordBuffer *buf)
+{
+	XLogRecord *r = &buf->record;
+	xl_heap_delete *xlrec = (xl_heap_delete *) buf->record_data;
+	Oid			relfilenode = xlrec->target.node.relNode;
+	ApplyCacheChange *change;
+
+	if (r->xl_len <= (SizeOfHeapDelete + SizeOfHeapHeader))
+	{
+		elog(FATAL, "huh, no primary key for a delete on wal_level = logical?");
+	}
+
+	change = ApplyCacheGetChange(cache);
+	change->action = APPLY_CACHE_CHANGE_DELETE;
+
+	if (!DecodeLookupTable(cache, change, relfilenode))
+		return;
+
+#ifdef VERBOSE_DEBUG
+	{
+		/* for accessing the cache */
+		Form_pg_class class_form;
+
+		class_form = (Form_pg_class) GETSTRUCT(change->table);
+		elog(WARNING, "DELETE FROM \"%s\"", NameStr(class_form->relname));
+	}
+#endif
+
+	change->oldtuple = ApplyCacheGetTupleBuf(cache);
+
+	DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
+					r->xl_len - SizeOfHeapDelete,
+					change->table, change->oldtuple);
+
+	ApplyCacheAddChange(cache, r->xl_xid, buf->origptr, change);
+}
+
+/*
+ * XLOG_HEAP_NEWPAGE records are not decoded yet; just warn about them.
+ */
+static void
+DecodeNewpage(ApplyCache *cache, XLogRecordBuffer *buf)
+{
+	elog(WARNING, "skipping XLOG_HEAP_NEWPAGE record because we are too dumb");
+}
+
+/*
+ * XLOG_HEAP2_MULTI_INSERT records are not decoded yet; just warn about them.
+ */
+static void
+DecodeMultiInsert(ApplyCache *cache, XLogRecordBuffer *buf)
+{
+	elog(WARNING, "skipping XLOG_HEAP2_MULTI_INSERT record because we are too dumb");
+}
+
+/*
+ * Rebuild a heap tuple in "tuple" from its WAL representation: "data"
+ * points to an unaligned xl_heap_header followed by "len" minus
+ * SizeOfHeapHeader bytes of tuple data, which start at the t_bits offset
+ * of a HeapTupleHeader.  "table" is the relation's pg_class tuple.
+ */
+static void
+DecodeXLogTuple(char *data, Size len,
+				HeapTuple table, ApplyCacheTupleBuf *tuple)
+{
+	xl_heap_header xlhdr;
+	Size		datalen;
+
+	/* Size is unsigned, so check before subtracting to avoid wraparound */
+	Assert(len >= SizeOfHeapHeader);
+	datalen = len - SizeOfHeapHeader;
+	Assert(datalen <= MaxHeapTupleSize);
+
+	tuple->tuple.t_len = datalen + offsetof(HeapTupleHeaderData, t_bits);
+
+	/* not a disk based tuple */
+	ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+	/* probably not needed, but ... (is it actually valid to set it?) */
+	tuple->tuple.t_tableOid = HeapTupleGetOid(table);
+	tuple->tuple.t_data = &tuple->header;
+
+	/* data is not stored aligned */
+	memcpy((char *) &xlhdr, data, SizeOfHeapHeader);
+
+	memset(&tuple->header, 0, sizeof(HeapTupleHeaderData));
+
+	memcpy((char *) &tuple->header + offsetof(HeapTupleHeaderData, t_bits),
+		   data + SizeOfHeapHeader,
+		   datalen);
+
+	tuple->header.t_infomask = xlhdr.t_infomask;
+	tuple->header.t_infomask2 = xlhdr.t_infomask2;
+	tuple->header.t_hoff = xlhdr.t_hoff;
+}
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
new file mode 100644
index 0000000..53088e2
--- /dev/null
+++ b/src/include/replication/decode.h
@@ -0,0 +1,23 @@
+/*-------------------------------------------------------------------------
+ * decode.h
+ *	  PostgreSQL WAL to logical transformation
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef DECODE_H
+#define DECODE_H
+
+#include "access/xlogreader.h"
+#include "replication/applycache.h"
+
+void DecodeRecordIntoApplyCache(ApplyCache *cache, XLogRecordBuffer* buf);
+
+typedef struct ReaderApplyState
+{
+	ApplyCache *apply_cache;
+} ReaderApplyState;
+
+#endif
-- 
1.7.10.rc3.3.g19a6c.dirty


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers