From: Andres Freund <and...@anarazel.de>

The individual changes need to be identified by an xid. The xid can be that of
a subtransaction or of a toplevel transaction; at commit those can be
reintegrated by doing a k-way mergesort between the individual transactions.

Callbacks for apply_begin, apply_change and apply_commit are provided to
retrieve complete transactions.

Missing:
- spill-to-disk
- correct subtransaction merge, current behaviour is simple/wrong
- DDL handling (?)
- resource usage controls
---
 src/backend/replication/Makefile             |    2 +
 src/backend/replication/logical/Makefile     |   19 ++
 src/backend/replication/logical/applycache.c |  380 ++++++++++++++++++++++++++
 src/include/replication/applycache.h         |  185 +++++++++++++
 4 files changed, 586 insertions(+)
 create mode 100644 src/backend/replication/logical/Makefile
 create mode 100644 src/backend/replication/logical/applycache.c
 create mode 100644 src/include/replication/applycache.h

diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index 9d9ec87..ae7f6b1 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -17,6 +17,8 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
        repl_gram.o syncrep.o
 
+SUBDIRS = logical
+
 include $(top_srcdir)/src/backend/common.mk
 
 # repl_scanner is compiled as part of repl_gram
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
new file mode 100644
index 0000000..2eadab8
--- /dev/null
+++ b/src/backend/replication/logical/Makefile
@@ -0,0 +1,19 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+#    Makefile for src/backend/replication/logical
+#    (builds applycache.o, the only object in this directory)
+#
+# IDENTIFICATION
+#    src/backend/replication/logical/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/replication/logical
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
+
+OBJS = applycache.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/applycache.c b/src/backend/replication/logical/applycache.c
new file mode 100644
index 0000000..b73b0ba
--- /dev/null
+++ b/src/backend/replication/logical/applycache.c
@@ -0,0 +1,380 @@
+/*-------------------------------------------------------------------------
+ *
+ * applycache.c
+ *
+ * PostgreSQL logical replay "cache" management
+ *
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *       src/backend/replication/logical/applycache.c
+ *
+ */
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/xact.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_control.h"
+#include "replication/applycache.h"
+
+#include "utils/ilist.h"
+#include "utils/memutils.h"
+#include "utils/relcache.h"
+#include "utils/syscache.h"
+
+const Size max_memtries = 1<<16; /* NOTE(review): not referenced in this file */
+
+/* upper bounds for the number of unused structs kept around for reuse */
+const size_t max_cached_changes = 1024;
+const size_t max_cached_tuplebufs = 1024; /* ~8MB */
+const size_t max_cached_transactions = 512;
+
+/* entry of the ApplyCache->by_txn hash table */
+typedef struct ApplyCacheTXNByIdEnt
+{
+       TransactionId xid;      /* hash key, keysize is sizeof(TransactionId) */
+       ApplyCacheTXN* txn;     /* the transaction tracked for that xid */
+} ApplyCacheTXNByIdEnt;
+
+/* get/return ApplyCacheTXN structs, reusing cached ones where possible */
+static ApplyCacheTXN* ApplyCacheGetTXN(ApplyCache *cache);
+static void ApplyCacheReturnTXN(ApplyCache *cache, ApplyCacheTXN* txn);
+
+/* look up, and optionally create, the ApplyCacheTXN tracked for an xid */
+static ApplyCacheTXN* ApplyCacheTXNByXid(ApplyCache*, TransactionId xid, bool create);
+
+
+/*
+ * Allocate and initialize a new, empty ApplyCache.
+ *
+ * The struct itself is malloc()ed, the xid -> transaction hash table lives in
+ * a dedicated memory context below TopMemoryContext.
+ *
+ * Every field not set up explicitly here starts out zeroed. That includes the
+ * begin/apply_change/commit callbacks and private_data, which the caller has
+ * to fill in before the first ApplyCacheCommit().
+ *
+ * Raises an ERROR if the struct cannot be allocated.
+ */
+ApplyCache*
+ApplyCacheAllocate(void)
+{
+       ApplyCache* cache = (ApplyCache*)malloc(sizeof(ApplyCache));
+       HASHCTL         hash_ctl;
+
+       if (!cache)
+               elog(ERROR, "Could not allocate the ApplyCache");
+
+       /*
+        * malloc() returns uninitialized memory, make sure callbacks,
+        * private_data and last_txn* don't contain garbage.
+        */
+       memset(cache, 0, sizeof(ApplyCache));
+       memset(&hash_ctl, 0, sizeof(hash_ctl));
+
+       cache->context = AllocSetContextCreate(TopMemoryContext,
+                                              "ApplyCache",
+                                              ALLOCSET_DEFAULT_MINSIZE,
+                                              ALLOCSET_DEFAULT_INITSIZE,
+                                              ALLOCSET_DEFAULT_MAXSIZE);
+
+       /* hash table mapping xid -> ApplyCacheTXN, keyed by the xid */
+       hash_ctl.keysize = sizeof(TransactionId);
+       hash_ctl.entrysize = sizeof(ApplyCacheTXNByIdEnt);
+       hash_ctl.hash = tag_hash;
+       hash_ctl.hcxt = cache->context;
+
+       cache->by_txn = hash_create("ApplyCacheByXid", 1000, &hash_ctl,
+                                   HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+       /* the lists of structs kept for reuse start out empty */
+       cache->nr_cached_transactions = 0;
+       cache->nr_cached_changes = 0;
+       cache->nr_cached_tuplebufs = 0;
+
+       ilist_d_init(&cache->cached_transactions);
+       ilist_d_init(&cache->cached_changes);
+       ilist_s_init(&cache->cached_tuplebufs);
+
+       return cache;
+}
+
+/*
+ * Release an ApplyCache created by ApplyCacheAllocate().
+ *
+ * Frees all structs kept around for reuse, the xid hash table, the cache's
+ * memory context and finally the cache itself. Passing NULL is a no-op.
+ *
+ * Transactions that are still in progress are *not* cleaned up, see FIXME.
+ */
+void ApplyCacheFree(ApplyCache* cache)
+{
+       if (!cache)
+               return;
+
+       /* FIXME: check for in-progress transactions */
+
+       /* release the cached, currently unused, transactions */
+       while (cache->nr_cached_transactions)
+       {
+               cache->nr_cached_transactions--;
+               free(ilist_container(ApplyCacheTXN, node,
+                                    ilist_d_pop_front(&cache->cached_transactions)));
+       }
+
+       /* release the cached, currently unused, changes */
+       while (cache->nr_cached_changes)
+       {
+               cache->nr_cached_changes--;
+               free(ilist_container(ApplyCacheChange, node,
+                                    ilist_d_pop_front(&cache->cached_changes)));
+       }
+
+       /* release the cached, currently unused, tuple buffers */
+       while (cache->nr_cached_tuplebufs)
+       {
+               cache->nr_cached_tuplebufs--;
+               free(ilist_container(ApplyCacheTupleBuf, node,
+                                    ilist_s_pop_front(&cache->cached_tuplebufs)));
+       }
+
+       hash_destroy(cache->by_txn);
+
+       /* the context was created in ApplyCacheAllocate(), don't leak it */
+       MemoryContextDelete(cache->context);
+
+       free(cache);
+}
+
+/*
+ * Get an ApplyCacheTXN struct, reusing a previously returned one if
+ * available and malloc()ing a new one otherwise.
+ *
+ * The returned struct is zeroed and has empty changes/subtxns lists. Raises
+ * an ERROR if memory cannot be allocated.
+ */
+static ApplyCacheTXN* ApplyCacheGetTXN(ApplyCache *cache)
+{
+       ApplyCacheTXN* txn;
+
+       if (cache->nr_cached_transactions)
+       {
+               cache->nr_cached_transactions--;
+               txn = ilist_container(ApplyCacheTXN, node,
+                                     ilist_d_pop_front(&cache->cached_transactions));
+       }
+       else
+       {
+               txn = (ApplyCacheTXN*)
+                       malloc(sizeof(ApplyCacheTXN));
+
+               if (!txn)
+                       elog(ERROR, "Could not allocate a ApplyCacheTXN struct");
+       }
+
+       /* recycled structs still contain the state of their previous use */
+       memset(txn, 0, sizeof(ApplyCacheTXN));
+       ilist_d_init(&txn->changes);
+       ilist_d_init(&txn->subtxns);
+       return txn;
+}
+
+/*
+ * Hand back a transaction struct that is not needed anymore.
+ *
+ * Up to max_cached_transactions structs are kept for reuse by
+ * ApplyCacheGetTXN(), any further ones are free()d right away.
+ */
+void ApplyCacheReturnTXN(ApplyCache *cache, ApplyCacheTXN* txn)
+{
+       /* reuse list is full, really release the memory */
+       if (cache->nr_cached_transactions >= max_cached_transactions)
+       {
+               free(txn);
+               return;
+       }
+
+       ilist_d_push_front(&cache->cached_transactions, &txn->node);
+       cache->nr_cached_transactions++;
+}
+
+/*
+ * Get an ApplyCacheChange struct, reusing a previously returned one if
+ * available and malloc()ing a new one otherwise.
+ *
+ * The returned struct is zeroed. Raises an ERROR if memory cannot be
+ * allocated.
+ */
+ApplyCacheChange*
+ApplyCacheGetChange(ApplyCache* cache)
+{
+       ApplyCacheChange* change;
+
+       if (cache->nr_cached_changes)
+       {
+               cache->nr_cached_changes--;
+               change = ilist_container(ApplyCacheChange, node,
+                                        ilist_d_pop_front(&cache->cached_changes));
+       }
+       else
+       {
+               change = (ApplyCacheChange*)malloc(sizeof(ApplyCacheChange));
+
+               if (!change)
+                       elog(ERROR, "Could not allocate a ApplyCacheChange struct");
+       }
+
+       /* recycled structs still contain the state of their previous use */
+       memset(change, 0, sizeof(ApplyCacheChange));
+       return change;
+}
+
+/*
+ * Hand back a change that is not needed anymore.
+ *
+ * The tuple buffers referenced by the change are returned as well and its
+ * 'table' tuple is freed. Up to max_cached_changes structs are kept for reuse
+ * by ApplyCacheGetChange(), any further ones are free()d right away.
+ */
+void
+ApplyCacheReturnChange(ApplyCache* cache, ApplyCacheChange* change)
+{
+       /* first release everything the change references */
+       if (change->newtuple)
+               ApplyCacheReturnTupleBuf(cache, change->newtuple);
+       if (change->oldtuple)
+               ApplyCacheReturnTupleBuf(cache, change->oldtuple);
+       if (change->table)
+               heap_freetuple(change->table);
+
+       /* reuse list is full, really release the memory */
+       if (cache->nr_cached_changes >= max_cached_changes)
+       {
+               free(change);
+               return;
+       }
+
+       ilist_d_push_front(&cache->cached_changes, &change->node);
+       cache->nr_cached_changes++;
+}
+
+/*
+ * Get an ApplyCacheTupleBuf, reusing a previously returned one if available
+ * and malloc()ing a new one otherwise.
+ *
+ * In contrast to transactions and changes the buffer is *not* zeroed, its
+ * contents are undefined. Raises an ERROR if memory cannot be allocated.
+ */
+ApplyCacheTupleBuf*
+ApplyCacheGetTupleBuf(ApplyCache* cache)
+{
+       ApplyCacheTupleBuf* tuple;
+
+       if (cache->nr_cached_tuplebufs)
+       {
+               cache->nr_cached_tuplebufs--;
+               tuple = ilist_container(ApplyCacheTupleBuf, node,
+                                       ilist_s_pop_front(&cache->cached_tuplebufs));
+       }
+       else
+       {
+               tuple =
+                       (ApplyCacheTupleBuf*)malloc(sizeof(ApplyCacheTupleBuf));
+
+               if (!tuple)
+                       elog(ERROR, "Could not allocate a ApplyCacheTupleBuf struct");
+       }
+
+       return tuple;
+}
+
+/*
+ * Hand back a tuple buffer that is not needed anymore.
+ *
+ * Up to max_cached_tuplebufs buffers are kept for reuse by
+ * ApplyCacheGetTupleBuf(), any further ones are free()d right away.
+ */
+void
+ApplyCacheReturnTupleBuf(ApplyCache* cache, ApplyCacheTupleBuf* tuple)
+{
+       /* reuse list is full, really release the memory */
+       if (cache->nr_cached_tuplebufs >= max_cached_tuplebufs)
+       {
+               free(tuple);
+               return;
+       }
+
+       ilist_s_push_front(&cache->cached_tuplebufs, &tuple->node);
+       cache->nr_cached_tuplebufs++;
+}
+
+
+/*
+ * Look up the ApplyCacheTXN tracked for 'xid' in cache->by_txn.
+ *
+ * If no transaction is known for the xid, NULL is returned unless 'create'
+ * is set, in which case a new, empty transaction is registered under that
+ * xid and returned.
+ */
+static
+ApplyCacheTXN*
+ApplyCacheTXNByXid(ApplyCache* cache, TransactionId xid, bool create)
+{
+       ApplyCacheTXNByIdEnt* ent;
+       bool found;
+
+       ent = (ApplyCacheTXNByIdEnt*)
+               hash_search(cache->by_txn,
+                           (void *)&xid,
+                           (create ? HASH_ENTER : HASH_FIND),
+                           &found);
+
+#ifdef VERBOSE_DEBUG
+       if (found)
+               elog(LOG, "found cache entry for %u at %p", xid, ent);
+       else
+               elog(LOG, "didn't find cache entry for %u in %p at %p, creating %u",
+                    xid, cache, ent, create);
+#endif
+
+       if (found)
+               return ent->txn;
+
+       if (!create)
+               return NULL;
+
+       /*
+        * hash_search() created a new entry for us, attach a fresh transaction
+        * to it and remember which xid it belongs to.
+        */
+       ent->txn = ApplyCacheGetTXN(cache);
+       ent->txn->xid = xid;
+
+       return ent->txn;
+}
+
+/*
+ * Append 'change' to the (sub-)transaction identified by 'xid', starting to
+ * track that transaction if it isn't known yet.
+ *
+ * 'lsn' is remembered as the transaction's lsn. From here on the change is
+ * owned by the cache; it is returned when the transaction is committed or
+ * aborted.
+ */
+void
+ApplyCacheAddChange(ApplyCache* cache, TransactionId xid, XLogRecPtr lsn,
+                    ApplyCacheChange* change)
+{
+       ApplyCacheTXN* txn = ApplyCacheTXNByXid(cache, xid, true);
+       txn->lsn = lsn;
+       ilist_d_push_back(&txn->changes, &change->node);
+
+       /* nothing is spilled to disk yet, so all entries are in memory */
+       txn->nentries++;
+       txn->nentries_mem++;
+}
+
+
+/*
+ * Attach the committed subtransaction 'subxid' to its toplevel transaction
+ * 'xid', so its changes get replayed (or discarded) together with the
+ * toplevel transaction by ApplyCacheCommit()/ApplyCacheAbort().
+ *
+ * 'lsn' is remembered as the subtransaction's lsn. The toplevel transaction
+ * is created if it isn't known yet.
+ */
+void
+ApplyCacheCommitChild(ApplyCache* cache, TransactionId xid,
+                      TransactionId subxid, XLogRecPtr lsn)
+{
+       ApplyCacheTXN* txn;
+       ApplyCacheTXN* subtxn;
+       bool found;
+
+       subtxn = ApplyCacheTXNByXid(cache, subxid, false);
+
+       /*
+        * No need to do anything if that subtxn didn't contain any changes
+        */
+       if (!subtxn)
+               return;
+
+       subtxn->lsn = lsn;
+
+       txn = ApplyCacheTXNByXid(cache, xid, true);
+
+       ilist_d_push_back(&txn->subtxns, &subtxn->node);
+
+       /*
+        * The subtransaction is owned by the toplevel transaction's subtxns
+        * list from now on. Remove it from the hash, otherwise the entry would
+        * point to a recycled struct after commit/abort, and attaching the same
+        * subxid again would link the node into the list twice.
+        */
+       hash_search(cache->by_txn,
+                   (void *)&subxid,
+                   HASH_REMOVE,
+                   &found);
+       Assert(found);
+}
+
+/*
+ * Replay the committed toplevel transaction 'xid' via the callbacks.
+ *
+ * begin() is called first, then apply_change() for every change of every
+ * attached subtransaction followed by the toplevel transaction's own changes,
+ * and finally commit(). Every change and subtransaction is handed back to
+ * the cache once processed; afterwards the xid is forgotten.
+ *
+ * 'lsn' is remembered as the transaction's lsn before the callbacks run.
+ * Nothing happens if the xid is not known to the cache.
+ */
+void
+ApplyCacheCommit(ApplyCache* cache, TransactionId xid, XLogRecPtr lsn)
+{
+       ApplyCacheTXN* txn = ApplyCacheTXNByXid(cache, xid, false);
+       ilist_d_node* cur_change, *next_change;
+       ilist_d_node* cur_txn, *next_txn;
+       bool found;
+
+       if (!txn)
+               return;
+
+       txn->lsn = lsn;
+
+       cache->begin(cache, txn);
+
+       /*
+        * FIXME:
+        * do a k-way mergesort of all changes ordered by xid
+        *
+        * For now we just iterate through all subtransactions and then through
+        * the main txn. But that's *WRONG*.
+        *
+        * The best way to do it is probably to model the current heads of all
+        * TXNs as a heap and always remove from the smallest lsn till that's
+        * not the case anymore.
+        */
+       ilist_d_foreach_modify (cur_txn, next_txn, &txn->subtxns)
+       {
+               ApplyCacheTXN* subtxn = ilist_container(ApplyCacheTXN, node, cur_txn);
+
+               ilist_d_foreach_modify (cur_change, next_change, &subtxn->changes)
+               {
+                       ApplyCacheChange* change =
+                               ilist_container(ApplyCacheChange, node, cur_change);
+                       cache->apply_change(cache, txn, subtxn, change);
+
+                       ApplyCacheReturnChange(cache, change);
+               }
+               ApplyCacheReturnTXN(cache, subtxn);
+       }
+
+       /* changes of the toplevel transaction itself, no originating subtxn */
+       ilist_d_foreach_modify (cur_change, next_change, &txn->changes)
+       {
+               ApplyCacheChange* change =
+                       ilist_container(ApplyCacheChange, node, cur_change);
+               cache->apply_change(cache, txn, NULL, change);
+
+               ApplyCacheReturnChange(cache, change);
+       }
+
+       cache->commit(cache, txn);
+
+       /* now remove reference from cache */
+       hash_search(cache->by_txn,
+                   (void *)&xid,
+                   HASH_REMOVE,
+                   &found);
+       Assert(found);
+
+       ApplyCacheReturnTXN(cache, txn);
+}
+
+/*
+ * Discard everything collected for the aborted transaction 'xid'.
+ *
+ * All changes of the transaction and of the subtransactions attached to it
+ * are handed back to the cache without invoking any callback; afterwards the
+ * xid is forgotten. 'lsn' is currently unused.
+ *
+ * Nothing happens if the xid is not known to the cache.
+ */
+void
+ApplyCacheAbort(ApplyCache* cache, TransactionId xid, XLogRecPtr lsn)
+{
+       ilist_d_node* cur_change, *next_change;
+       ilist_d_node* cur_txn, *next_txn;
+       ApplyCacheTXN* txn = ApplyCacheTXNByXid(cache, xid, false);
+       bool found;
+
+       /* no changes in this transaction */
+       if (!txn)
+               return;
+
+       /* iterate through all subtransactions and free memory */
+       ilist_d_foreach_modify (cur_txn, next_txn, &txn->subtxns)
+       {
+               ApplyCacheTXN* subtxn = ilist_container(ApplyCacheTXN, node, cur_txn);
+
+               ilist_d_foreach_modify (cur_change, next_change, &subtxn->changes)
+               {
+                       ApplyCacheChange* change =
+                               ilist_container(ApplyCacheChange, node, cur_change);
+                       ApplyCacheReturnChange(cache, change);
+               }
+               ApplyCacheReturnTXN(cache, subtxn);
+       }
+
+       /* and the same for the changes of the transaction itself */
+       ilist_d_foreach_modify (cur_change, next_change, &txn->changes)
+       {
+               ApplyCacheChange* change =
+                       ilist_container(ApplyCacheChange, node, cur_change);
+               ApplyCacheReturnChange(cache, change);
+       }
+
+       /* now remove reference from cache */
+       hash_search(cache->by_txn,
+                   (void *)&xid,
+                   HASH_REMOVE,
+                   &found);
+       Assert(found);
+
+       ApplyCacheReturnTXN(cache, txn);
+}
diff --git a/src/include/replication/applycache.h b/src/include/replication/applycache.h
new file mode 100644
index 0000000..4ceba63
--- /dev/null
+++ b/src/include/replication/applycache.h
@@ -0,0 +1,185 @@
+/*
+ * applycache.h
+ *
+ * PostgreSQL logical replay "cache" management
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/replication/applycache.h
+ */
+#ifndef APPLYCACHE_H
+#define APPLYCACHE_H
+
+#include "access/htup.h"
+#include "utils/hsearch.h"
+#include "utils/ilist.h"
+
+typedef struct ApplyCache ApplyCache;
+
+/* the kind of modification an ApplyCacheChange describes (its 'action') */
+enum ApplyCacheChangeType
+{
+       APPLY_CACHE_CHANGE_INSERT,
+       APPLY_CACHE_CHANGE_UPDATE,
+       APPLY_CACHE_CHANGE_DELETE
+};
+
+/*
+ * Fixed-size buffer big enough for one heap tuple, handed out by
+ * ApplyCacheGetTupleBuf().
+ *
+ * NOTE(review): presumably tuple.t_data is meant to point to 'header', with
+ * 'data' providing the room for the rest of the tuple - confirm against the
+ * code filling these buffers.
+ */
+typedef struct ApplyCacheTupleBuf
+{
+       /* position in preallocated list */
+       ilist_s_node node;
+
+       HeapTupleData tuple;
+       HeapTupleHeaderData header;
+       char data[MaxHeapTupleSize];
+} ApplyCacheTupleBuf;
+
+/* a single change, queued in the 'changes' list of an ApplyCacheTXN */
+typedef struct ApplyCacheChange
+{
+       XLogRecPtr lsn;
+       enum ApplyCacheChangeType action;
+
+       /* returned via ApplyCacheReturnTupleBuf() together with the change */
+       ApplyCacheTupleBuf* newtuple;
+
+       /* returned via ApplyCacheReturnTupleBuf() together with the change */
+       ApplyCacheTupleBuf* oldtuple;
+
+       /*
+        * Freed with heap_freetuple() when the change is returned.
+        * NOTE(review): presumably describes the table the change belongs to -
+        * confirm against the code creating changes.
+        */
+       HeapTuple table;
+
+       /*
+        * While in use this is how a change is linked into a transaction,
+        * otherwise it's the position in the preallocated list.
+        */
+       ilist_d_node node;
+} ApplyCacheChange;
+
+/* state collected for one toplevel transaction or subtransaction */
+typedef struct ApplyCacheTXN
+{
+       /* xid of the (sub-)transaction */
+       TransactionId xid;
+
+       /* lsn passed to the last call that touched this transaction */
+       XLogRecPtr lsn;
+
+       /*
+        * How many ApplyCacheChange's do we have in this txn.
+        *
+        * Subtransactions are *not* included.
+        */
+       Size nentries;
+
+       /*
+        * How many of the above entries are stored in memory in contrast to
+        * being spilled to disk.
+        */
+       Size nentries_mem;
+
+       /*
+        * List of actual changes
+        */
+       ilist_d_head changes;
+
+       /*
+        * non-hierarchical list of subtransactions that are *not* aborted
+        */
+       ilist_d_head subtxns;
+
+       /*
+        * our position in a list of subtransactions while the TXN is in
+        * use. Otherwise it's the position in the list of preallocated
+        * transactions.
+        */
+       ilist_d_node node;
+} ApplyCacheTXN;
+
+
+/*
+ * Callbacks invoked by ApplyCacheCommit(): begin once, then apply_change
+ * for every change (subtxn is NULL for changes of the toplevel transaction
+ * itself), then commit once.
+ *
+ * XXX: we're currently passing the originating subtxn. Not sure that's
+ * necessary
+ */
+typedef void (*ApplyCacheApplyChangeCB)(ApplyCache* cache, ApplyCacheTXN* txn, ApplyCacheTXN* subtxn, ApplyCacheChange* change);
+typedef void (*ApplyCacheBeginCB)(ApplyCache* cache, ApplyCacheTXN* txn);
+typedef void (*ApplyCacheCommitCB)(ApplyCache* cache, ApplyCacheTXN* txn);
+
+/*
+ * max number of concurrent top-level transactions or transactions where we
+ * don't know if they are top-level can be calculated by:
+ * (max_connections + max_prepared_xacts + ?)  * PGPROC_MAX_CACHED_SUBXIDS
+ */
+struct ApplyCache
+{
+       /* NOTE(review): not used by applycache.c, presumably for a lookup cache */
+       TransactionId last_txn;
+       ApplyCacheTXN *last_txn_cache;
+
+       /* xid -> ApplyCacheTXN, entries are ApplyCacheTXNByIdEnt's */
+       HTAB *by_txn;
+
+       /* invoked by ApplyCacheCommit(), have to be set by the user */
+       ApplyCacheBeginCB begin;
+       ApplyCacheApplyChangeCB apply_change;
+       ApplyCacheCommitCB commit;
+
+       /* not touched by applycache.c, free for use by the callbacks' owner */
+       void* private_data;
+
+       /* memory context the by_txn hash table is allocated in */
+       MemoryContext context;
+
+       /*
+        * we don't want to repeatedly (de-)allocate those structs, so cache
+        * them for reuse.
+        */
+       ilist_d_head cached_transactions;
+       size_t nr_cached_transactions;
+
+       ilist_d_head cached_changes;
+       size_t nr_cached_changes;
+
+       ilist_s_head cached_tuplebufs;
+       size_t nr_cached_tuplebufs;
+};
+
+
+/*
+ * Create a new, empty ApplyCache. The callbacks have to be set by the caller.
+ */
+ApplyCache*
+ApplyCacheAllocate(void);
+
+/*
+ * Destroy the xid hash table and free the cache.
+ */
+void
+ApplyCacheFree(ApplyCache*);
+
+/*
+ * Returns a (potentially preallocated) tuple buffer with undefined contents.
+ */
+ApplyCacheTupleBuf*
+ApplyCacheGetTupleBuf(ApplyCache*);
+
+/*
+ * Return an unused ApplyCacheTupleBuf
+ */
+void
+ApplyCacheReturnTupleBuf(ApplyCache* cache, ApplyCacheTupleBuf* tuple);
+
+/*
+ * Returns a (potentially preallocated) change struct. Its lifetime is managed
+ * by the applycache module.
+ *
+ * If not added to a transaction with ApplyCacheAddChange it needs to be
+ * returned via ApplyCacheReturnChange
+ *
+ * FIXME: better name
+ */
+ApplyCacheChange*
+ApplyCacheGetChange(ApplyCache*);
+
+/*
+ * Return an unused ApplyCacheChange struct
+ */
+void
+ApplyCacheReturnChange(ApplyCache*, ApplyCacheChange*);
+
+
+/*
+ * record the transaction as in-progress if not already done, add the current
+ * change.
+ *
+ * We have a one-entry cache for looking up the current ApplyCacheTXN so we
+ * don't need to do a full hash-lookup if the same xid is used
+ * sequentially. Them being used multiple times that way is rather frequent.
+ *
+ * NOTE(review): applycache.c does a hash lookup on every call, the one-entry
+ * cache described above does not seem to be implemented yet.
+ */
+void
+ApplyCacheAddChange(ApplyCache*, TransactionId, XLogRecPtr lsn, ApplyCacheChange*);
+
+/*
+ * Replay all changes collected for the transaction, including those of its
+ * attached subtransactions, via the begin/apply_change/commit callbacks and
+ * forget about the transaction afterwards.
+ */
+void
+ApplyCacheCommit(ApplyCache*, TransactionId, XLogRecPtr lsn);
+
+/*
+ * Attach a committed subtransaction (second xid) to its toplevel transaction
+ * (first xid), so it is processed together with the latter.
+ */
+void
+ApplyCacheCommitChild(ApplyCache*, TransactionId, TransactionId, XLogRecPtr lsn);
+
+/*
+ * Discard all changes collected for the transaction and its attached
+ * subtransactions without invoking any callback.
+ */
+void
+ApplyCacheAbort(ApplyCache*, TransactionId, XLogRecPtr lsn);
+
+#endif
-- 
1.7.10.rc3.3.g19a6c.dirty


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to