---
contrib/Makefile | 1 +
contrib/test_decoding/Makefile | 16 +++
contrib/test_decoding/test_decoding.c | 192 ++++++++++++++++++++++++++++++++++
3 files changed, 209 insertions(+)
create mode 100644 contrib/test_decoding/Makefile
create mode 100644 contrib/test_decoding/test_decoding.c
diff --git a/contrib/Makefile b/contrib/Makefile
index d230451..8709992 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -48,6 +48,7 @@ SUBDIRS = \
tablefunc \
tcn \
test_parser \
+ test_decoding \
tsearch2 \
unaccent \
vacuumlo
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
new file mode 100644
index 0000000..2ac9653
--- /dev/null
+++ b/contrib/test_decoding/Makefile
@@ -0,0 +1,16 @@
+# contrib/test_decoding/Makefile
+
+MODULE_big = test_decoding
+OBJS = test_decoding.o
+
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/test_decoding
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
new file mode 100644
index 0000000..f3d90e3
--- /dev/null
+++ b/contrib/test_decoding/test_decoding.c
@@ -0,0 +1,192 @@
+/*-------------------------------------------------------------------------
+ *
+ * test_deocding.c
+ * example output plugin for the logical replication functionality
+ *
+ * Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/test_decoding/test_decoding.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "catalog/pg_class.h"
+#include "catalog/pg_type.h"
+#include "catalog/index.h"
+
+#include "replication/output_plugin.h"
+#include "replication/snapbuild.h"
+
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+#include "utils/relcache.h"
+#include "utils/syscache.h"
+#include "utils/typcache.h"
+
+
+PG_MODULE_MAGIC;
+
+void _PG_init(void);
+
+void WalSndWriteData(XLogRecPtr lsn, const char *data, Size len);
+
+extern void pg_decode_init(void **private);
+
+extern bool pg_decode_begin_txn(void *private, StringInfo out, ReorderBufferTXN* txn);
+extern bool pg_decode_commit_txn(void *private, StringInfo out, ReorderBufferTXN* txn, XLogRecPtr commit_lsn);
+extern bool pg_decode_change(void *private, StringInfo out, ReorderBufferTXN* txn, Oid tableoid, ReorderBufferChange *change);
+
+
+void
+_PG_init(void)
+{
+}
+
+void
+pg_decode_init(void **private)
+{
+ AssertVariableIsOfType(&pg_decode_init, LogicalDecodeInitCB);
+ *private = AllocSetContextCreate(TopMemoryContext,
+ "text conversion context",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+}
+
+bool
+pg_decode_begin_txn(void *private, StringInfo out, ReorderBufferTXN* txn)
+{
+ AssertVariableIsOfType(&pg_decode_begin_txn, LogicalDecodeBeginCB);
+
+ appendStringInfo(out, "BEGIN %d", txn->xid);
+ return true;
+}
+
+bool
+pg_decode_commit_txn(void *private, StringInfo out, ReorderBufferTXN* txn, XLogRecPtr commit_lsn)
+{
+ AssertVariableIsOfType(&pg_decode_commit_txn, LogicalDecodeCommitCB);
+
+ appendStringInfo(out, "COMMIT %d", txn->xid);
+ return true;
+}
+
+static void
+tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple)
+{
+ int i;
+ HeapTuple typeTuple;
+ Form_pg_type pt;
+
+ for (i = 0; i < tupdesc->natts; i++)
+ {
+ Oid typid, typoutput;
+ bool typisvarlena;
+ Datum origval, val;
+ char *outputstr;
+ bool isnull;
+ if (tupdesc->attrs[i]->attisdropped)
+ continue;
+ if (tupdesc->attrs[i]->attnum < 0)
+ continue;
+
+ typid = tupdesc->attrs[i]->atttypid;
+
+ typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
+ if (!HeapTupleIsValid(typeTuple))
+ elog(ERROR, "cache lookup failed for type %u", typid);
+ pt = (Form_pg_type) GETSTRUCT(typeTuple);
+
+ appendStringInfoChar(s, ' ');
+ appendStringInfoString(s, NameStr(tupdesc->attrs[i]->attname));
+ appendStringInfoChar(s, '[');
+ appendStringInfoString(s, NameStr(pt->typname));
+ appendStringInfoChar(s, ']');
+
+ getTypeOutputInfo(typid,
+ &typoutput, &typisvarlena);
+
+ ReleaseSysCache(typeTuple);
+
+ origval = fastgetattr(tuple, i + 1, tupdesc, &isnull);
+
+ if (typisvarlena && !isnull)
+ val = PointerGetDatum(PG_DETOAST_DATUM(origval));
+ else
+ val = origval;
+
+ if (isnull)
+ outputstr = "(null)";
+ else
+ outputstr = OidOutputFunctionCall(typoutput, val);
+
+ appendStringInfoChar(s, ':');
+ appendStringInfoString(s, outputstr);
+ }
+}
+
+/* This is is just for demonstration, don't ever use this code for anything real! */
+bool
+pg_decode_change(void *private, StringInfo out, ReorderBufferTXN* txn,
+ Oid tableoid, ReorderBufferChange *change)
+{
+ Relation relation = RelationIdGetRelation(tableoid);
+ Form_pg_class class_form = RelationGetForm(relation);
+ TupleDesc tupdesc = RelationGetDescr(relation);
+ MemoryContext context = (MemoryContext)private;
+ MemoryContext old = MemoryContextSwitchTo(context);
+
+ AssertVariableIsOfType(&pg_decode_change, LogicalDecodeChangeCB);
+
+ appendStringInfoString(out, "table \"");
+ appendStringInfoString(out, NameStr(class_form->relname));
+ appendStringInfoString(out, "\":");
+
+ switch (change->action)
+ {
+ case REORDER_BUFFER_CHANGE_INSERT:
+ appendStringInfoString(out, " INSERT:");
+ tuple_to_stringinfo(out, tupdesc, &change->newtuple->tuple);
+ break;
+ case REORDER_BUFFER_CHANGE_UPDATE:
+ appendStringInfoString(out, " UPDATE:");
+ tuple_to_stringinfo(out, tupdesc, &change->newtuple->tuple);
+ break;
+ case REORDER_BUFFER_CHANGE_DELETE:
+ {
+ Oid indexoid = InvalidOid;
+ Relation indexrel;
+ TupleDesc indexdesc;
+
+ int16 pknratts;
+ int16 pkattnum[INDEX_MAX_KEYS];
+ Oid pktypoid[INDEX_MAX_KEYS];
+ Oid pkopclass[INDEX_MAX_KEYS];
+
+ MemSet(pkattnum, 0, sizeof(pkattnum));
+ MemSet(pktypoid, 0, sizeof(pktypoid));
+ MemSet(pkopclass, 0, sizeof(pkopclass));
+
+ appendStringInfoString(out, " DELETE (pkey):");
+
+ relationFindPrimaryKey(relation, &indexoid, &pknratts,
+ pkattnum, pktypoid, pkopclass);
+ indexrel = RelationIdGetRelation(indexoid);
+
+ indexdesc = RelationGetDescr(indexrel);
+
+ tuple_to_stringinfo(out, indexdesc, &change->oldtuple->tuple);
+
+ RelationClose(indexrel);
+ break;
+ }
+ }
+ RelationClose(relation);
+
+ MemoryContextSwitchTo(old);
+ MemoryContextReset(context);
+ return true;
+}
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers