On 12-11-14 08:17 PM, Andres Freund wrote:

I am getting errors like the following when I try to use either your test_decoding plugin or my own (which does even less than yours):


LOG:  database system is ready to accept connections
LOG:  autovacuum launcher started
WARNING:  connecting to
WARNING:  Initiating logical rep
LOG:  computed new xmin: 773
LOG:  start reading from 0/17F5D58, scrolled back to 0/17F4000
LOG:  got new xmin 773 at 25124280
LOG:  found initial snapshot (via running xacts). Done: 1
WARNING:  reached consistent point, stopping!
WARNING:  Starting logical replication
LOG:  start reading from 0/17F5D58, scrolled back to 0/17F4000
LOG:  found initial snapshot (via running xacts). Done: 1
FATAL:  cannot read pg_class without having selected a database
TRAP: FailedAssertion("!(SHMQueueEmpty(&(MyProc->myProcLocks[i])))", File: "proc.c", Line: 759)

This seems to be happening under the calls at
reorderbuffer.c:832 if (!SnapBuildHasCatalogChanges(NULL, xid, &change->relnode))

The sequence of steps I follow is:
1. Start pg_receivellog
2. Run a checkpoint
3. Attach to the walsender process with gdb
4. Start a new client connection with psql and run 'INSERT INTO a values (1)' twice.

(skipping step 3 doesn't make a difference)






This introduces several things:
* 'reorderbuffer' module which reassembles transactions from a stream of 
interspersed changes
* 'snapbuilder' which builds catalog snapshots so that tuples from wal can be 
understood
* logging more data into wal to facilitate logical decoding
* wal decoding into a reorderbuffer
* shared library output plugins with 5 callbacks
  * init
  * begin
  * change
  * commit
* walsender infrastructure to stream out changes and to keep the global xmin low 
enough
  * INIT_LOGICAL_REPLICATION $plugin; waits till a consistent snapshot is built 
and returns
    * initial LSN
    * replication slot identifier
    * id of a pg_export() style snapshot
  * START_LOGICAL_REPLICATION $id $lsn; streams out changes
  * uses named output plugins for output specification
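
To make the reorderbuffer/output plugin split above a bit more concrete, here is
a minimal, self-contained sketch of the idea. It is not the code from the patch:
every name in it (ToyReorderBuffer, toy_add_change, toy_commit, toy_abort, the
text_* callbacks) is hypothetical, changes are plain strings instead of decoded
tuples, and the fixed-size arrays stand in for real memory management. It only
shows how interspersed changes can be queued per xid and handed to
begin/change/commit callbacks once the transaction commits.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define MAX_TXNS    8    /* toy limits, not taken from the patch */
#define MAX_CHANGES 16

typedef unsigned int Xid;

/* Hypothetical callback set, loosely modeled on begin/change/commit above. */
typedef struct OutputCallbacks {
    void (*begin)(Xid xid, void *arg);
    void (*change)(Xid xid, const char *change, void *arg);
    void (*commit)(Xid xid, void *arg);
} OutputCallbacks;

typedef struct Txn {
    Xid         xid;
    int         in_use;
    int         nchanges;
    const char *changes[MAX_CHANGES];
} Txn;

typedef struct ToyReorderBuffer {
    Txn             txns[MAX_TXNS];
    OutputCallbacks cb;
    void           *cb_arg;
} ToyReorderBuffer;

/* Find the slot for xid; optionally claim a free slot if none exists. */
static Txn *toy_lookup(ToyReorderBuffer *rb, Xid xid, int create)
{
    Txn *free_slot = NULL;

    for (int i = 0; i < MAX_TXNS; i++) {
        if (rb->txns[i].in_use && rb->txns[i].xid == xid)
            return &rb->txns[i];
        if (!rb->txns[i].in_use && free_slot == NULL)
            free_slot = &rb->txns[i];
    }
    if (!create || free_slot == NULL)
        return NULL;
    free_slot->xid = xid;
    free_slot->in_use = 1;
    free_slot->nchanges = 0;
    return free_slot;
}

/* Queue one change under its transaction; nothing is emitted yet. */
static int toy_add_change(ToyReorderBuffer *rb, Xid xid, const char *change)
{
    Txn *txn = toy_lookup(rb, xid, 1);

    if (txn == NULL || txn->nchanges >= MAX_CHANGES)
        return -1;
    txn->changes[txn->nchanges++] = change;
    return 0;
}

/* On commit, replay the queued changes in arrival order, then free the slot. */
static void toy_commit(ToyReorderBuffer *rb, Xid xid)
{
    Txn *txn = toy_lookup(rb, xid, 0);

    if (txn == NULL)
        return;
    rb->cb.begin(xid, rb->cb_arg);
    for (int i = 0; i < txn->nchanges; i++)
        rb->cb.change(xid, txn->changes[i], rb->cb_arg);
    rb->cb.commit(xid, rb->cb_arg);
    txn->in_use = 0;
}

/* On abort, drop the queued changes without calling any callback. */
static void toy_abort(ToyReorderBuffer *rb, Xid xid)
{
    Txn *txn = toy_lookup(rb, xid, 0);

    if (txn != NULL)
        txn->in_use = 0;
}

/* Example "plugin": appends a text rendering to an in-memory buffer. */
typedef struct TextOut { char buf[256]; size_t len; } TextOut;

static void out_append(TextOut *o, const char *s)
{
    size_t n = strlen(s);

    assert(o->len + n < sizeof(o->buf));
    memcpy(o->buf + o->len, s, n + 1);   /* copies the trailing NUL too */
    o->len += n;
}

static void text_begin(Xid xid, void *arg)
{
    char line[64];

    snprintf(line, sizeof line, "BEGIN %u\n", xid);
    out_append(arg, line);
}

static void text_change(Xid xid, const char *change, void *arg)
{
    (void) xid;
    out_append(arg, change);
    out_append(arg, "\n");
}

static void text_commit(Xid xid, void *arg)
{
    char line[64];

    snprintf(line, sizeof line, "COMMIT %u\n", xid);
    out_append(arg, line);
}
```

A caller would zero-initialize a ToyReorderBuffer, point cb at the three text_*
functions and cb_arg at a TextOut, feed it changes from several xids in any
interleaving, and call toy_commit or toy_abort per xid. Only committed
transactions reach the callbacks, each as one contiguous begin/change/commit
group.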

Todo:
* testing infrastructure (isolationtester)
* persistence/spilling to disk of built snapshots, long-running
   transactions
* user docs
* more frequent lowering of xmins
* more docs about the internals
* support for user declared catalog tables
* actual exporting of initial pg_export snapshots after
   INIT_LOGICAL_REPLICATION
* own shared memory segment instead of piggybacking on walsender's
* nicer interface between snapbuild.c, reorderbuffer.c, decode.c and the
   outside.
* more frequent xl_running_xid's so xmin can be upped more frequently
* add STOP_LOGICAL_REPLICATION $id
---
  src/backend/access/heap/heapam.c                |  280 +++++-
  src/backend/access/transam/xlog.c               |    1 +
  src/backend/catalog/index.c                     |   74 ++
  src/backend/replication/Makefile                |    2 +
  src/backend/replication/logical/Makefile        |   19 +
  src/backend/replication/logical/decode.c        |  496 ++++++++++
  src/backend/replication/logical/logicalfuncs.c  |  247 +++++
  src/backend/replication/logical/reorderbuffer.c | 1156 +++++++++++++++++++++++
  src/backend/replication/logical/snapbuild.c     | 1144 ++++++++++++++++++++++
  src/backend/replication/repl_gram.y             |   32 +-
  src/backend/replication/repl_scanner.l          |    2 +
  src/backend/replication/walsender.c             |  566 ++++++++++-
  src/backend/storage/ipc/procarray.c             |   23 +
  src/backend/storage/ipc/standby.c               |    8 +-
  src/backend/utils/cache/inval.c                 |    2 +-
  src/backend/utils/cache/relcache.c              |    3 +-
  src/backend/utils/misc/guc.c                    |   11 +
  src/backend/utils/time/tqual.c                  |  249 +++++
  src/bin/pg_controldata/pg_controldata.c         |    2 +
  src/include/access/heapam_xlog.h                |   23 +
  src/include/access/transam.h                    |    5 +
  src/include/access/xlog.h                       |    3 +-
  src/include/catalog/index.h                     |    4 +
  src/include/nodes/nodes.h                       |    2 +
  src/include/nodes/replnodes.h                   |   22 +
  src/include/replication/decode.h                |   21 +
  src/include/replication/logicalfuncs.h          |   44 +
  src/include/replication/output_plugin.h         |   76 ++
  src/include/replication/reorderbuffer.h         |  284 ++++++
  src/include/replication/snapbuild.h             |  128 +++
  src/include/replication/walsender.h             |    1 +
  src/include/replication/walsender_private.h     |   34 +-
  src/include/storage/itemptr.h                   |    3 +
  src/include/storage/sinval.h                    |    2 +
  src/include/utils/tqual.h                       |   31 +-
  35 files changed, 4966 insertions(+), 34 deletions(-)
  create mode 100644 src/backend/replication/logical/Makefile
  create mode 100644 src/backend/replication/logical/decode.c
  create mode 100644 src/backend/replication/logical/logicalfuncs.c
  create mode 100644 src/backend/replication/logical/reorderbuffer.c
  create mode 100644 src/backend/replication/logical/snapbuild.c
  create mode 100644 src/include/replication/decode.h
  create mode 100644 src/include/replication/logicalfuncs.h
  create mode 100644 src/include/replication/output_plugin.h
  create mode 100644 src/include/replication/reorderbuffer.h
  create mode 100644 src/include/replication/snapbuild.h



