Hello,
we've made a few helper functions for making logical replication easier. I
bundled them into a contrib module, as this is mainly for discussion at this
time (I don't expect this to get committed any time soon, but it is a good
way to iron out the protocol, etc.).
I created a sample logical decoding plugin that uses those functions and
can be used for passing DML changes in a (hopefully) platform- and
version-independent format.
I will also post a sample apply background worker once I get some initial
feedback about this.
It's hard to write regression tests for this, as the binary output contains
transaction ids and timestamps, so the data changes on every run.
This is of course based on the BDR work Andres, Craig and I have
been doing.
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
diff --git a/contrib/lrep/Makefile b/contrib/lrep/Makefile
new file mode 100644
index 0000000..d3f7ba3
--- /dev/null
+++ b/contrib/lrep/Makefile
@@ -0,0 +1,20 @@
+# contrib/lrep/Makefile
+# Builds shared library "lrep"; PG_CPPFLAGS adds libpq headers (lrep.h uses libpq-fe.h).
+MODULE_big = lrep
+OBJS = lrep_utils.o lrep_output.o $(WIN32RES)
+PG_CPPFLAGS = -I$(libpq_srcdir)
+# Extension packaging and regression tests are not enabled yet:
+#EXTENSION = lrep
+#REGRESS = lrep
+#REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/lrep/logical.conf
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/lrep
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/lrep/lrep.h b/contrib/lrep/lrep.h
new file mode 100644
index 0000000..e2910c4
--- /dev/null
+++ b/contrib/lrep/lrep.h
@@ -0,0 +1,108 @@
+/*-------------------------------------------------------------------------
+ *
+ * lrep.h
+ * LREP public interfaces
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/lrep/lrep.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LREP_H
+#define LREP_H
+
+#include "libpq-fe.h"
+
+#include "nodes/parsenodes.h"
+
+#include "replication/logical.h"
+#include "replication/output_plugin.h"
+
+#include "storage/lock.h"
+
+#define LREP_PROTO_VERSION_NUM 1	/* lrep protocol version this code speaks */
+#define LREP_PROTO_MIN_REMOTE_VERSION_NUM 1	/* oldest peer protocol version accepted */
+
+typedef struct
+{
+	MemoryContext context;		/* scratch context, reset at the end of pg_decode_change() */
+	/* Encoding decisions, made in pg_decode_startup() from the client's options. */
+	bool		allow_binary_protocol;	/* binary (raw datum) encoding permitted */
+	bool		allow_sendrecv_protocol;	/* send/recv encoding permitted */
+	bool		int_datetime_mismatch;	/* client/server integer_datetimes differ */
+	bool		forward_changesets;	/* NOTE(review): not set or read in the code shown */
+	/* Client properties, parsed from (or defaulted by "interactive") the plugin options. */
+	uint32		client_pg_version;	/* client's PG_VERSION_NUM */
+	uint32		client_pg_catversion;	/* client's CATALOG_VERSION_NO */
+	uint32		client_proto_version;	/* client's lrep protocol version */
+	uint32		client_min_proto_version;	/* oldest lrep protocol version it accepts */
+	size_t		client_sizeof_int;	/* sizeof_* must match ours for binary protocol */
+	size_t		client_sizeof_long;
+	size_t		client_sizeof_datum;
+	size_t		client_maxalign;	/* required, but not consulted by pg_decode_startup() */
+	bool		client_bigendian;	/* must match ours for binary protocol */
+	bool		client_float4_byval;	/* float*_byval must match for binary protocol */
+	bool		client_float8_byval;
+	bool		client_int_datetime;
+	char	   *client_db_encoding;	/* encoding name; must match the server's */
+
+	void	   *extra;			/* Additional data; NOTE(review): unused in the code shown */
+} LREPOutputData;
+
+/* One decoded row, filled by lrep_read_tuple_parts(); index i is attribute i + 1. */
+typedef struct LREPTupleData
+{
+	Datum		values[MaxTupleAttributeNumber];	/* junk where nulls[i] is true */
+	bool		nulls[MaxTupleAttributeNumber];	/* true for NULL and unchanged columns */
+	bool		changed[MaxTupleAttributeNumber];	/* false for unchanged TOAST ('u') columns */
+} LREPTupleData;
+
+/* Output side: append lrep protocol messages to a StringInfo. */
+extern void lrep_write_begin(StringInfo out, ReorderBufferTXN *txn, int flags,
+				 StringInfo extradata);
+extern void lrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
+				  XLogRecPtr commit_lsn, int flags,
+				  StringInfo extradata);
+/* Row changes; a NULL oldtuple means no old key is sent. */
+extern void lrep_write_insert(LREPOutputData *data, StringInfo out,
+				  Relation rel, HeapTuple newtuple);
+extern void lrep_write_update(LREPOutputData *data, StringInfo out,
+				  Relation rel, HeapTuple oldtuple,
+				  HeapTuple newtuple);
+extern void lrep_write_delete(LREPOutputData *data, StringInfo out,
+				  Relation rel, HeapTuple oldtuple);
+/* Pieces of the change messages: relation name and tuple encoding. */
+extern void lrep_write_rel(StringInfo out, Relation rel);
+extern void lrep_write_tuple(LREPOutputData *data, StringInfo out, Relation rel,
+				 HeapTuple tuple);
+/* Input side: parse the messages above, starting after their action byte. */
+extern int lrep_read_begin(StringInfo in, XLogRecPtr *origlsn,
+				TimestampTz *committime, TransactionId *remote_xid);
+extern int lrep_read_commit(StringInfo in, XLogRecPtr *commit_lsn,
+				 XLogRecPtr *end_lsn, TimestampTz *committime);
+extern Relation lrep_read_insert(StringInfo in, LOCKMODE lockmode,
+				 LREPTupleData *tuple);
+extern Relation lrep_read_update(StringInfo in, LOCKMODE lockmode,
+				 LREPTupleData *oldtuple,
+				 LREPTupleData *newtuple, bool *pkeysent);
+extern Relation lrep_read_delete(StringInfo in, LOCKMODE lockmode,
+				 LREPTupleData *tuple, bool *pkeysent);
+/* Lower-level readers used by the lrep_read_* functions above. */
+extern void lrep_read_tuple_parts(StringInfo s, TupleDesc desc,
+					  LREPTupleData *tuple);
+extern RangeVar *lrep_read_rel(StringInfo s);
+/* NOTE(review): defined outside the visible code; presumably sends standby feedback over conn. */
+extern bool lrep_send_feedback(PGconn *conn, XLogRecPtr recvpos,
+				   XLogRecPtr writepos, XLogRecPtr flushpos,
+				   int64 now, bool force);
+/* Output plugin option parsing; presumably these ERROR on bad or missing values. */
+extern void lrep_opt_parse_notnull(DefElem *elem, const char *paramtype);
+extern void lrep_opt_parse_uint32(DefElem *elem, uint32 *res);
+extern void lrep_opt_parse_size_t(DefElem *elem, size_t *res);
+extern void lrep_opt_parse_bool(DefElem *elem, bool *res);
+extern void lrep_opt_parse_identifier_list_arr(DefElem *elem, char ***list, int *len);
+extern void lrep_opt_required_error(const char *param);
+
+#endif							/* LREP_H */
diff --git a/contrib/lrep/lrep_output.c b/contrib/lrep/lrep_output.c
new file mode 100644
index 0000000..7c4b614
--- /dev/null
+++ b/contrib/lrep/lrep_output.c
@@ -0,0 +1,430 @@
+/*-------------------------------------------------------------------------
+ *
+ * lrep_output.c
+ * LREP output plugin
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/lrep/lrep_output.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "miscadmin.h"
+
+#include "lrep_output.h"
+
+#include "access/sysattr.h"
+#include "access/tuptoaster.h"
+#include "access/xact.h"
+
+#include "catalog/catversion.h"
+#include "catalog/index.h"
+
+#include "catalog/namespace.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_namespace.h"
+#include "catalog/pg_type.h"
+
+#include "commands/dbcommands.h"
+
+#include "executor/spi.h"
+
+#include "libpq/pqformat.h"
+
+#include "mb/pg_wchar.h"
+
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+#include "utils/syscache.h"
+#include "utils/timestamp.h"
+#include "utils/typcache.h"
+
+PG_MODULE_MAGIC;
+
+extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
+
+static LREPOutputCallbackData *LREPOutputCallbacks = NULL;
+
+#define startup_callback(ctx, data, options) \
+ if (LREPOutputCallbacks && LREPOutputCallbacks->startup_callback) \
+ LREPOutputCallbacks->startup_callback(ctx, data, options)
+
+#define shutdown_callback(ctx) \
+ if (LREPOutputCallbacks && LREPOutputCallbacks->shutdown_callback) \
+ LREPOutputCallbacks->shutdown_callback(ctx)
+
+#define should_forward_changeset(ctx, data, txn) \
+ (!LREPOutputCallbacks || \
+ !LREPOutputCallbacks->should_forward_changeset || \
+ LREPOutputCallbacks->should_forward_changeset(ctx, data, txn))
+
+#define should_forward_change(ctx, data, relation, action) \
+ (!LREPOutputCallbacks || \
+ !LREPOutputCallbacks->should_forward_change || \
+ LREPOutputCallbacks->should_forward_change(ctx, data, relation, action))
+
+/* Compile-time properties of this server, compared with the client's options at startup. */
+#ifdef USE_FLOAT4_BYVAL
+static const bool server_float4byval = true;
+#else
+static const bool server_float4byval = false;
+#endif
+
+#ifdef USE_FLOAT8_BYVAL
+static const bool server_float8byval = true;
+#else
+static const bool server_float8byval = false;
+#endif
+
+#ifdef USE_INTEGER_DATETIMES
+static const bool server_int_datetime = true;
+#else
+static const bool server_int_datetime = false;
+#endif
+/* Byte order as detected by configure; WORDS_BIGENDIAN is defined in pg_config.h. */
+#ifdef WORDS_BIGENDIAN
+static const bool server_bigendian = true;
+#else
+static const bool server_bigendian = false;
+#endif
+
+
+/* Decoding callbacks: static, reached only through LREP_output_plugin_init(). */
+static void pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
+				  bool is_init);
+static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
+					ReorderBufferTXN *txn);
+static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
+					 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+static void pg_decode_change(LogicalDecodingContext *ctx,
+				 ReorderBufferTXN *txn, Relation rel,
+				 ReorderBufferChange *change);
+static void pg_decode_shutdown(LogicalDecodingContext * ctx);
+
+void
+LREP_output_plugin_init(OutputPluginCallbacks *pgcb, LREPOutputCallbackData *plugincb)
+{
+	LREPOutputCallbacks = plugincb;	/* may be NULL: every hook then counts as absent */
+	/* route the logical decoding callbacks to this file's static handlers */
+	pgcb->startup_cb = pg_decode_startup;
+	pgcb->shutdown_cb = pg_decode_shutdown;
+	pgcb->begin_cb = pg_decode_begin_txn;
+	pgcb->commit_cb = pg_decode_commit_txn;
+	pgcb->change_cb = pg_decode_change;
+}
+
+/* Entry point the server looks up: plain lrep, without any custom hooks. */
+void
+_PG_output_plugin_init(OutputPluginCallbacks *cb)
+{
+	AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
+	/* no LREPOutputCallbackData: forward every change, send no extra fields */
+	LREP_output_plugin_init(cb, NULL);
+}
+
+/*
+ * Startup callback: parse the client's options and decide how datums are sent.
+ */
+static void
+pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
+{
+	List	   *additional_options = NIL;	/* options handed to the startup hook */
+	ListCell   *option;
+	LREPOutputData *data = palloc0(sizeof(LREPOutputData));
+
+	/* Per-change scratch context; a child of ctx->context so it is freed with it */
+	data->context = AllocSetContextCreate(ctx->context,
+										  "lrep output plugin data context",
+										  ALLOCSET_DEFAULT_MINSIZE,
+										  ALLOCSET_DEFAULT_INITSIZE,
+										  ALLOCSET_DEFAULT_MAXSIZE);
+	ctx->output_plugin_private = data;
+	opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
+
+	/* parse options passed in by the client; unknown ones go to the hook */
+	foreach(option, ctx->output_plugin_options)
+	{
+		DefElem    *elem = lfirst(option);
+
+		Assert(elem->arg == NULL || IsA(elem->arg, String));
+
+		if (strcmp(elem->defname, "pg_version") == 0)
+			lrep_opt_parse_uint32(elem, &data->client_pg_version);
+		else if (strcmp(elem->defname, "pg_catversion") == 0)
+			lrep_opt_parse_uint32(elem, &data->client_pg_catversion);
+		else if (strcmp(elem->defname, "proto_version") == 0)
+			lrep_opt_parse_uint32(elem, &data->client_proto_version);
+		else if (strcmp(elem->defname, "min_proto_version") == 0)
+			lrep_opt_parse_uint32(elem, &data->client_min_proto_version);
+		else if (strcmp(elem->defname, "sizeof_int") == 0)
+			lrep_opt_parse_size_t(elem, &data->client_sizeof_int);
+		else if (strcmp(elem->defname, "sizeof_long") == 0)
+			lrep_opt_parse_size_t(elem, &data->client_sizeof_long);
+		else if (strcmp(elem->defname, "sizeof_datum") == 0)
+			lrep_opt_parse_size_t(elem, &data->client_sizeof_datum);
+		else if (strcmp(elem->defname, "maxalign") == 0)
+			lrep_opt_parse_size_t(elem, &data->client_maxalign);
+		else if (strcmp(elem->defname, "bigendian") == 0)
+			lrep_opt_parse_bool(elem, &data->client_bigendian);
+		else if (strcmp(elem->defname, "float4_byval") == 0)
+			lrep_opt_parse_bool(elem, &data->client_float4_byval);
+		else if (strcmp(elem->defname, "float8_byval") == 0)
+			lrep_opt_parse_bool(elem, &data->client_float8_byval);
+		else if (strcmp(elem->defname, "integer_datetimes") == 0)
+			lrep_opt_parse_bool(elem, &data->client_int_datetime);
+		else if (strcmp(elem->defname, "db_encoding") == 0)
+		{
+			lrep_opt_parse_notnull(elem, "string");	/* arg may be NULL; strVal() can't take that */
+			data->client_db_encoding = pstrdup(strVal(elem->arg));
+		}
+		else if (strcmp(elem->defname, "interactive") == 0)
+		{
+			/*
+			 * Set defaults for interactive mode
+			 *
+			 * This is used for examining the replication queue from SQL.
+			 */
+			data->client_pg_version = PG_VERSION_NUM;
+			data->client_pg_catversion = CATALOG_VERSION_NO;
+			data->client_proto_version = LREP_PROTO_VERSION_NUM;
+			data->client_min_proto_version = LREP_PROTO_VERSION_NUM;
+			data->client_sizeof_int = sizeof(int);
+			data->client_sizeof_long = sizeof(long);
+			data->client_sizeof_datum = sizeof(Datum);
+			data->client_maxalign = MAXIMUM_ALIGNOF;
+			data->client_bigendian = server_bigendian;
+			data->client_float4_byval = server_float4byval;
+			data->client_float8_byval = server_float8byval;
+			data->client_int_datetime = server_int_datetime;
+			data->client_db_encoding = pstrdup(GetDatabaseEncodingName());
+		}
+		else	/* not ours: passed on to the startup hook */
+			additional_options = lappend(additional_options, elem);
+	}
+
+	/* no options are passed in during initialization, so don't complain there */
+	if (!is_init)
+	{
+		if (data->client_pg_version == 0)
+			lrep_opt_required_error("pg_version");
+		if (data->client_pg_catversion == 0)
+			lrep_opt_required_error("pg_catversion");
+		if (data->client_proto_version == 0)
+			lrep_opt_required_error("proto_version");
+		if (data->client_min_proto_version == 0)
+			lrep_opt_required_error("min_proto_version");
+		if (data->client_sizeof_int == 0)
+			lrep_opt_required_error("sizeof_int");
+		if (data->client_sizeof_long == 0)
+			lrep_opt_required_error("sizeof_long");
+		if (data->client_sizeof_datum == 0)
+			lrep_opt_required_error("sizeof_datum");
+		if (data->client_maxalign == 0)
+			lrep_opt_required_error("maxalign");
+		if (data->client_db_encoding == NULL)
+			lrep_opt_required_error("db_encoding");
+
+		/* check incompatibilities we cannot work around */
+		if (strcmp(data->client_db_encoding, GetDatabaseEncodingName()) != 0)
+			elog(ERROR, "mismatching encodings are not yet supported");
+
+		if (data->client_min_proto_version > LREP_PROTO_VERSION_NUM)
+			elog(ERROR, "incompatible lrep client and server versions, server too old");
+		if (data->client_proto_version < LREP_PROTO_MIN_REMOTE_VERSION_NUM)
+			elog(ERROR, "incompatible lrep client and server versions, client too old");
+
+		data->allow_binary_protocol = true;
+		data->allow_sendrecv_protocol = true;
+
+		/*
+		 * Now use the passed in information to determine how to encode the
+		 * data sent by the output plugin. We don't make datatype specific
+		 * decisions here, just generic decisions about using binary and/or
+		 * send/recv protocols.
+		 */
+
+		/*
+		 * Don't use the binary protocol if there are fundamental arch
+		 * differences.
+		 */
+		if (data->client_sizeof_int != sizeof(int) ||
+			data->client_sizeof_long != sizeof(long) ||
+			data->client_sizeof_datum != sizeof(Datum))
+		{
+			data->allow_binary_protocol = false;
+			elog(LOG, "disabling binary protocol because of sizeof differences");
+		}
+		else if (data->client_bigendian != server_bigendian)
+		{
+			data->allow_binary_protocol = false;
+			elog(LOG, "disabling binary protocol because of endianess difference");
+		}
+
+		/*
+		 * We also can't use the binary protocol if there are critical
+		 * differences in compile time settings.
+		 */
+		if (data->client_float4_byval != server_float4byval ||
+			data->client_float8_byval != server_float8byval)
+		{
+			data->allow_binary_protocol = false;
+			elog(LOG, "disabling binary protocol because of float4/float8 byval differences");
+		}
+
+		/* not fatal here: the mismatch is only recorded in data */
+		data->int_datetime_mismatch = (data->client_int_datetime != server_int_datetime);
+
+		/*
+		 * Don't use the send/recv protocol if there are version
+		 * differences. There currently isn't any guarantee for cross version
+		 * compatibility of the send/recv representations. But there actually
+		 * *is* a compat. guarantee for architecture differences...
+		 *
+		 * XXX: We could easily do better by doing per datatype considerations
+		 * if there are known incompatibilities.
+		 */
+		if (data->client_pg_version / 100 != PG_VERSION_NUM / 100)
+			data->allow_sendrecv_protocol = false;
+
+		/*
+		 * Let the startup hook (if any) check that it's safe to begin playing
+		 * changes to the remote end; it may ERROR out if we're not ready.
+		 */
+
+		startup_callback(ctx, data, additional_options);
+	}
+}
+
+/*
+ * BEGIN callback: write a 'B' message unless the transaction is filtered out.
+ *
+ * If you change this you must also change the corresponding apply code;
+ * make sure that any flags are in sync.
+ */
+static void
+pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
+	LREPOutputData *data = ctx->output_plugin_private;
+	int			flags = 0;
+	StringInfoData extradata = {0};	/* stays empty (len 0) without a hook */
+
+	AssertVariableIsOfType(&pg_decode_begin_txn, LogicalDecodeBeginCB);
+
+	if (!should_forward_changeset(ctx, data, txn))
+		return;
+
+	/* let the optional hook choose flags and append extra fields to send */
+	if (LREPOutputCallbacks && LREPOutputCallbacks->begin_txn_get_extrafields)
+	{
+		initStringInfo(&extradata);
+		flags = LREPOutputCallbacks->begin_txn_get_extrafields(ctx, txn, &extradata);
+	}
+
+	OutputPluginPrepareWrite(ctx, true);
+	lrep_write_begin(ctx->out, txn, flags, &extradata);
+	OutputPluginWrite(ctx, true);
+
+	if (extradata.data != NULL)
+		pfree(extradata.data);	/* don't leak the hook's buffer per transaction */
+}
+
+/*
+ * COMMIT callback
+ *
+ * Writes a 'C' message: the flags field, the commit LSN, the end LSN and the
+ * commit time, followed by whatever extra fields the optional
+ * commit_txn_get_extrafields hook appended. There is no sub-record header or
+ * other structure beyond the flags field.
+ *
+ * If you change this, you'll need to change process_remote_commit(...)
+ * too. Make sure to keep any flags in sync.
+ */
+static void
+pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					 XLogRecPtr commit_lsn)
+{
+	LREPOutputData *data = ctx->output_plugin_private;
+	int			flags = 0;
+	StringInfoData extradata = {0};	/* stays empty (len 0) without a hook */
+
+	AssertVariableIsOfType(&pg_decode_commit_txn, LogicalDecodeCommitCB);
+
+	if (!should_forward_changeset(ctx, data, txn))
+		return;
+
+	/* let the optional hook choose flags and append extra fields to send */
+	if (LREPOutputCallbacks && LREPOutputCallbacks->commit_txn_get_extrafields)
+	{
+		initStringInfo(&extradata);
+		flags = LREPOutputCallbacks->commit_txn_get_extrafields(ctx, txn, &extradata);
+	}
+
+	OutputPluginPrepareWrite(ctx, true);
+	lrep_write_commit(ctx->out, txn, commit_lsn, flags, &extradata);
+	OutputPluginWrite(ctx, true);
+	if (extradata.data != NULL)
+		pfree(extradata.data);	/* don't leak the hook's buffer per transaction */
+}
+
+/*
+ * Change callback: serialize one INSERT/UPDATE/DELETE unless a hook filters it.
+ */
+static void
+pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+				 Relation relation, ReorderBufferChange *change)
+{
+	LREPOutputData *data = ctx->output_plugin_private;
+	MemoryContext old;
+	HeapTuple	oldtuple;
+
+	/* Avoid leaking memory by using and resetting our own context */
+	old = MemoryContextSwitchTo(data->context);
+
+	/*
+	 * No early return past this point: data->context must not stay current
+	 * after we return, or its next reset would free the caller's memory.
+	 */
+	if (should_forward_changeset(ctx, data, txn) &&
+		should_forward_change(ctx, data, relation, change->action))
+	{
+		/* UPDATE and DELETE may or may not carry the old key tuple */
+		oldtuple = change->data.tp.oldtuple ?
+			&change->data.tp.oldtuple->tuple : NULL;
+
+		OutputPluginPrepareWrite(ctx, true);
+
+		switch (change->action)
+		{
+			case REORDER_BUFFER_CHANGE_INSERT:
+				lrep_write_insert(data, ctx->out, relation,
+								  &change->data.tp.newtuple->tuple);
+				break;
+			case REORDER_BUFFER_CHANGE_UPDATE:
+				lrep_write_update(data, ctx->out, relation,
+								  oldtuple, &change->data.tp.newtuple->tuple);
+				break;
+			case REORDER_BUFFER_CHANGE_DELETE:
+				lrep_write_delete(data, ctx->out, relation, oldtuple);
+				break;
+			default:
+				/* never emit an empty message for an unexpected action */
+				elog(ERROR, "unexpected change action %d", (int) change->action);
+				break;
+		}
+		OutputPluginWrite(ctx, true);
+	}
+
+	MemoryContextSwitchTo(old);
+	MemoryContextReset(data->context);
+}
+
+static void pg_decode_shutdown(LogicalDecodingContext * ctx)
+{
+	shutdown_callback(ctx);		/* optional hook; a no-op when none is installed */
+}
diff --git a/contrib/lrep/lrep_output.h b/contrib/lrep/lrep_output.h
new file mode 100644
index 0000000..bc19e12
--- /dev/null
+++ b/contrib/lrep/lrep_output.h
@@ -0,0 +1,49 @@
+/*-------------------------------------------------------------------------
+ *
+ * lrep_output.h
+ * LREP output plugin interfaces
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/lrep/lrep_output.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LREP_OUTPUT_H
+#define LREP_OUTPUT_H
+
+#include "lrep.h"
+/* Optional hooks; a missing hook means: forward everything, send no extra fields. */
+typedef void (*lrep_decode_startup_callback_t) (LogicalDecodingContext *ctx,
+												LREPOutputData *data,
+												List *options);	/* options lrep did not recognize */
+typedef void (*lrep_decode_shutdown_callback_t) (LogicalDecodingContext * ctx);
+typedef bool (*lrep_should_forward_changeset_t) (LogicalDecodingContext *ctx,
+												 LREPOutputData *data,
+												 ReorderBufferTXN *txn);	/* false skips the whole transaction */
+typedef bool (*lrep_should_forward_change_t) (LogicalDecodingContext *ctx,
+											  LREPOutputData *data,
+											  Relation relation,
+											  enum ReorderBufferChangeType action);	/* false skips this change */
+typedef int (*lrep_begin_txn_get_extrafields_t) (LogicalDecodingContext *ctx,
+												 ReorderBufferTXN *txn,
+												 StringInfo extrafields);	/* returns flags; extra bytes sent only if non-zero */
+typedef int (*lrep_commit_txn_get_extrafields_t) (LogicalDecodingContext *ctx,
+												  ReorderBufferTXN *txn,
+												  StringInfo extrafields);	/* same contract as the BEGIN variant */
+/* Hook table for LREP_output_plugin_init(); any member may be NULL. */
+typedef struct
+{
+	lrep_decode_startup_callback_t startup_callback;
+	lrep_decode_shutdown_callback_t shutdown_callback;
+	lrep_should_forward_changeset_t should_forward_changeset;
+	lrep_should_forward_change_t should_forward_change;
+	lrep_begin_txn_get_extrafields_t begin_txn_get_extrafields;
+	lrep_commit_txn_get_extrafields_t commit_txn_get_extrafields;
+} LREPOutputCallbackData;
+
+/* Installs lrep's decoding callbacks into pgcb; plugincb may be NULL (no hooks). */
+extern void LREP_output_plugin_init(OutputPluginCallbacks *pgcb, LREPOutputCallbackData *plugincb);
+
+#endif /* LREP_OUTPUT_H */
diff --git a/contrib/lrep/lrep_utils.c b/contrib/lrep/lrep_utils.c
new file mode 100644
index 0000000..eb56357
--- /dev/null
+++ b/contrib/lrep/lrep_utils.c
@@ -0,0 +1,725 @@
+/*-------------------------------------------------------------------------
+ *
+ * lrep_utils.c
+ * LREP utility functions
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/lrep/lrep_utils.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "miscadmin.h"
+
+#include "lrep.h"
+
+#include "access/sysattr.h"
+#include "access/tuptoaster.h"
+#include "access/xact.h"
+
+#include "catalog/catversion.h"
+#include "catalog/index.h"
+
+#include "catalog/namespace.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_namespace.h"
+#include "catalog/pg_type.h"
+
+#include "commands/dbcommands.h"
+
+#include "executor/spi.h"
+
+#include "libpq/pqformat.h"
+#include "libpq-fe.h"
+
+#include "mb/pg_wchar.h"
+
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+#include "utils/syscache.h"
+#include "utils/timestamp.h"
+#include "utils/typcache.h"
+/* Choose one column's encoding; lrep_write_tuple() uses text if neither flag is set. */
+static void
+decide_datum_transfer(LREPOutputData *data,
+					  Form_pg_attribute att, Form_pg_type typclass,
+					  bool *use_binary, bool *use_sendrecv);
+
+/*
+ * Wire protocol functions
+ */
+
+/*
+ * Write BEGIN to the output stream: 'B', int32 flags, int64 final LSN,
+ * int64 commit time, int32 xid, then the raw extradata bytes (if any).
+ */
+void
+lrep_write_begin(StringInfo out, ReorderBufferTXN *txn, int flags,
+				 StringInfo extradata)
+{
+	pq_sendbyte(out, 'B');		/* BEGIN */
+
+	pq_sendint(out, flags, 4);	/* the flags field itself */
+
+	/* fixed fields */
+	pq_sendint64(out, txn->final_lsn);
+	pq_sendint64(out, txn->commit_time);
+	pq_sendint(out, txn->xid, 4);
+
+	/* extradata is optional (may be NULL) and only sent when flags != 0 */
+	if (flags != 0 && extradata != NULL && extradata->len > 0)
+		pq_sendbytes(out, extradata->data, extradata->len);
+}
+
+/*
+ * Write COMMIT to the output stream: 'C', int32 flags, int64 commit LSN,
+ * int64 end LSN, int64 commit time, then the raw extradata bytes (if any).
+ */
+void
+lrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn,
+				  int flags, StringInfo extradata)
+{
+	pq_sendbyte(out, 'C');		/* sending COMMIT */
+
+	pq_sendint(out, flags, 4);	/* the flags field itself */
+
+	/* send fixed fields */
+	pq_sendint64(out, commit_lsn);
+	pq_sendint64(out, txn->end_lsn);
+	pq_sendint64(out, txn->commit_time);
+
+	/* extradata is optional (may be NULL) and only sent when flags != 0 */
+	if (flags != 0 && extradata != NULL && extradata->len > 0)
+		pq_sendbytes(out, extradata->data, extradata->len);
+}
+
+/*
+ * Write INSERT to the output stream: 'I', the relation, 'N', the new tuple.
+ */
+void
+lrep_write_insert(LREPOutputData *data, StringInfo out, Relation rel,
+				  HeapTuple newtuple)
+{
+	pq_sendbyte(out, 'I');		/* action INSERT */
+	lrep_write_rel(out, rel);	/* target table, schema-qualified */
+	pq_sendbyte(out, 'N');		/* new tuple follows */
+	lrep_write_tuple(data, out, rel, newtuple);
+}
+
+/*
+ * Write UPDATE to the output stream.
+ *
+ * Layout: 'U', the relation, optionally 'K' plus the old key tuple (only
+ * when oldtuple is not NULL), then 'N' plus the new tuple. This is exactly
+ * what lrep_read_update() consumes, so nothing else may follow the new
+ * tuple in the same message.
+ */
+void
+lrep_write_update(LREPOutputData *data, StringInfo out, Relation rel,
+				  HeapTuple oldtuple, HeapTuple newtuple)
+{
+	pq_sendbyte(out, 'U');		/* action UPDATE */
+	lrep_write_rel(out, rel);
+	if (oldtuple != NULL)
+	{
+		pq_sendbyte(out, 'K');	/* old key follows */
+		lrep_write_tuple(data, out, rel, oldtuple);
+	}
+	pq_sendbyte(out, 'N');		/* new tuple follows */
+	lrep_write_tuple(data, out, rel, newtuple);
+}
+
+/*
+ * Write DELETE: 'D', the relation, then 'E' (no old key) or 'K' + old key.
+ */
+void
+lrep_write_delete(LREPOutputData *data, StringInfo out, Relation rel,
+				  HeapTuple oldtuple)
+{
+	pq_sendbyte(out, 'D');		/* action DELETE */
+	lrep_write_rel(out, rel);
+	if (oldtuple == NULL)
+	{
+		pq_sendbyte(out, 'E');	/* empty */
+		return;
+	}
+	pq_sendbyte(out, 'K');		/* old key follows */
+	lrep_write_tuple(data, out, rel, oldtuple);
+}
+
+/*
+ * Write schema.relation to the output stream: the schema name and then the
+ * table name, each as an int16 length (counting the trailing NUL) followed
+ * by the NUL-terminated name.
+ */
+void
+lrep_write_rel(StringInfo out, Relation rel)
+{
+	Oid			nspoid = rel->rd_rel->relnamespace;
+	const char *names[2];
+	int			i;
+
+	names[0] = get_namespace_name(nspoid);
+	if (names[0] == NULL)
+		elog(ERROR, "cache lookup failed for namespace %u",
+			 nspoid);
+	names[1] = NameStr(rel->rd_rel->relname);
+	/* schema name first, then table name */
+	for (i = 0; i < 2; i++)
+	{
+		int			len = strlen(names[i]) + 1;
+
+		pq_sendint(out, len, 2);	/* name length */
+		appendBinaryStringInfo(out, names[i], len);
+	}
+}
+
+/*
+ * Write a tuple to the outputstream, in the most efficient format possible:
+ * 'T', int32 natts, then per column 'n' (null or dropped), 'u' (unchanged
+ * on-disk TOAST value) or 'b'/'s'/'t' followed by an int32 length and data.
+ */
+void
+lrep_write_tuple(LREPOutputData *data, StringInfo out, Relation rel,
+				 HeapTuple tuple)
+{
+	TupleDesc	desc;
+	Datum		values[MaxTupleAttributeNumber];
+	bool		isnull[MaxTupleAttributeNumber];
+	int			i;
+
+	desc = RelationGetDescr(rel);
+
+	pq_sendbyte(out, 'T');		/* tuple follows */
+
+	pq_sendint(out, desc->natts, 4);	/* number of attributes */
+
+	/* try to allocate enough memory from the get go */
+	enlargeStringInfo(out, tuple->t_len +
+					  desc->natts * (1 + 4));
+
+	/*
+	 * XXX: should this prove to be a relevant bottleneck, it might be
+	 * interesting to inline heap_deform_tuple() here, we don't actually need
+	 * the information in the form we get from it.
+	 */
+	heap_deform_tuple(tuple, desc, values, isnull);
+
+	for (i = 0; i < desc->natts; i++)
+	{
+		HeapTuple	typtup;
+		Form_pg_type typclass;
+
+		Form_pg_attribute att = desc->attrs[i];
+
+		bool		use_binary = false;
+		bool		use_sendrecv = false;
+
+		if (isnull[i] || att->attisdropped)
+		{
+			pq_sendbyte(out, 'n');	/* null column */
+			continue;
+		}
+		else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(values[i])))
+		{
+			pq_sendbyte(out, 'u');	/* unchanged toast column */
+			continue;
+		}
+
+		typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
+		if (!HeapTupleIsValid(typtup))
+			elog(ERROR, "cache lookup failed for type %u", att->atttypid);
+		typclass = (Form_pg_type) GETSTRUCT(typtup);
+
+		decide_datum_transfer(data, att, typclass, &use_binary, &use_sendrecv);
+
+		if (use_binary)
+		{
+			pq_sendbyte(out, 'b');	/* binary data follows */
+
+			/* pass by value */
+			if (att->attbyval)
+			{
+				Datum		tmp;	/* aligned scratch for the value's bytes */
+
+				pq_sendint(out, att->attlen, 4);	/* length */
+				/* out->data + out->len may be unaligned, so store via tmp */
+				store_att_byval(&tmp, values[i], att->attlen);
+				appendBinaryStringInfo(out, (char *) &tmp, att->attlen);
+			}
+			/* fixed length non-varlena pass-by-reference type */
+			else if (att->attlen > 0)
+			{
+				pq_sendint(out, att->attlen, 4);	/* length */
+
+				appendBinaryStringInfo(out, DatumGetPointer(values[i]),
+									   att->attlen);
+			}
+			/* varlena type */
+			else if (att->attlen == -1)
+			{
+				/* named ptr so as not to shadow the "data" parameter */
+				char	   *ptr = DatumGetPointer(values[i]);
+
+				/* send indirect datums inline */
+				if (VARATT_IS_EXTERNAL_INDIRECT(ptr))
+				{
+					struct varatt_indirect redirect;
+
+					VARATT_EXTERNAL_GET_POINTER(redirect, ptr);
+					ptr = (char *) redirect.pointer;
+				}
+
+				Assert(!VARATT_IS_EXTERNAL(ptr));
+				pq_sendint(out, VARSIZE_ANY(ptr), 4);	/* length */
+				appendBinaryStringInfo(out, ptr, VARSIZE_ANY(ptr));
+			}
+			else
+				elog(ERROR, "unsupported tuple type");
+		}
+		else if (use_sendrecv)
+		{
+			bytea	   *outputbytes;
+			int			len;
+
+			pq_sendbyte(out, 's');	/* 'send' data follows */
+
+			outputbytes =
+				OidSendFunctionCall(typclass->typsend, values[i]);
+
+			len = VARSIZE(outputbytes) - VARHDRSZ;
+			pq_sendint(out, len, 4);	/* length */
+			pq_sendbytes(out, VARDATA(outputbytes), len);	/* data */
+			pfree(outputbytes);
+		}
+		else
+		{
+			char	   *outputstr;
+			int			len;
+
+			pq_sendbyte(out, 't');	/* 'text' data follows */
+
+			outputstr =
+				OidOutputFunctionCall(typclass->typoutput, values[i]);
+			len = strlen(outputstr) + 1;
+			pq_sendint(out, len, 4);	/* length */
+			appendBinaryStringInfo(out, outputstr, len);	/* data */
+			pfree(outputstr);
+		}
+
+		ReleaseSysCache(typtup);
+	}
+}
+
+/*
+ * Read transaction BEGIN (the part after the 'B' byte); returns the flags.
+ * Any extra fields announced by the flags are left in the stream.
+ */
+int
+lrep_read_begin(StringInfo in, XLogRecPtr *origlsn, TimestampTz *committime,
+				TransactionId *remote_xid)
+{
+	int			msgflags = pq_getmsgint(in, 4);
+
+	*origlsn = pq_getmsgint64(in);	/* the transaction's final LSN */
+	Assert(*origlsn != InvalidXLogRecPtr);
+	*committime = pq_getmsgint64(in);
+	*remote_xid = pq_getmsgint(in, 4);
+	return msgflags;
+}
+
+/*
+ * Read transaction COMMIT (the part after the 'C' byte); returns the flags.
+ */
+int
+lrep_read_commit(StringInfo in, XLogRecPtr *commit_lsn, XLogRecPtr *end_lsn,
+				 TimestampTz *committime)
+{
+	int			msgflags = pq_getmsgint(in, 4);
+
+	/* fixed fields, in the order lrep_write_commit() sends them */
+	*commit_lsn = pq_getmsgint64(in);
+	*end_lsn = pq_getmsgint64(in);
+	*committime = pq_getmsgint64(in);
+	return msgflags;
+}
+
+/*
+ * Read INSERT from stream (the part after the 'I' byte).
+ *
+ * Looks up the relation by name, locks it in lockmode and fills *tuple with
+ * the new row. Returns the relation still open; closing it is up to the caller.
+ */
+Relation
+lrep_read_insert(StringInfo in, LOCKMODE lockmode, LREPTupleData *tuple)
+{
+	char		action;
+	Oid			reloid;
+	Relation	rel;
+
+	/* lock the target table before reading its tuple */
+	reloid = RangeVarGetRelid(lrep_read_rel(in), lockmode, false);
+
+	action = pq_getmsgbyte(in);
+	if (action != 'N')
+		elog(ERROR, "expected new tuple but got %c",
+			 action);
+
+	rel = heap_open(reloid, NoLock);	/* already locked above */
+
+	lrep_read_tuple_parts(in, RelationGetDescr(rel), tuple);
+
+	return rel;
+}
+
+/*
+ * Read UPDATE from stream (the part after the 'U' byte).
+ *
+ * Looks up the relation by name, locks it in lockmode and returns it open.
+ *
+ * If the message carries the old key ('K'), it is decoded into *oldtuple and
+ * *pkey_sent is set to true; otherwise *oldtuple is left untouched and
+ * *pkey_sent is false. The new row always goes to *newtuple.
+ */
+Relation
+lrep_read_update(StringInfo in, LOCKMODE lockmode, LREPTupleData *oldtuple,
+				 LREPTupleData *newtuple, bool *pkey_sent)
+{
+	Oid			reloid = RangeVarGetRelid(lrep_read_rel(in), lockmode, false);
+	char		marker = pq_getmsgbyte(in);	/* 'K' or 'N' */
+	Relation	rel;
+	TupleDesc	desc;
+
+	if (marker != 'K' && marker != 'N')
+		elog(ERROR, "expected action 'N' or 'K', got %c",
+			 marker);
+
+	rel = heap_open(reloid, NoLock);	/* lock already taken above */
+	desc = RelationGetDescr(rel);
+
+	if (marker == 'N')
+		*pkey_sent = false;
+	else
+	{
+		/* the old key comes first and is followed by the 'N' marker */
+		lrep_read_tuple_parts(in, desc, oldtuple);
+		*pkey_sent = true;
+		marker = pq_getmsgbyte(in);
+	}
+
+	/* check for new tuple */
+	if (marker != 'N')
+		elog(ERROR, "expected action 'N', got %c",
+			 marker);
+
+	lrep_read_tuple_parts(in, desc, newtuple);
+
+	return rel;
+}
+
+/*
+ * Read DELETE from stream (the part after the 'D' byte).
+ *
+ * Looks up the relation by name, locks it in lockmode and returns it open.
+ *
+ * With an old key ('K') the key is decoded into *tuple and *pkey_sent is set
+ * to true; with 'E' (no key was sent) *tuple is left untouched and
+ * *pkey_sent is set to false.
+ */
+Relation
+lrep_read_delete(StringInfo in, LOCKMODE lockmode, LREPTupleData *tuple,
+				 bool *pkey_sent)
+{
+	Oid			reloid = RangeVarGetRelid(lrep_read_rel(in), lockmode, false);
+	char		marker = pq_getmsgbyte(in);	/* 'K' or 'E' */
+	Relation	rel;
+
+	if (marker != 'K' && marker != 'E')
+		elog(ERROR, "expected action K or E got %c", marker);
+
+	rel = heap_open(reloid, NoLock);	/* lock already taken above */
+
+	/* 'E' means lrep_write_delete() had no old tuple to send */
+	if (marker == 'K')
+	{
+		lrep_read_tuple_parts(in, RelationGetDescr(rel), tuple);
+		*pkey_sent = true;
+	}
+	else
+		*pkey_sent = false;
+
+	return rel;
+}
+
+
+/*
+ * Read a tuple ('T', int32 natts, then a kind byte and payload per column).
+ */
+void
+lrep_read_tuple_parts(StringInfo s, TupleDesc desc, LREPTupleData *tuple)
+{
+	int			i;
+	int			rnatts;
+	char		action;
+
+	action = pq_getmsgbyte(s);
+	if (action != 'T')
+		elog(ERROR, "expected TUPLE, got %c", action);
+
+	memset(tuple->nulls, true, sizeof(tuple->nulls));
+	memset(tuple->changed, true, sizeof(tuple->changed));
+
+	rnatts = pq_getmsgint(s, 4);
+	if (desc->natts != rnatts)
+		elog(ERROR, "tuple natts mismatch, %d vs %d", desc->natts, rnatts);
+
+	for (i = 0; i < desc->natts; i++)
+	{
+		Form_pg_attribute att = desc->attrs[i];
+		char		kind = pq_getmsgbyte(s);
+		const char *data;
+		int			len;
+
+		switch (kind)
+		{
+			case 'n':			/* null; already marked as null */
+				tuple->values[i] = 0xdeadbeef;
+				break;
+			case 'u':			/* unchanged column */
+				tuple->nulls[i] = true;
+				tuple->changed[i] = false;
+				tuple->values[i] = 0xdeadbeef;	/* make bad usage more obvious */
+				break;
+			case 'b':			/* binary format */
+				tuple->nulls[i] = false;
+				len = pq_getmsgint(s, 4);	/* read length */
+				data = pq_getmsgbytes(s, len);
+				if (att->attbyval)
+				{
+					Datum		tmp = 0;	/* aligned copy; data may be unaligned */
+
+					if (len != att->attlen)
+						elog(ERROR, "unexpected length %d for by-value column", len);
+					memcpy(&tmp, data, len);
+					tuple->values[i] = fetch_att(&tmp, true, len);
+				}
+				else	/* FIXME: by-reference data is used in place, maybe unaligned */
+					tuple->values[i] = PointerGetDatum(data);
+				break;
+			case 's':			/* send/recv format */
+				{
+					Oid			typreceive;
+					Oid			typioparam;
+					StringInfoData buf;
+					char		csave;
+
+					tuple->nulls[i] = false;
+					len = pq_getmsgint(s, 4);	/* read length */
+					getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
+					/*
+					 * Point a StringInfo into the bigger buffer; like record_recv(),
+					 * NUL-terminate it temporarily (needs s's usual trailing NUL).
+					 */
+					buf.data = (char *) pq_getmsgbytes(s, len);
+					buf.len = buf.maxlen = len;
+					buf.cursor = 0;
+					csave = buf.data[len];
+					buf.data[len] = '\0';
+					tuple->values[i] = OidReceiveFunctionCall(typreceive, &buf,
+															  typioparam, att->atttypmod);
+					buf.data[len] = csave;
+					if (buf.len != buf.cursor)
+						ereport(ERROR,
+								(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+								 errmsg("incorrect binary data format")));
+					break;
+				}
+			case 't':			/* text format, sent including its NUL */
+				{
+					Oid			typinput;
+					Oid			typioparam;
+
+					tuple->nulls[i] = false;
+					len = pq_getmsgint(s, 4);	/* read length */
+					getTypeInputInfo(att->atttypid, &typinput, &typioparam);
+					data = pq_getmsgbytes(s, len);
+					if (len < 1 || data[len - 1] != '\0')
+						elog(ERROR, "text value for column is not NUL-terminated");
+					tuple->values[i] = OidInputFunctionCall(typinput, (char *) data,
+															typioparam, att->atttypmod);
+				}
+				break;
+			default:
+				elog(ERROR, "unknown column type '%c'", kind);
+		}
+
+		if (att->attisdropped && !tuple->nulls[i])
+			elog(ERROR, "data for dropped column");
+	}
+}
+
+ /*
+  * Read schema.relation from the stream and return it as a RangeVar.
+  *
+  * The wire format is a 2-byte length followed by that many bytes for the
+  * schema name, then the same for the relation name.  Both names are copied
+  * into palloc'd, NUL-terminated strings, so the RangeVar does not point
+  * into the message buffer and the names are terminated whether or not the
+  * sender counted a trailing NUL in the length.
+  */
+ RangeVar *
+ lrep_read_rel(StringInfo s)
+ {
+     RangeVar   *rv = makeNode(RangeVar);
+     int         nspnamelen;
+     int         relnamelen;
+     const char *nspname;
+     const char *relname;
+
+     nspnamelen = pq_getmsgint(s, 2);
+     nspname = pq_getmsgbytes(s, nspnamelen);
+     rv->schemaname = pnstrdup(nspname, nspnamelen);
+
+     relnamelen = pq_getmsgint(s, 2);
+     relname = pq_getmsgbytes(s, relnamelen);
+     rv->relname = pnstrdup(relname, relnamelen);
+
+     /* no source-text position, same as makeRangeVar() would set */
+     rv->location = -1;
+
+     return rv;
+ }
+
+
+ /*
+  * Does the binary and send/recv representation of this type depend on
+  * whether the server uses integer or floating-point datetimes?
+  */
+ static bool
+ lrep_type_has_datetime_repr(Oid typid)
+ {
+     return typid == TIMESTAMPOID || typid == TIMESTAMPTZOID ||
+         typid == TIMEOID || typid == TIMETZOID || typid == INTERVALOID;
+ }
+
+ /*
+  * Make the executive decision about which protocol to use for a column.
+  *
+  * Sets *use_binary when the raw binary datum may be sent, otherwise sets
+  * *use_sendrecv when the type's send/recv functions may be used.  Both
+  * outputs are always written; when both end up false neither protocol is
+  * allowed (presumably the caller then falls back to text).
+  */
+ static void
+ decide_datum_transfer(LREPOutputData *data,
+                       Form_pg_attribute att, Form_pg_type typclass,
+                       bool *use_binary, bool *use_sendrecv)
+ {
+     *use_binary = false;
+     *use_sendrecv = false;
+
+     /*
+      * Always disallow fancyness if there's a datetime representation
+      * mismatch and the type, or the element type of an array, is affected
+      * by it.
+      *
+      * XXX: builtin range types over timestamps (tsrange, tstzrange) are
+      * presumably affected as well and are not covered here yet.
+      */
+     if (data->int_datetime_mismatch &&
+         (lrep_type_has_datetime_repr(att->atttypid) ||
+          lrep_type_has_datetime_repr(typclass->typelem)))
+         return;
+
+     /*
+      * Use the binary protocol, if allowed, for builtin & plain datatypes.
+      */
+     if (data->allow_binary_protocol &&
+         typclass->typtype == 'b' &&
+         att->atttypid < FirstNormalObjectId &&
+         typclass->typelem == InvalidOid)
+     {
+         *use_binary = true;
+         return;
+     }
+
+     /*
+      * Use send/recv, if allowed, if the type is builtin or plain.
+      *
+      * XXX: we can't use send/recv for non-builtin array or composite types
+      * for now due to the embedded oids.
+      */
+     if (data->allow_sendrecv_protocol &&
+         OidIsValid(typclass->typreceive) &&
+         (att->atttypid < FirstNormalObjectId ||
+          (typclass->typtype != 'c' && typclass->typelem == InvalidOid)))
+         *use_sendrecv = true;
+ }
+
+
+/*
+ * Option parsing helper functions
+ */
+
+ /*
+  * Error out unless the option carries a value.
+  *
+  * "paramtype" describes the expected kind of value and is only used to
+  * build the error message.
+  */
+ void
+ lrep_opt_parse_notnull(DefElem *elem, const char *paramtype)
+ {
+     bool        has_value = elem->arg != NULL && strVal(elem->arg) != NULL;
+
+     if (has_value)
+         return;
+
+     ereport(ERROR,
+             (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+              errmsg("%s parameter \"%s\" had no value",
+                     paramtype, elem->defname)));
+ }
+
+
+ /*
+  * Parse the option's value as an unsigned 32-bit integer into *res.
+  *
+  * Accepts the syntax of strtoul() with base 0 (decimal, 0x hex, 0 octal).
+  * The whole string must be consumed; empty values, trailing garbage,
+  * negative numbers and values wider than 32 bits are rejected.
+  */
+ void
+ lrep_opt_parse_uint32(DefElem *elem, uint32 *res)
+ {
+     const char *str;
+     char       *endptr;
+     unsigned long parsed;
+
+     lrep_opt_parse_notnull(elem, "uint32");
+     str = strVal(elem->arg);
+
+     errno = 0;
+     parsed = strtoul(str, &endptr, 0);
+
+     /*
+      * strtoul() silently negates a leading '-' and may return values wider
+      * than 32 bits (LP64), so catch those and malformed input ourselves,
+      * reporting them through errno like a genuine strtoul() failure.
+      */
+     if (errno == 0)
+     {
+         if (endptr == str || *endptr != '\0' || strchr(str, '-') != NULL)
+             errno = EINVAL;
+         else if ((unsigned long) (uint32) parsed != parsed)
+             errno = ERANGE;
+     }
+
+     if (errno != 0)
+         ereport(ERROR,
+                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                  errmsg("could not parse uint32 value \"%s\" for parameter \"%s\": %m",
+                         str, elem->defname)));
+
+     *res = (uint32) parsed;
+ }
+
+ /*
+  * Parse the option's value as a size_t into *res.
+  *
+  * Accepts the syntax of strtoull() with base 0 (decimal, 0x hex, 0 octal).
+  * The whole string must be consumed; empty values, trailing garbage,
+  * negative numbers and values that do not fit in size_t are rejected.
+  */
+ void
+ lrep_opt_parse_size_t(DefElem *elem, size_t *res)
+ {
+     const char *str;
+     char       *endptr;
+     unsigned long long parsed;
+
+     lrep_opt_parse_notnull(elem, "size_t");
+     str = strVal(elem->arg);
+
+     errno = 0;
+     parsed = strtoull(str, &endptr, 0);
+
+     /*
+      * strtoull() silently negates a leading '-', and size_t may be narrower
+      * than unsigned long long, so catch those and malformed input ourselves,
+      * reporting them through errno like a genuine strtoull() failure.
+      */
+     if (errno == 0)
+     {
+         if (endptr == str || *endptr != '\0' || strchr(str, '-') != NULL)
+             errno = EINVAL;
+         else if ((unsigned long long) (size_t) parsed != parsed)
+             errno = ERANGE;
+     }
+
+     if (errno != 0)
+         ereport(ERROR,
+                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                  errmsg("could not parse size_t value \"%s\" for parameter \"%s\": %m",
+                         str, elem->defname)));
+
+     *res = (size_t) parsed;
+ }
+
+ /*
+  * Parse the option's value as a boolean into *res, using parse_bool()'s
+  * accepted spellings.
+  *
+  * parse_bool() does not set errno, so the error message must not use %m;
+  * it would report whatever unrelated errno happened to be left over.
+  */
+ void
+ lrep_opt_parse_bool(DefElem *elem, bool *res)
+ {
+     lrep_opt_parse_notnull(elem, "bool");
+
+     if (!parse_bool(strVal(elem->arg), res))
+         ereport(ERROR,
+                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                  errmsg("could not parse boolean value \"%s\" for parameter \"%s\"",
+                         strVal(elem->arg), elem->defname)));
+ }
+
+ /*
+  * Parse the option's value as a comma-separated identifier list, using
+  * SplitIdentifierString().
+  *
+  * On return *list points to a palloc'd array of *len palloc'd strings.
+  * An empty value yields *len == 0.
+  */
+ void
+ lrep_opt_parse_identifier_list_arr(DefElem *elem, char ***list, int *len)
+ {
+     char       *rawstring;
+     List       *namelist;
+     ListCell   *c;
+
+     lrep_opt_parse_notnull(elem, "list");
+
+     /* SplitIdentifierString() modifies its input, so work on a copy */
+     rawstring = pstrdup(strVal(elem->arg));
+
+     if (!SplitIdentifierString(rawstring, ',', &namelist))
+     {
+         /* SplitIdentifierString() does not set errno, so no %m here */
+         ereport(ERROR,
+                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                  errmsg("could not parse identifier list value \"%s\" for parameter \"%s\"",
+                         strVal(elem->arg), elem->defname)));
+     }
+
+     *len = 0;
+     *list = palloc(list_length(namelist) * sizeof(char *));
+
+     /* the list elements point into rawstring, so copy them out */
+     foreach(c, namelist)
+     {
+         (*list)[(*len)++] = pstrdup(lfirst(c));
+     }
+
+     list_free(namelist);
+     pfree(rawstring);
+ }
+
+ /*
+  * Report that the required parameter "param" was not supplied.
+  *
+  * Always raises an ERROR and therefore never returns.
+  */
+ void
+ lrep_opt_required_error(const char *param)
+ {
+     ereport(ERROR,
+             (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+              errmsg("missing value for parameter \"%s\"",
+                     param)));
+ }
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers