On 12-11-14 08:17 PM, Andres Freund wrote:
I am getting errors like the following when I try to use either your
test_decoding plugin or my own (which does even less than yours):
LOG: database system is ready to accept connections
LOG: autovacuum launcher started
WARNING: connecting to
WARNING: Initiating logical rep
LOG: computed new xmin: 773
LOG: start reading from 0/17F5D58, scrolled back to 0/17F4000
LOG: got new xmin 773 at 25124280
LOG: found initial snapshot (via running xacts). Done: 1
WARNING: reached consistent point, stopping!
WARNING: Starting logical replication
LOG: start reading from 0/17F5D58, scrolled back to 0/17F4000
LOG: found initial snapshot (via running xacts). Done: 1
FATAL: cannot read pg_class without having selected a database
TRAP: FailedAssertion("!(SHMQueueEmpty(&(MyProc->myProcLocks[i])))",
File: "proc.c", Line: 759)
This seems to be happening under the calls at reorderbuffer.c:832:
    if (!SnapBuildHasCatalogChanges(NULL, xid, &change->relnode))
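A side note for anyone reading the log above: the "start reading from" lines print the WAL position as two hexadecimal words (high/low 32 bits), while the "got new xmin ... at" line prints a plain decimal number. Assuming that decimal value is the same kind of position (an assumption; the log does not say so), a small helper makes the two comparable. The names parse_lsn and format_lsn are made up for this sketch and are not functions from the patch.

```c
#include <stdint.h>
#include <stdio.h>

/*
 * Parse a WAL position written as "hi/lo" (two hex words) into one 64-bit
 * byte position. Returns 0 on success, -1 if the text does not match.
 */
static int
parse_lsn(const char *text, uint64_t *lsn)
{
    unsigned int hi;
    unsigned int lo;
    char         trailing;

    /* Exactly two conversions must succeed; trailing characters are rejected. */
    if (sscanf(text, "%X/%X%c", &hi, &lo, &trailing) != 2)
        return -1;
    *lsn = ((uint64_t) hi << 32) | lo;
    return 0;
}

/* Format a 64-bit position back into the "hi/lo" form used in the log. */
static void
format_lsn(uint64_t lsn, char *buf, size_t buflen)
{
    snprintf(buf, buflen, "%X/%X",
             (unsigned int) (lsn >> 32),
             (unsigned int) (lsn & 0xFFFFFFFFu));
}
```

Under that assumption, 0/17F5D58 is 25124184 in decimal, and 25124280 corresponds to 0/17F5DB8, i.e. 0x60 bytes past the start position.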
The sequence of steps I follow is:
1. Start pg_receivellog
2. Run a checkpoint
3. Attach to the walsender process with gdb
4. Start a new client connection with psql and do 'INSERT INTO a values
(1)' twice.
(Skipping step 3 doesn't make a difference.)
This introduces several things:
* 'reorderbuffer' module which reassembles transactions from a stream of
  interspersed changes
* 'snapbuilder' which builds catalog snapshots so that tuples from wal can be
  understood
* logging more data into wal to facilitate logical decoding
* wal decoding into a reorderbuffer
* shared library output plugins with 5 callbacks
  * init
  * begin
  * change
  * commit
* walsender infrastructure to stream out changes and to keep the global xmin low
  enough
  * INIT_LOGICAL_REPLICATION $plugin; waits till a consistent snapshot is built
    and returns
    * initial LSN
    * replication slot identifier
    * id of a pg_export() style snapshot
  * START_LOGICAL_REPLICATION $id $lsn; streams out changes
  * uses named output plugins for output specification
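To make the reorderbuffer and output plugin items above a bit more concrete, here is a rough, self-contained sketch of the idea: changes arrive interleaved across transactions, are queued per xid, and are handed to begin/change/commit callbacks only once that transaction commits. Everything in it (the Toy* types, the toy_* and trace_* functions, the fixed-size arrays) is hypothetical and invented for illustration; it is not the interface from reorderbuffer.h or output_plugin.h, and it ignores the init callback, aborts, subtransactions, catalog snapshots and spilling to disk.

```c
#include <stdio.h>
#include <string.h>

#define TOY_MAX_TXNS    8
#define TOY_MAX_CHANGES 16

/* Hypothetical callback set; not the patch's output_plugin.h interface. */
typedef struct ToyOutputPlugin
{
    void  (*begin) (void *arg, unsigned xid);
    void  (*change) (void *arg, unsigned xid, const char *desc);
    void  (*commit) (void *arg, unsigned xid);
    void   *arg;                /* opaque state handed to every callback */
} ToyOutputPlugin;

/* Changes collected so far for one in-progress transaction. */
typedef struct ToyTxn
{
    int         in_use;
    unsigned    xid;
    int         nchanges;
    const char *changes[TOY_MAX_CHANGES];   /* borrowed, not copied */
} ToyTxn;

typedef struct ToyReorderBuffer
{
    ToyTxn          txns[TOY_MAX_TXNS];
    ToyOutputPlugin plugin;
} ToyReorderBuffer;

/* Find the slot for xid; optionally claim a free slot if none exists. */
static ToyTxn *
toy_find_txn(ToyReorderBuffer *rb, unsigned xid, int create)
{
    ToyTxn *free_slot = NULL;

    for (int i = 0; i < TOY_MAX_TXNS; i++)
    {
        if (rb->txns[i].in_use && rb->txns[i].xid == xid)
            return &rb->txns[i];
        if (!rb->txns[i].in_use && free_slot == NULL)
            free_slot = &rb->txns[i];
    }
    if (!create || free_slot == NULL)
        return NULL;
    free_slot->in_use = 1;
    free_slot->xid = xid;
    free_slot->nchanges = 0;
    return free_slot;
}

/* Queue one change under its xid. Returns 0, or -1 if a toy limit is hit. */
static int
toy_add_change(ToyReorderBuffer *rb, unsigned xid, const char *desc)
{
    ToyTxn *txn = toy_find_txn(rb, xid, 1);

    if (txn == NULL || txn->nchanges >= TOY_MAX_CHANGES)
        return -1;
    txn->changes[txn->nchanges++] = desc;
    return 0;
}

/* At commit, replay the changes in arrival order, then release the slot. */
static int
toy_commit(ToyReorderBuffer *rb, unsigned xid)
{
    ToyTxn *txn = toy_find_txn(rb, xid, 0);

    if (txn == NULL)
        return -1;              /* nothing was queued for this xid */
    rb->plugin.begin(rb->plugin.arg, xid);
    for (int i = 0; i < txn->nchanges; i++)
        rb->plugin.change(rb->plugin.arg, xid, txn->changes[i]);
    rb->plugin.commit(rb->plugin.arg, xid);
    txn->in_use = 0;
    return 0;
}

/* Sample plugin: appends a text trace to a fixed-size buffer. */
typedef struct ToyTrace { char text[256]; } ToyTrace;

static void
toy_trace(ToyTrace *t, const char *what, unsigned xid, const char *desc)
{
    size_t used = strlen(t->text);

    snprintf(t->text + used, sizeof(t->text) - used, "%s %u%s%s;",
             what, xid, desc ? " " : "", desc ? desc : "");
}

static void trace_begin(void *arg, unsigned xid) { toy_trace(arg, "BEGIN", xid, NULL); }
static void trace_change(void *arg, unsigned xid, const char *desc) { toy_trace(arg, "CHANGE", xid, desc); }
static void trace_commit(void *arg, unsigned xid) { toy_trace(arg, "COMMIT", xid, NULL); }
```

With a zeroed ToyReorderBuffer whose plugin points at the three trace_* functions, queueing changes for xids 773 and 774 in interleaved order and then committing 774 before 773 yields one contiguous BEGIN/CHANGE/COMMIT group per transaction, in commit order rather than arrival order.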
Todo:
* testing infrastructure (isolationtester)
* persistence/spilling to disk of built snapshots, long-running
transactions
* user docs
* more frequent lowering of xmins
* more docs about the internals
* support for user declared catalog tables
* actual exporting of initial pg_export snapshots after
INIT_LOGICAL_REPLICATION
* own shared memory segment instead of piggybacking on walsender's
* nicer interface between snapbuild.c, reorderbuffer.c, decode.c and the
outside.
* more frequent xl_running_xid's so xmin can be upped more frequently
* add STOP_LOGICAL_REPLICATION $id
---
src/backend/access/heap/heapam.c | 280 +++++-
src/backend/access/transam/xlog.c | 1 +
src/backend/catalog/index.c | 74 ++
src/backend/replication/Makefile | 2 +
src/backend/replication/logical/Makefile | 19 +
src/backend/replication/logical/decode.c | 496 ++++++++++
src/backend/replication/logical/logicalfuncs.c | 247 +++++
src/backend/replication/logical/reorderbuffer.c | 1156 +++++++++++++++++++++++
src/backend/replication/logical/snapbuild.c | 1144 ++++++++++++++++++++++
src/backend/replication/repl_gram.y | 32 +-
src/backend/replication/repl_scanner.l | 2 +
src/backend/replication/walsender.c | 566 ++++++++++-
src/backend/storage/ipc/procarray.c | 23 +
src/backend/storage/ipc/standby.c | 8 +-
src/backend/utils/cache/inval.c | 2 +-
src/backend/utils/cache/relcache.c | 3 +-
src/backend/utils/misc/guc.c | 11 +
src/backend/utils/time/tqual.c | 249 +++++
src/bin/pg_controldata/pg_controldata.c | 2 +
src/include/access/heapam_xlog.h | 23 +
src/include/access/transam.h | 5 +
src/include/access/xlog.h | 3 +-
src/include/catalog/index.h | 4 +
src/include/nodes/nodes.h | 2 +
src/include/nodes/replnodes.h | 22 +
src/include/replication/decode.h | 21 +
src/include/replication/logicalfuncs.h | 44 +
src/include/replication/output_plugin.h | 76 ++
src/include/replication/reorderbuffer.h | 284 ++++++
src/include/replication/snapbuild.h | 128 +++
src/include/replication/walsender.h | 1 +
src/include/replication/walsender_private.h | 34 +-
src/include/storage/itemptr.h | 3 +
src/include/storage/sinval.h | 2 +
src/include/utils/tqual.h | 31 +-
35 files changed, 4966 insertions(+), 34 deletions(-)
create mode 100644 src/backend/replication/logical/Makefile
create mode 100644 src/backend/replication/logical/decode.c
create mode 100644 src/backend/replication/logical/logicalfuncs.c
create mode 100644 src/backend/replication/logical/reorderbuffer.c
create mode 100644 src/backend/replication/logical/snapbuild.c
create mode 100644 src/include/replication/decode.h
create mode 100644 src/include/replication/logicalfuncs.h
create mode 100644 src/include/replication/output_plugin.h
create mode 100644 src/include/replication/reorderbuffer.h
create mode 100644 src/include/replication/snapbuild.h