From 4bfca35b149a303779bee49d96e3be25b914478f Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tv@fuzzy.cz>
Date: Thu, 26 Sep 2019 17:04:54 +0200
Subject: [PATCH 1/2] Add logical_decoding_work_mem to limit ReorderBuffer
 memory usage

Instead of deciding to serialize a transaction merely based on the
number of changes in that xact (toplevel or subxact), this makes
the decisions based on amount of memory consumed by the changes.

The memory limit is defined by a new logical_decoding_work_mem GUC,
so for example we can do this

    SET logical_decoding_work_mem = '128kB'

to trigger very aggressive streaming. The minimum value is 64kB.

When adding a change to a transaction, we account for the size in
two places. Firstly, in the ReorderBuffer, which is then used to
decide if we reached the total memory limit. And secondly in the
transaction the change belongs to, so that we can pick the largest
transaction to evict (and serialize to disk).

We still use max_changes_in_memory when loading changes serialized
to disk. The trouble is we can't use the memory limit directly as
there might be multiple subxacts serialized; we need to read all of
them but we don't know how many there are (and which subxact to
read first).

We do not serialize the ReorderBufferTXN entries, so if there is a
transaction with many subxacts, most memory may be in this type of
objects. Those records are not included in the memory accounting.

We also do not account for INTERNAL_TUPLECID changes, which are
kept in a separate list and not evicted from memory. Transactions
with many tuple CID changes may consume significant amounts of
memory, but we can't really do much about that.

The current eviction algorithm is very simple - the transaction is
picked merely by size, while it might be useful to also consider age
(LSN) of the changes for example. With the new Generational memory
allocator, evicting the oldest changes would make it more likely
the memory gets actually pfreed.

The logical_decoding_work_mem may be set either in postgresql.conf,
in which case it serves as the default for all publishers on that
instance, or when creating the subscription, using a work_mem
parameter in the WITH clause (specifies number of kilobytes).
---
 doc/src/sgml/config.sgml                           |  21 ++
 doc/src/sgml/ref/create_subscription.sgml          |  12 +
 src/backend/catalog/pg_subscription.c              |   1 +
 src/backend/commands/subscriptioncmds.c            |  44 +++-
 .../libpqwalreceiver/libpqwalreceiver.c            |   3 +
 src/backend/replication/logical/reorderbuffer.c    | 292 ++++++++++++++++++++-
 src/backend/replication/logical/worker.c           |   1 +
 src/backend/replication/pgoutput/pgoutput.c        |  30 ++-
 src/backend/utils/misc/guc.c                       |  36 +++
 src/backend/utils/misc/postgresql.conf.sample      |   1 +
 src/include/catalog/pg_subscription.h              |   3 +
 src/include/replication/reorderbuffer.h            |  16 ++
 src/include/replication/walreceiver.h              |   1 +
 13 files changed, 441 insertions(+), 20 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 47b12c6..f1d13a0 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1716,6 +1716,27 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-logical-decoding-work-mem" xreflabel="logical_decoding_work_mem">
+      <term><varname>logical_decoding_work_mem</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>logical_decoding_work_mem</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies the maximum amount of memory to be used by logical decoding,
+        before some of the decoded changes are written to local disk.
+        This limits the amount of memory used by logical streaming replication
+        connections. It defaults to 64 megabytes (<literal>64MB</literal>).
+        Since each replication connection only uses a single buffer of this size,
+        and an installation normally doesn't have many such connections
+        concurrently (as limited by <varname>max_wal_senders</varname>), it's
+        safe to set this value significantly higher than <varname>work_mem</varname>,
+        reducing the amount of decoded changes written to disk.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-max-stack-depth" xreflabel="max_stack_depth">
       <term><varname>max_stack_depth</varname> (<type>integer</type>)
       <indexterm>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 1a90c24..91790b0 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -206,6 +206,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>work_mem</literal> (<type>integer</type>)</term>
+        <listitem>
+         <para>
+          Limits the amount of memory (in kilobytes) used to decode changes
+          on the publisher.  If not specified, the publisher will use the
+          default specified by <varname>logical_decoding_work_mem</varname>.
+          When needed, additional data are spilled to disk.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist>
      </para>
     </listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index afee283..7e3ba8e 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->name = pstrdup(NameStr(subform->subname));
 	sub->owner = subform->subowner;
 	sub->enabled = subform->subenabled;
+	sub->workmem = subform->subworkmem;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 2e67a58..d85e831 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -66,7 +66,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 						   bool *enabled, bool *create_slot,
 						   bool *slot_name_given, char **slot_name,
 						   bool *copy_data, char **synchronous_commit,
-						   bool *refresh)
+						   bool *refresh, int *logical_wm,
+						   bool *logical_wm_given)
 {
 	ListCell   *lc;
 	bool		connect_given = false;
@@ -97,6 +98,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 		*synchronous_commit = NULL;
 	if (refresh)
 		*refresh = true;
+	if (logical_wm)
+		*logical_wm_given = false;
 
 	/* Parse options */
 	foreach(lc, options)
@@ -182,6 +185,16 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 			refresh_given = true;
 			*refresh = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "work_mem") == 0 && logical_wm)
+		{
+			if (*logical_wm_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+
+			*logical_wm_given = true;
+			*logical_wm = defGetInt32(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -325,6 +338,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	bool		enabled_given;
 	bool		enabled;
 	bool		copy_data;
+	int			logical_wm;
+	bool		logical_wm_given;
 	char	   *synchronous_commit;
 	char	   *conninfo;
 	char	   *slotname;
@@ -341,7 +356,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	parse_subscription_options(stmt->options, &connect, &enabled_given,
 							   &enabled, &create_slot, &slotname_given,
 							   &slotname, &copy_data, &synchronous_commit,
-							   NULL);
+							   NULL, &logical_wm, &logical_wm_given);
 
 	/*
 	 * Since creating a replication slot is not transactional, rolling back
@@ -419,6 +434,12 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	values[Anum_pg_subscription_subpublications - 1] =
 		publicationListToArray(publications);
 
+	if (logical_wm_given)
+		values[Anum_pg_subscription_subworkmem - 1] =
+			Int32GetDatum(logical_wm);
+	else
+		nulls[Anum_pg_subscription_subworkmem - 1] = true;
+
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
 	/* Insert tuple into catalog. */
@@ -682,10 +703,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				char	   *slotname;
 				bool		slotname_given;
 				char	   *synchronous_commit;
+				int			logical_wm;
+				bool		logical_wm_given;
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
 										   NULL, &slotname_given, &slotname,
-										   NULL, &synchronous_commit, NULL);
+										   NULL, &synchronous_commit, NULL,
+										   &logical_wm, &logical_wm_given);
 
 				if (slotname_given)
 				{
@@ -710,6 +734,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 					replaces[Anum_pg_subscription_subsynccommit - 1] = true;
 				}
 
+				if (logical_wm_given)
+				{
+					values[Anum_pg_subscription_subworkmem - 1] =
+						Int32GetDatum(logical_wm);
+					replaces[Anum_pg_subscription_subworkmem - 1] = true;
+				}
+
 				update_tuple = true;
 				break;
 			}
@@ -721,7 +752,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL,
 										   &enabled_given, &enabled, NULL,
-										   NULL, NULL, NULL, NULL, NULL);
+										   NULL, NULL, NULL, NULL, NULL,
+										   NULL, NULL);
 				Assert(enabled_given);
 
 				if (!sub->slotname && enabled)
@@ -759,7 +791,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
 										   NULL, NULL, NULL, &copy_data,
-										   NULL, &refresh);
+										   NULL, &refresh, NULL, NULL);
 
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(stmt->publication);
@@ -796,7 +828,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
 										   NULL, NULL, NULL, &copy_data,
-										   NULL, NULL);
+										   NULL, NULL, NULL, NULL);
 
 				AlterSubscription_refresh(sub, copy_data);
 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 6eba08a..65b3266 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -406,6 +406,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 		appendStringInfo(&cmd, "proto_version '%u'",
 						 options->proto.logical.proto_version);
 
+		appendStringInfo(&cmd, ", work_mem '%d'",
+						 options->proto.logical.work_mem);
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
 		if (!pubnames_str)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 8ce28ad..6228140 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -49,6 +49,34 @@
  *	  GenerationContext for the variable-length transaction data (allocated
  *	  and freed in groups with similar lifespan).
  *
+ *	  To limit the amount of memory used by decoded changes, we track memory
+ *	  used at the reorder buffer level (i.e. total amount of memory), and for
+ *	  each toplevel transaction. When the total amount of used memory exceeds
+ *	  the limit, the toplevel transaction consuming the most memory is then
+ *	  serialized to disk.
+ *
+ *	  Only decoded changes are evicted from memory (spilled to disk), not the
+ *	  transaction records. The number of toplevel transactions is limited,
+ *	  but a transaction with many subtransactions may still consume significant
+ *	  amounts of memory. The transaction records are fairly small, though, and
+ *	  are not included in the memory limit.
+ *
+ *	  The current eviction algorithm is very simple - the transaction is
+ *	  picked merely by size, while it might be useful to also consider age
+ *	  (LSN) of the changes for example. With the new Generational memory
+ *	  allocator, evicting the oldest changes would make it more likely the
+ *	  memory gets actually freed.
+ *
+ *	  We still rely on max_changes_in_memory when loading serialized changes
+ *	  back into memory. At that point we can't use the memory limit directly
+ *	  as we load the subxacts independently. One option to deal with this
+ *	  would be to count the subxacts, and allow each to allocate 1/N of the
+ *	  memory limit. That however does not seem very appealing, because with
+ *	  many subtransactions it may easily cause thrashing (short cycles of
+ *	  deserializing and applying very few changes). We probably should give
+ *	  a bit more memory to the oldest subtransactions, because they are
+ *	  likely the source for the next sequence of changes.
+ *
  * -------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -154,7 +182,8 @@ typedef struct ReorderBufferDiskChange
  * resource management here, but it's not entirely clear what that would look
  * like.
  */
-static const Size max_changes_in_memory = 4096;
+int			logical_decoding_work_mem;
+static const Size max_changes_in_memory = 4096; /* XXX for restore only */
 
 /* ---------------------------------------
  * primary reorderbuffer support routines
@@ -189,7 +218,7 @@ static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTX
  * Disk serialization support functions
  * ---------------------------------------
  */
-static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb);
 static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 										 int fd, ReorderBufferChange *change);
@@ -217,6 +246,14 @@ static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
 static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
 										  Relation relation, ReorderBufferChange *change);
 
+/*
+ * ---------------------------------------
+ * memory accounting
+ * ---------------------------------------
+ */
+static Size ReorderBufferChangeSize(ReorderBufferChange *change);
+static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
+								ReorderBufferChange *change, bool addition);
 
 /*
  * Allocate a new ReorderBuffer and clean out any old serialized state from
@@ -269,6 +306,7 @@ ReorderBufferAllocate(void)
 
 	buffer->outbuf = NULL;
 	buffer->outbufsize = 0;
+	buffer->size = 0;
 
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
@@ -374,6 +412,9 @@ ReorderBufferGetChange(ReorderBuffer *rb)
 void
 ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
 {
+	/* update memory accounting info */
+	ReorderBufferChangeMemoryUpdate(rb, change, false);
+
 	/* free contained data */
 	switch (change->action)
 	{
@@ -585,12 +626,18 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
 	change->lsn = lsn;
+	change->txn = txn;
+
 	Assert(InvalidXLogRecPtr != lsn);
 	dlist_push_tail(&txn->changes, &change->node);
 	txn->nentries++;
 	txn->nentries_mem++;
 
-	ReorderBufferCheckSerializeTXN(rb, txn);
+	/* update memory accounting information */
+	ReorderBufferChangeMemoryUpdate(rb, change, true);
+
+	/* check the memory limits and evict something if needed */
+	ReorderBufferCheckMemoryLimit(rb);
 }
 
 /*
@@ -1217,6 +1264,9 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 		change = dlist_container(ReorderBufferChange, node, iter.cur);
 
+		/* Check we're not mixing changes from different transactions. */
+		Assert(change->txn == txn);
+
 		ReorderBufferReturnChange(rb, change);
 	}
 
@@ -1229,7 +1279,11 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		ReorderBufferChange *change;
 
 		change = dlist_container(ReorderBufferChange, node, iter.cur);
+
+		/* Check we're not mixing changes from different transactions. */
+		Assert(change->txn == txn);
 		Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
+
 		ReorderBufferReturnChange(rb, change);
 	}
 
@@ -2082,9 +2136,48 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
 	ReorderBufferQueueChange(rb, xid, lsn, change);
 }
 
+/*
+ * Update the memory accounting info. We track memory used by the whole
+ * reorder buffer and the transaction containing the change.
+ */
+static void
+ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
+								ReorderBufferChange *change,
+								bool addition)
+{
+	Size		sz;
+
+	Assert(change->txn);
+
+	/*
+	 * Ignore tuple CID changes, because those are not evicted when
+	 * reaching memory limit. So we just don't count them, because it
+	 * might easily trigger a pointless attempt to spill/stream.
+	 */
+	if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
+		return;
+
+	sz = ReorderBufferChangeSize(change);
+
+	if (addition)
+	{
+		change->txn->size += sz;
+		rb->size += sz;
+	}
+	else
+	{
+		Assert((rb->size >= sz) && (change->txn->size >= sz));
+		change->txn->size -= sz;
+		rb->size -= sz;
+	}
+}
 
 /*
  * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
+ *
+ * We do not include this change type in memory accounting, because we
+ * keep CIDs in a separate list and do not evict them when reaching
+ * the memory limit.
  */
 void
 ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
@@ -2230,20 +2323,84 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
 }
 
 /*
- * Check whether the transaction tx should spill its data to disk.
+ * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
+ *
+ * XXX With many subtransactions this might be quite slow, because we'll have
+ * to walk through all of them. There are some options how we could improve
+ * that: (a) maintain some secondary structure with transactions sorted by
+ * amount of changes, (b) not looking for the entirely largest transaction,
+ * but e.g. for transaction using at least some fraction of the memory limit,
+ * and (c) evicting multiple transactions at once, e.g. to free a given portion
+ * of the memory limit (e.g. 50%).
+ */
+static ReorderBufferTXN *
+ReorderBufferLargestTXN(ReorderBuffer *rb)
+{
+	HASH_SEQ_STATUS hash_seq;
+	ReorderBufferTXNByIdEnt	*ent;
+	ReorderBufferTXN *largest = NULL;
+
+	hash_seq_init(&hash_seq, rb->by_txn);
+	while ((ent = hash_seq_search(&hash_seq)) != NULL)
+	{
+		ReorderBufferTXN *txn = ent->txn;
+
+		/* if the current transaction is larger, remember it */
+		if ((!largest) || (txn->size > largest->size))
+			largest = txn;
+	}
+
+	Assert(largest);
+	Assert(largest->size > 0);
+	Assert(largest->size <= rb->size);
+
+	return largest;
+}
+
+/*
+ * Check whether the logical_decoding_work_mem limit was reached, and if yes
+ * pick the transaction to evict and spill the changes to disk.
+ *
+ * XXX At this point we select just a single (largest) transaction, but
+ * we might also adopt a more elaborate eviction strategy - for example
+ * evicting enough transactions to free a certain fraction (e.g. 50%) of
+ * the memory limit.
  */
 static void
-ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 {
+	ReorderBufferTXN *txn;
+
+	/* bail out if we haven't exceeded the memory limit */
+	if (rb->size < logical_decoding_work_mem * 1024L)
+		return;
+
 	/*
-	 * TODO: improve accounting so we cheaply can take subtransactions into
-	 * account here.
+	 * Pick the largest transaction (or subtransaction) and evict it from
+	 * memory by serializing it to disk.
 	 */
-	if (txn->nentries_mem >= max_changes_in_memory)
-	{
-		ReorderBufferSerializeTXN(rb, txn);
-		Assert(txn->nentries_mem == 0);
-	}
+	txn = ReorderBufferLargestTXN(rb);
+
+	ReorderBufferSerializeTXN(rb, txn);
+
+	/*
+	 * After eviction, the transaction should have no entries in memory, and
+	 * should use 0 bytes for changes.
+	 */
+	Assert(txn->size == 0);
+	Assert(txn->nentries_mem == 0);
+
+	/*
+	 * And furthermore, evicting the transaction should get us below the
+	 * memory limit again - it is not possible that we're still exceeding the
+	 * memory limit after evicting the transaction.
+	 *
+	 * This follows from the simple fact that the selected transaction is at
+	 * least as large as the most recent change (which caused us to go over
+	 * the memory limit). So by evicting it we're definitely back below the
+	 * memory limit.
+	 */
+	Assert(rb->size < logical_decoding_work_mem * 1024L);
 }
 
 /*
@@ -2513,6 +2670,84 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 }
 
 /*
+ * Size of a change in memory.
+ */
+static Size
+ReorderBufferChangeSize(ReorderBufferChange *change)
+{
+	Size		sz = sizeof(ReorderBufferChange);
+
+	switch (change->action)
+	{
+			/* fall through these, they're all similar enough */
+		case REORDER_BUFFER_CHANGE_INSERT:
+		case REORDER_BUFFER_CHANGE_UPDATE:
+		case REORDER_BUFFER_CHANGE_DELETE:
+		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
+			{
+				ReorderBufferTupleBuf *oldtup,
+						   *newtup;
+				Size		oldlen = 0;
+				Size		newlen = 0;
+
+				oldtup = change->data.tp.oldtuple;
+				newtup = change->data.tp.newtuple;
+
+				if (oldtup)
+				{
+					sz += sizeof(HeapTupleData);
+					oldlen = oldtup->tuple.t_len;
+					sz += oldlen;
+				}
+
+				if (newtup)
+				{
+					sz += sizeof(HeapTupleData);
+					newlen = newtup->tuple.t_len;
+					sz += newlen;
+				}
+
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_MESSAGE:
+			{
+				Size		prefix_size = strlen(change->data.msg.prefix) + 1;
+
+				sz += prefix_size + change->data.msg.message_size +
+					sizeof(Size) + sizeof(Size);
+
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
+			{
+				Snapshot	snap;
+
+				snap = change->data.snapshot;
+
+				sz += sizeof(SnapshotData) +
+					sizeof(TransactionId) * snap->xcnt +
+					sizeof(TransactionId) * snap->subxcnt;
+
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_TRUNCATE:
+			{
+				sz += sizeof(Oid) * change->data.truncate.nrelids;
+
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
+		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
+			/* ReorderBufferChange contains everything important */
+			break;
+	}
+
+	return sz;
+}
+
+
+/*
  * Restore a number of changes spilled to disk back into memory.
  */
 static Size
@@ -2784,6 +3019,16 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 	dlist_push_tail(&txn->changes, &change->node);
 	txn->nentries_mem++;
+
+	/*
+	 * Update memory accounting for the restored change.  We need to do this
+	 * although we don't check the memory limit when restoring the changes in
+	 * this branch (we only do that when initially queueing the changes after
+	 * decoding), because we will release the changes later, and that will
+	 * update the accounting too (subtracting the size from the counters).
+	 * And we don't want to underflow there.
+	 */
+	ReorderBufferChangeMemoryUpdate(rb, change, true);
 }
 
 /*
@@ -3003,6 +3248,19 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
  *
  * We cannot replace unchanged toast tuples though, so those will still point
  * to on-disk toast data.
+ *
+ * While updating the existing change with detoasted tuple data, we need to
+ * update the memory accounting info, because the change size will differ.
+ * Otherwise the accounting may get out of sync, triggering serialization
+ * at unexpected times.
+ *
+ * We simply subtract size of the change before rejiggering the tuple, and
+ * then add the new size. This makes it look like the change was removed
+ * and then added back, except it only tweaks the accounting info.
+ *
+ * In particular it can't trigger serialization, which would be pointless
+ * anyway as it happens during commit processing right before handing
+ * the change to the output plugin.
  */
 static void
 ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
@@ -3023,6 +3281,13 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	if (txn->toast_hash == NULL)
 		return;
 
+	/*
+	 * We're going to modify the size of the change, so to make sure the
+	 * accounting is correct we'll make it look like we're removing the
+	 * change now (with the old size), and then re-add it at the end.
+	 */
+	ReorderBufferChangeMemoryUpdate(rb, change, false);
+
 	oldcontext = MemoryContextSwitchTo(rb->context);
 
 	/* we should only have toast tuples in an INSERT or UPDATE */
@@ -3172,6 +3437,9 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	pfree(isnull);
 
 	MemoryContextSwitchTo(oldcontext);
+
+	/* now add the change back, with the correct size */
+	ReorderBufferChangeMemoryUpdate(rb, change, true);
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 11e6331..f737afb 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1725,6 +1725,7 @@ ApplyWorkerMain(Datum main_arg)
 	options.slotname = myslotname;
 	options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
 	options.proto.logical.publication_names = MySubscription->publications;
+	options.proto.logical.work_mem = MySubscription->workmem;
 
 	/* Start normal logical streaming replication. */
 	walrcv_startstreaming(wrconn, &options);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9c08757..317c5d4 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -21,6 +21,7 @@
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
 
+#include "utils/guc.h"
 #include "utils/inval.h"
 #include "utils/int8.h"
 #include "utils/memutils.h"
@@ -90,11 +91,12 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 
 static void
 parse_output_parameters(List *options, uint32 *protocol_version,
-						List **publication_names)
+						List **publication_names, int *logical_decoding_work_mem)
 {
 	ListCell   *lc;
 	bool		protocol_version_given = false;
 	bool		publication_names_given = false;
+	bool		work_mem_given = false;
 
 	foreach(lc, options)
 	{
@@ -140,6 +142,29 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 						(errcode(ERRCODE_INVALID_NAME),
 						 errmsg("invalid publication_names syntax")));
 		}
+		else if (strcmp(defel->defname, "work_mem") == 0)
+		{
+			int64	parsed;
+
+			if (work_mem_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			work_mem_given = true;
+
+			if (!scanint8(strVal(defel->arg), true, &parsed))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("invalid work_mem")));
+			/* value is in kB and later scaled by 1024L: cap at MAX_KILOBYTES */
+			if (parsed > MAX_KILOBYTES || parsed < 64)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("work_mem \"%s\" out of range",
+								strVal(defel->arg))));
+
+			*logical_decoding_work_mem = (int)parsed;
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -174,7 +199,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		/* Parse the params and ERROR if we see any we don't recognize */
 		parse_output_parameters(ctx->output_plugin_options,
 								&data->protocol_version,
-								&data->publication_names);
+								&data->publication_names,
+								&logical_decoding_work_mem);
 
 		/* Check if we support requested protocol */
 		if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 2178e1c..5d7e687 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -65,6 +65,7 @@
 #include "postmaster/postmaster.h"
 #include "postmaster/syslogger.h"
 #include "postmaster/walwriter.h"
+#include "replication/logical.h"
 #include "replication/logicallauncher.h"
 #include "replication/slot.h"
 #include "replication/syncrep.h"
@@ -191,6 +192,7 @@ static bool check_maxconnections(int *newval, void **extra, GucSource source);
 static bool check_max_worker_processes(int *newval, void **extra, GucSource source);
 static bool check_autovacuum_max_workers(int *newval, void **extra, GucSource source);
 static bool check_max_wal_senders(int *newval, void **extra, GucSource source);
+static bool check_logical_decoding_work_mem(int *newval, void **extra, GucSource source);
 static bool check_autovacuum_work_mem(int *newval, void **extra, GucSource source);
 static bool check_effective_io_concurrency(int *newval, void **extra, GucSource source);
 static void assign_effective_io_concurrency(int newval, void *extra);
@@ -2251,6 +2253,18 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"logical_decoding_work_mem", PGC_USERSET, RESOURCES_MEM,
+			gettext_noop("Sets the maximum memory to be used for logical decoding."),
+			gettext_noop("This much memory can be used by each internal "
+						 "reorder buffer before spilling to disk or streaming."),
+			GUC_UNIT_KB
+		},
+		&logical_decoding_work_mem,
+		65536, 64, MAX_KILOBYTES,	/* 64MB default, 64kB min; -1 disabled spilling */
+		check_logical_decoding_work_mem, NULL, NULL
+	},
+
 	/*
 	 * We use the hopefully-safely-small value of 100kB as the compiled-in
 	 * default for max_stack_depth.  InitializeGUCOptions will increase it if
@@ -11286,6 +11300,28 @@ check_max_wal_senders(int *newval, void **extra, GucSource source)
 }
 
 static bool
+check_logical_decoding_work_mem(int *newval, void **extra, GucSource source)
+{
+	/*
+	 * -1 indicates fallback.
+	 *
+	 * If we haven't yet changed the boot_val default of -1, just let it be.
+	 * logical decoding will look to maintenance_work_mem instead.
+	 */
+	if (*newval == -1)
+		return true;
+
+	/*
+	 * We clamp manually-set values to at least 64kB. The maintenance_work_mem
+	 * uses a higher minimum value (1MB), so this is OK.
+	 */
+	if (*newval < 64)
+		*newval = 64;
+
+	return true;
+}
+
+static bool
 check_autovacuum_work_mem(int *newval, void **extra, GucSource source)
 {
 	/*
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 0fc23e3..00a22b8 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -130,6 +130,7 @@
 #work_mem = 4MB				# min 64kB
 #maintenance_work_mem = 64MB		# min 1MB
 #autovacuum_work_mem = -1		# min 1MB, or -1 to use maintenance_work_mem
+#logical_decoding_work_mem = 64MB	# min 64kB
 #max_stack_depth = 2MB			# min 100kB
 #shared_memory_type = mmap		# the default is the first option
 					# supported by the operating system:
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 3cb13d8..10ea113 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -48,6 +48,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subenabled;		/* True if the subscription is enabled (the
 								 * worker should be running) */
 
+	int32		subworkmem;		/* Memory to use to decode changes. */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -73,6 +75,7 @@ typedef struct Subscription
 	char	   *name;			/* Name of the subscription */
 	Oid			owner;			/* Oid of the subscription owner */
 	bool		enabled;		/* Indicates if the subscription is enabled */
+	int			workmem;		/* Memory to decode changes. */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 4c06a78..4dcef80 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -17,6 +17,8 @@
 #include "utils/snapshot.h"
 #include "utils/timestamp.h"
 
+extern PGDLLIMPORT	int	logical_decoding_work_mem;
+
 /* an individual tuple, stored in one chunk of memory */
 typedef struct ReorderBufferTupleBuf
 {
@@ -63,6 +65,9 @@ enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_TRUNCATE
 };
 
+/* forward declaration */
+struct ReorderBufferTXN;
+
 /*
  * a single 'change', can be an insert (with one tuple), an update (old, new),
  * or a delete (old).
@@ -77,6 +82,9 @@ typedef struct ReorderBufferChange
 	/* The type of change. */
 	enum ReorderBufferChangeType action;
 
+	/* Transaction this change belongs to. */
+	struct ReorderBufferTXN *txn;
+
 	RepOriginId origin_id;
 
 	/*
@@ -286,6 +294,11 @@ typedef struct ReorderBufferTXN
 	 */
 	dlist_node	node;
 
+	/*
+	 * Size of this transaction (changes currently in memory, in bytes).
+	 */
+	Size		size;
+
 } ReorderBufferTXN;
 
 /* so we can define the callbacks used inside struct ReorderBuffer itself */
@@ -386,6 +399,9 @@ struct ReorderBuffer
 	/* buffer for disk<->memory conversions */
 	char	   *outbuf;
 	Size		outbufsize;
+
+	/* memory accounting */
+	Size		size;
 };
 
 
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index e12a934..4e68a69 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -162,6 +162,7 @@ typedef struct
 		{
 			uint32		proto_version;	/* Logical protocol version */
 			List	   *publication_names;	/* String list of publications */
+			int			work_mem;	/* Memory limit to use for decoding */
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
-- 
1.8.3.1

