From 157e87f96113589f17007def474ab70edcdd6da8 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 2 Aug 2021 14:23:18 +0900
Subject: [PATCH v14 2/3] Add RESET command to ALTER SUBSCRIPTION command.

ALTER SUBSCRIPTION ... RESET command resets subscription
parameters. The parameters that can be set are streaming, binary,
synchronous_commit.

The RESET parameter for ALTER SUBSCRIPTION is required by the
follow-up commit that introduces a new resettable subscription
parameter "skip_xid".
---
 doc/src/sgml/ref/alter_subscription.sgml   |  8 ++-
 src/backend/commands/subscriptioncmds.c    | 59 +++++++++++++++-------
 src/backend/parser/gram.y                  | 11 +++-
 src/include/nodes/parsenodes.h             |  5 +-
 src/test/regress/expected/subscription.out | 14 ++++-
 src/test/regress/sql/subscription.sql      | 13 +++++
 6 files changed, 87 insertions(+), 23 deletions(-)

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index bec5e9c483..c9700e8699 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -29,6 +29,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> REFRESH PUB
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ENABLE
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DISABLE
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RESET ( <replaceable class="parameter">subscription_parameter</replaceable> [, ... ] )
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
 </synopsis>
@@ -193,16 +194,21 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
 
    <varlistentry>
     <term><literal>SET ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )</literal></term>
+    <term><literal>RESET ( <replaceable class="parameter">subscription_parameter</replaceable> [, ... ] )</literal></term>
     <listitem>
      <para>
       This clause alters parameters originally set by
       <xref linkend="sql-createsubscription"/>.  See there for more
-      information.  The parameters that can be altered
+      information.  The parameters that can be set
       are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
       <literal>binary</literal>, and
       <literal>streaming</literal>.
      </para>
+     <para>
+       The parameters that can be reset are: <literal>streaming</literal>,
+       <literal>binary</literal>, <literal>synchronous_commit</literal>.
+     </para>
     </listitem>
    </varlistentry>
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index c47ba26369..896ec8b836 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -99,7 +99,8 @@ static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname,
  */
 static void
 parse_subscription_options(ParseState *pstate, List *stmt_options,
-						   bits32 supported_opts, SubOpts *opts)
+						   bits32 supported_opts, SubOpts *opts,
+						   bool is_reset)
 {
 	ListCell   *lc;
 
@@ -134,6 +135,11 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 	{
 		DefElem    *defel = (DefElem *) lfirst(lc);
 
+		if (is_reset && defel->arg != NULL)
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("RESET must not include values for parameters")));
+
 		if (IsSet(supported_opts, SUBOPT_CONNECT) &&
 			strcmp(defel->defname, "connect") == 0)
 		{
@@ -192,12 +198,18 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 				errorConflictingDefElem(defel, pstate);
 
 			opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
-			opts->synchronous_commit = defGetString(defel);
+			if (!is_reset)
+			{
+				opts->synchronous_commit = defGetString(defel);
 
-			/* Test if the given value is valid for synchronous_commit GUC. */
-			(void) set_config_option("synchronous_commit", opts->synchronous_commit,
-									 PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
-									 false, 0, false);
+				/*
+				 * Test if the given value is valid for synchronous_commit
+				 * GUC.
+				 */
+				(void) set_config_option("synchronous_commit", opts->synchronous_commit,
+										 PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
+										 false, 0, false);
+			}
 		}
 		else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
 				 strcmp(defel->defname, "refresh") == 0)
@@ -215,7 +227,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 				errorConflictingDefElem(defel, pstate);
 
 			opts->specified_opts |= SUBOPT_BINARY;
-			opts->binary = defGetBoolean(defel);
+			if (!is_reset)
+				opts->binary = defGetBoolean(defel);
 		}
 		else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
 				 strcmp(defel->defname, "streaming") == 0)
@@ -224,7 +237,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 				errorConflictingDefElem(defel, pstate);
 
 			opts->specified_opts |= SUBOPT_STREAMING;
-			opts->streaming = defGetBoolean(defel);
+			if (!is_reset)
+				opts->streaming = defGetBoolean(defel);
 		}
 		else if (strcmp(defel->defname, "two_phase") == 0)
 		{
@@ -397,7 +411,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT);
-	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
+	parse_subscription_options(pstate, stmt->options, supported_opts, &opts,
+							   false);
 
 	/*
 	 * Since creating a replication slot is not transactional, rolling back
@@ -866,14 +881,21 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	switch (stmt->kind)
 	{
-		case ALTER_SUBSCRIPTION_OPTIONS:
+		case ALTER_SUBSCRIPTION_SET_OPTIONS:
+		case ALTER_SUBSCRIPTION_RESET_OPTIONS:
 			{
-				supported_opts = (SUBOPT_SLOT_NAME |
-								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-								  SUBOPT_STREAMING);
+				bool is_reset = (stmt->kind == ALTER_SUBSCRIPTION_RESET_OPTIONS);
+
+				if (is_reset)
+					supported_opts = (SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
+									  SUBOPT_STREAMING);
+				else
+					supported_opts = (SUBOPT_SLOT_NAME |
+									  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
+									  SUBOPT_STREAMING);
 
 				parse_subscription_options(pstate, stmt->options,
-										   supported_opts, &opts);
+										   supported_opts, &opts, is_reset);
 
 				if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
 				{
@@ -926,7 +948,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 		case ALTER_SUBSCRIPTION_ENABLED:
 			{
 				parse_subscription_options(pstate, stmt->options,
-										   SUBOPT_ENABLED, &opts);
+										   SUBOPT_ENABLED, &opts, false);
+
 				Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
 
 				if (!sub->slotname && opts.enabled)
@@ -961,7 +984,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 			{
 				supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
 				parse_subscription_options(pstate, stmt->options,
-										   supported_opts, &opts);
+										   supported_opts, &opts, false);
 
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(stmt->publication);
@@ -1008,7 +1031,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
 				parse_subscription_options(pstate, stmt->options,
-										   supported_opts, &opts);
+										   supported_opts, &opts, false);
 
 				publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
 				values[Anum_pg_subscription_subpublications - 1] =
@@ -1056,7 +1079,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 							 errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
 
 				parse_subscription_options(pstate, stmt->options,
-										   SUBOPT_COPY_DATA, &opts);
+										   SUBOPT_COPY_DATA, &opts, false);
 
 				/*
 				 * The subscription option "two_phase" requires that
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index e3068a374e..70558f964a 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9721,7 +9721,16 @@ AlterSubscriptionStmt:
 				{
 					AlterSubscriptionStmt *n =
 						makeNode(AlterSubscriptionStmt);
-					n->kind = ALTER_SUBSCRIPTION_OPTIONS;
+					n->kind = ALTER_SUBSCRIPTION_SET_OPTIONS;
+					n->subname = $3;
+					n->options = $5;
+					$$ = (Node *)n;
+				}
+			| ALTER SUBSCRIPTION name RESET definition
+				{
+					AlterSubscriptionStmt *n =
+						makeNode(AlterSubscriptionStmt);
+					n->kind = ALTER_SUBSCRIPTION_RESET_OPTIONS;
 					n->subname = $3;
 					n->options = $5;
 					$$ = (Node *)n;
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 3138877553..539921cb52 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3676,7 +3676,8 @@ typedef struct CreateSubscriptionStmt
 
 typedef enum AlterSubscriptionType
 {
-	ALTER_SUBSCRIPTION_OPTIONS,
+	ALTER_SUBSCRIPTION_SET_OPTIONS,
+	ALTER_SUBSCRIPTION_RESET_OPTIONS,
 	ALTER_SUBSCRIPTION_CONNECTION,
 	ALTER_SUBSCRIPTION_SET_PUBLICATION,
 	ALTER_SUBSCRIPTION_ADD_PUBLICATION,
@@ -3688,7 +3689,7 @@ typedef enum AlterSubscriptionType
 typedef struct AlterSubscriptionStmt
 {
 	NodeTag		type;
-	AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */
+	AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_SET_OPTIONS, etc */
 	char	   *subname;		/* Name of the subscription */
 	char	   *conninfo;		/* Connection string to publisher */
 	List	   *publication;	/* One or more publication to subscribe to */
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 15a1ac6398..e4c16cab66 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -281,11 +281,23 @@ ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
 ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+-- ok
+ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit);
+ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit, binary, streaming);
+-- fail - unsupported parameters
+ALTER SUBSCRIPTION regress_testsub RESET (connect);
+ERROR:  unrecognized subscription parameter: "connect"
+ALTER SUBSCRIPTION regress_testsub RESET (enabled);
+ERROR:  unrecognized subscription parameter: "enabled"
+-- fail - RESET must not include values
+ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit = off);
+ERROR:  RESET must not include values for parameters
 \dRs+
                                                                      List of subscriptions
       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
 -----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | off                | dbname=regress_doesnotexist
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 7faa935a2a..3b0fbea897 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -215,6 +215,19 @@ ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+
+-- ok
+ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit);
+ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit, binary, streaming);
+
+-- fail - unsupported parameters
+ALTER SUBSCRIPTION regress_testsub RESET (connect);
+ALTER SUBSCRIPTION regress_testsub RESET (enabled);
+
+-- fail - RESET must not include values
+ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit = off);
+
 \dRs+
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
-- 
2.24.3 (Apple Git-128)

