From bab5dbc8915d074604614fcf3b8486abb04d3a2d Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilip.kumar@enterprisedb.com>
Date: Mon, 21 Oct 2019 17:45:47 +0530
Subject: [PATCH 3/3] Support logical_decoding_work_mem set from create
 subscription command

---
 doc/src/sgml/config.sgml                           | 21 +++++++++++
 doc/src/sgml/ref/create_subscription.sgml          | 12 ++++++
 src/backend/catalog/pg_subscription.c              |  1 +
 src/backend/commands/subscriptioncmds.c            | 44 +++++++++++++++++++---
 .../libpqwalreceiver/libpqwalreceiver.c            |  3 ++
 src/backend/replication/logical/worker.c           |  1 +
 src/backend/replication/pgoutput/pgoutput.c        | 30 ++++++++++++++-
 src/include/catalog/pg_subscription.h              |  3 ++
 src/include/replication/walreceiver.h              |  1 +
 9 files changed, 108 insertions(+), 8 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 291b343..6b700c6 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1737,6 +1737,27 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-logical-decoding-work-mem" xreflabel="logical_decoding_work_mem">
+      <term><varname>logical_decoding_work_mem</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>logical_decoding_work_mem</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies the maximum amount of memory to be used by logical decoding,
+        before some of the decoded changes are either written to local disk.
+        This limits the amount of memory used by logical streaming replication
+        connections. It defaults to 64 megabytes (<literal>64MB</literal>).
+        Since each replication connection only uses a single buffer of this size,
+        and an installation normally doesn't have many such connections
+        concurrently (as limited by <varname>max_wal_senders</varname>), it's
+        safe to set this value significantly higher than <varname>work_mem</varname>,
+        reducing the amount of decoded changes written to disk.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-max-stack-depth" xreflabel="max_stack_depth">
       <term><varname>max_stack_depth</varname> (<type>integer</type>)
       <indexterm>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 1a90c24..91790b0 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -206,6 +206,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>work_mem</literal> (<type>integer</type>)</term>
+        <listitem>
+         <para>
+          Limits the amount of memory used to decode changes on the
+          publisher.  If not specified, the publisher will use the default
+          specified by <varname>logical_decoding_work_mem</varname>. When
+          needed, additional data are spilled to disk.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist>
      </para>
     </listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index afee283..7e3ba8e 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->name = pstrdup(NameStr(subform->subname));
 	sub->owner = subform->subowner;
 	sub->enabled = subform->subenabled;
+	sub->workmem = subform->subworkmem;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 2e67a58..d85e831 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -66,7 +66,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 						   bool *enabled, bool *create_slot,
 						   bool *slot_name_given, char **slot_name,
 						   bool *copy_data, char **synchronous_commit,
-						   bool *refresh)
+						   bool *refresh, int *logical_wm,
+						   bool *logical_wm_given)
 {
 	ListCell   *lc;
 	bool		connect_given = false;
@@ -97,6 +98,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 		*synchronous_commit = NULL;
 	if (refresh)
 		*refresh = true;
+	if (logical_wm)
+		*logical_wm_given = false;
 
 	/* Parse options */
 	foreach(lc, options)
@@ -182,6 +185,16 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 			refresh_given = true;
 			*refresh = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "work_mem") == 0 && logical_wm)
+		{
+			if (*logical_wm_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+
+			*logical_wm_given = true;
+			*logical_wm = defGetInt32(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -325,6 +338,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	bool		enabled_given;
 	bool		enabled;
 	bool		copy_data;
+	int			logical_wm;
+	bool		logical_wm_given;
 	char	   *synchronous_commit;
 	char	   *conninfo;
 	char	   *slotname;
@@ -341,7 +356,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	parse_subscription_options(stmt->options, &connect, &enabled_given,
 							   &enabled, &create_slot, &slotname_given,
 							   &slotname, &copy_data, &synchronous_commit,
-							   NULL);
+							   NULL, &logical_wm, &logical_wm_given);
 
 	/*
 	 * Since creating a replication slot is not transactional, rolling back
@@ -419,6 +434,12 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	values[Anum_pg_subscription_subpublications - 1] =
 		publicationListToArray(publications);
 
+	if (logical_wm_given)
+		values[Anum_pg_subscription_subworkmem - 1] =
+			Int32GetDatum(logical_wm);
+	else
+		nulls[Anum_pg_subscription_subworkmem - 1] = true;
+
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
 	/* Insert tuple into catalog. */
@@ -682,10 +703,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				char	   *slotname;
 				bool		slotname_given;
 				char	   *synchronous_commit;
+				int			logical_wm;
+				bool		logical_wm_given;
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
 										   NULL, &slotname_given, &slotname,
-										   NULL, &synchronous_commit, NULL);
+										   NULL, &synchronous_commit, NULL,
+										   &logical_wm, &logical_wm_given);
 
 				if (slotname_given)
 				{
@@ -710,6 +734,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 					replaces[Anum_pg_subscription_subsynccommit - 1] = true;
 				}
 
+				if (logical_wm_given)
+				{
+					values[Anum_pg_subscription_subworkmem - 1] =
+						Int32GetDatum(logical_wm);
+					replaces[Anum_pg_subscription_subworkmem - 1] = true;
+				}
+
 				update_tuple = true;
 				break;
 			}
@@ -721,7 +752,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL,
 										   &enabled_given, &enabled, NULL,
-										   NULL, NULL, NULL, NULL, NULL);
+										   NULL, NULL, NULL, NULL, NULL,
+										   NULL, NULL);
 				Assert(enabled_given);
 
 				if (!sub->slotname && enabled)
@@ -759,7 +791,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
 										   NULL, NULL, NULL, &copy_data,
-										   NULL, &refresh);
+										   NULL, &refresh, NULL, NULL);
 
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(stmt->publication);
@@ -796,7 +828,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
 										   NULL, NULL, NULL, &copy_data,
-										   NULL, NULL);
+										   NULL, NULL, NULL, NULL);
 
 				AlterSubscription_refresh(sub, copy_data);
 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 6eba08a..65b3266 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -406,6 +406,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 		appendStringInfo(&cmd, "proto_version '%u'",
 						 options->proto.logical.proto_version);
 
+		appendStringInfo(&cmd, ", work_mem '%d'",
+						 options->proto.logical.work_mem);
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
 		if (!pubnames_str)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ff62303..14c0ce8 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1726,6 +1726,7 @@ ApplyWorkerMain(Datum main_arg)
 	options.slotname = myslotname;
 	options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
 	options.proto.logical.publication_names = MySubscription->publications;
+	options.proto.logical.work_mem = MySubscription->workmem;
 
 	/* Start normal logical streaming replication. */
 	walrcv_startstreaming(wrconn, &options);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9c08757..317c5d4 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -21,6 +21,7 @@
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
 
+#include "utils/guc.h"
 #include "utils/inval.h"
 #include "utils/int8.h"
 #include "utils/memutils.h"
@@ -90,11 +91,12 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 
 static void
 parse_output_parameters(List *options, uint32 *protocol_version,
-						List **publication_names)
+						List **publication_names, int *logical_decoding_work_mem)
 {
 	ListCell   *lc;
 	bool		protocol_version_given = false;
 	bool		publication_names_given = false;
+	bool		work_mem_given = false;
 
 	foreach(lc, options)
 	{
@@ -140,6 +142,29 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 						(errcode(ERRCODE_INVALID_NAME),
 						 errmsg("invalid publication_names syntax")));
 		}
+		else if (strcmp(defel->defname, "work_mem") == 0)
+		{
+			int64	parsed;
+
+			if (work_mem_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			work_mem_given = true;
+
+			if (!scanint8(strVal(defel->arg), true, &parsed))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("invalid work_mem")));
+
+			if (parsed > PG_INT32_MAX || parsed < 64)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("work_mem \"%s\" out of range",
+								strVal(defel->arg))));
+
+			*logical_decoding_work_mem = (int)parsed;
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -174,7 +199,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		/* Parse the params and ERROR if we see any we don't recognize */
 		parse_output_parameters(ctx->output_plugin_options,
 								&data->protocol_version,
-								&data->publication_names);
+								&data->publication_names,
+								&logical_decoding_work_mem);
 
 		/* Check if we support requested protocol */
 		if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 3cb13d8..10ea113 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -48,6 +48,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subenabled;		/* True if the subscription is enabled (the
 								 * worker should be running) */
 
+	int32		subworkmem;		/* Memory to use to decode changes. */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -73,6 +75,7 @@ typedef struct Subscription
 	char	   *name;			/* Name of the subscription */
 	Oid			owner;			/* Oid of the subscription owner */
 	bool		enabled;		/* Indicates if the subscription is enabled */
+	int			workmem;		/* Memory to decode changes. */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index e12a934..4e68a69 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -162,6 +162,7 @@ typedef struct
 		{
 			uint32		proto_version;	/* Logical protocol version */
 			List	   *publication_names;	/* String list of publications */
+			int			work_mem;	/* Memory limit to use for decoding */
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
-- 
1.8.3.1

