From bb4a9117ea5b7eaac049c3c10b9f86afef68b572 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 30 Jun 2021 14:04:41 +0000
Subject: [PATCH v9] Refactor parse_subscription_options

Currently parse_subscription_options function receives a lot of
input parameters which makes it inextensible to add the new
parameters. So, better wrap all the input parameters within a
structure to which new parameters can be added easily. Also, use
bitmaps to pass the supported and specified options much like the
way it is done in the commit a3dc926.
---
 src/backend/commands/subscriptioncmds.c | 442 +++++++++++-------------
 src/tools/pgindent/typedefs.list        |   1 +
 2 files changed, 212 insertions(+), 231 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b862e59f1d..94201cbac8 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -46,6 +46,37 @@
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 
+#define SUBOPT_NONE 0x00000000
+#define SUBOPT_CONNECT 0x00000001
+#define SUBOPT_ENABLED 0x00000002
+#define SUBOPT_CREATE_SLOT 0x00000004
+#define SUBOPT_SLOT_NAME 0x00000008
+#define SUBOPT_COPY_DATA 0x00000010
+#define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
+#define SUBOPT_REFRESH 0x00000040
+#define SUBOPT_BINARY 0x00000080
+#define SUBOPT_STREAMING 0x00000100
+
+#define IsSet(val, bits)  ((val & (bits)) == (bits))
+
+/*
+ * Structure to hold a bitmap representing the user-provided CREATE/ALTER
+ * SUBSCRIPTION command options and the parsed/default values of each of them.
+ */
+typedef struct SubOpts
+{
+	bits32  specified_opts;
+	char 	*slot_name;
+	char 	*synchronous_commit;
+	bool 	connect;
+	bool 	enabled;
+	bool 	create_slot;
+	bool 	copy_data;
+	bool 	refresh;
+	bool 	binary;
+	bool 	streaming;
+} SubOpts;
+
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
@@ -56,164 +87,163 @@ static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname,
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
  *
  * Since not all options can be specified in both commands, this function
- * will report an error on options if the target output pointer is NULL to
- * accommodate that.
+ * will report an error if mutually exclusive options are specified.
  */
 static void
-parse_subscription_options(List *options,
-						   bool *connect,
-						   bool *enabled_given, bool *enabled,
-						   bool *create_slot,
-						   bool *slot_name_given, char **slot_name,
-						   bool *copy_data,
-						   char **synchronous_commit,
-						   bool *refresh,
-						   bool *binary_given, bool *binary,
-						   bool *streaming_given, bool *streaming)
+parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *opts)
 {
 	ListCell   *lc;
-	bool		connect_given = false;
-	bool		create_slot_given = false;
-	bool		copy_data_given = false;
-	bool		refresh_given = false;
+	bits32		specified_opts;
 
-	/* If connect is specified, the others also need to be. */
-	Assert(!connect || (enabled && create_slot && copy_data));
+	specified_opts = SUBOPT_NONE;
 
-	if (connect)
-		*connect = true;
-	if (enabled)
-	{
-		*enabled_given = false;
-		*enabled = true;
-	}
-	if (create_slot)
-		*create_slot = true;
-	if (slot_name)
-	{
-		*slot_name_given = false;
-		*slot_name = NULL;
-	}
-	if (copy_data)
-		*copy_data = true;
-	if (synchronous_commit)
-		*synchronous_commit = NULL;
-	if (refresh)
-		*refresh = true;
-	if (binary)
-	{
-		*binary_given = false;
-		*binary = false;
-	}
-	if (streaming)
-	{
-		*streaming_given = false;
-		*streaming = false;
-	}
+	Assert(supported_opts != SUBOPT_NONE);
+
+	/* If connect option is supported, the others also need to be. */
+	Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
+			IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
+				  SUBOPT_COPY_DATA));
+
+	/* Set default values for the supported options. */
+	if (IsSet(supported_opts, SUBOPT_CONNECT))
+		opts->connect = true;
+
+	if (IsSet(supported_opts, SUBOPT_ENABLED))
+		opts->enabled = true;
+
+	if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
+		opts->create_slot = true;
+
+	if (IsSet(supported_opts, SUBOPT_SLOT_NAME))
+		opts->slot_name = NULL;
+
+	if (IsSet(supported_opts, SUBOPT_COPY_DATA))
+		opts->copy_data = true;
+
+	if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT))
+		opts->synchronous_commit = NULL;
+
+	if (IsSet(supported_opts, SUBOPT_REFRESH))
+		opts->refresh = true;
+
+	if (IsSet(supported_opts, SUBOPT_BINARY))
+		opts->binary = false;
+
+	if (IsSet(supported_opts, SUBOPT_STREAMING))
+		opts->streaming = false;
 
 	/* Parse options */
-	foreach(lc, options)
+	foreach(lc, stmt_options)
 	{
 		DefElem    *defel = (DefElem *) lfirst(lc);
 
-		if (strcmp(defel->defname, "connect") == 0 && connect)
+		if (IsSet(supported_opts, SUBOPT_CONNECT) &&
+			strcmp(defel->defname, "connect") == 0)
 		{
-			if (connect_given)
+			if (IsSet(specified_opts, SUBOPT_CONNECT))
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			connect_given = true;
-			*connect = defGetBoolean(defel);
+			specified_opts |= SUBOPT_CONNECT;
+			opts->connect = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "enabled") == 0 && enabled)
+		else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
+				 strcmp(defel->defname, "enabled") == 0)
 		{
-			if (*enabled_given)
+			if (IsSet(specified_opts, SUBOPT_ENABLED))
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			*enabled_given = true;
-			*enabled = defGetBoolean(defel);
+			specified_opts |= SUBOPT_ENABLED;
+			opts->enabled = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
+		else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
+				 strcmp(defel->defname, "create_slot") == 0)
 		{
-			if (create_slot_given)
+			if (IsSet(specified_opts, SUBOPT_CREATE_SLOT))
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			create_slot_given = true;
-			*create_slot = defGetBoolean(defel);
+			specified_opts |= SUBOPT_CREATE_SLOT;
+			opts->create_slot = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
+		else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
+				 strcmp(defel->defname, "slot_name") == 0)
 		{
-			if (*slot_name_given)
+			if (IsSet(specified_opts, SUBOPT_SLOT_NAME))
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			*slot_name_given = true;
-			*slot_name = defGetString(defel);
+			specified_opts |= SUBOPT_SLOT_NAME;
+			opts->slot_name = defGetString(defel);
 
 			/* Setting slot_name = NONE is treated as no slot name. */
-			if (strcmp(*slot_name, "none") == 0)
-				*slot_name = NULL;
+			if (strcmp(opts->slot_name, "none") == 0)
+				opts->slot_name = NULL;
 		}
-		else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
+		else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
+				 strcmp(defel->defname, "copy_data") == 0)
 		{
-			if (copy_data_given)
+			if (IsSet(specified_opts, SUBOPT_COPY_DATA))
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			copy_data_given = true;
-			*copy_data = defGetBoolean(defel);
+			specified_opts |= SUBOPT_COPY_DATA;
+			opts->copy_data = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
-				 synchronous_commit)
+		else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
+				 strcmp(defel->defname, "synchronous_commit") == 0)
 		{
-			if (*synchronous_commit)
+			if (IsSet(specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			*synchronous_commit = defGetString(defel);
+			specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
+			opts->synchronous_commit = defGetString(defel);
 
 			/* Test if the given value is valid for synchronous_commit GUC. */
-			(void) set_config_option("synchronous_commit", *synchronous_commit,
+			(void) set_config_option("synchronous_commit", opts->synchronous_commit,
 									 PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
 									 false, 0, false);
 		}
-		else if (strcmp(defel->defname, "refresh") == 0 && refresh)
+		else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
+				 strcmp(defel->defname, "refresh") == 0)
 		{
-			if (refresh_given)
+			if (IsSet(specified_opts, SUBOPT_REFRESH))
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			refresh_given = true;
-			*refresh = defGetBoolean(defel);
+			specified_opts |= SUBOPT_REFRESH;
+			opts->refresh = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "binary") == 0 && binary)
+		else if (IsSet(supported_opts, SUBOPT_BINARY) &&
+				 strcmp(defel->defname, "binary") == 0)
 		{
-			if (*binary_given)
+			if (IsSet(specified_opts, SUBOPT_BINARY))
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			*binary_given = true;
-			*binary = defGetBoolean(defel);
+			specified_opts |= SUBOPT_BINARY;
+			opts->binary = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "streaming") == 0 && streaming)
+		else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
+				 strcmp(defel->defname, "streaming") == 0)
 		{
-			if (*streaming_given)
+			if (IsSet(specified_opts, SUBOPT_STREAMING))
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			*streaming_given = true;
-			*streaming = defGetBoolean(defel);
+			specified_opts |= SUBOPT_STREAMING;
+			opts->streaming = defGetBoolean(defel);
 		}
 		else
 			ereport(ERROR,
@@ -225,66 +255,76 @@ parse_subscription_options(List *options,
 	 * We've been explicitly asked to not connect, that requires some
 	 * additional processing.
 	 */
-	if (connect && !*connect)
+	if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
 	{
 		/* Check for incompatible options from the user. */
-		if (enabled && *enabled_given && *enabled)
+		if (opts->enabled && IsSet(supported_opts, SUBOPT_ENABLED) &&
+			IsSet(specified_opts, SUBOPT_ENABLED))
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 			/*- translator: both %s are strings of the form "option = value" */
 					 errmsg("%s and %s are mutually exclusive options",
 							"connect = false", "enabled = true")));
 
-		if (create_slot && create_slot_given && *create_slot)
+		if (opts->create_slot && IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
+			IsSet(specified_opts, SUBOPT_CREATE_SLOT))
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 					 errmsg("%s and %s are mutually exclusive options",
 							"connect = false", "create_slot = true")));
 
-		if (copy_data && copy_data_given && *copy_data)
+		if (opts->copy_data && IsSet(supported_opts, SUBOPT_COPY_DATA) &&
+			IsSet(specified_opts, SUBOPT_COPY_DATA))
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 					 errmsg("%s and %s are mutually exclusive options",
 							"connect = false", "copy_data = true")));
 
 		/* Change the defaults of other options. */
-		*enabled = false;
-		*create_slot = false;
-		*copy_data = false;
+		opts->enabled = false;
+		opts->create_slot = false;
+		opts->copy_data = false;
 	}
 
 	/*
 	 * Do additional checking for disallowed combination when slot_name = NONE
 	 * was used.
 	 */
-	if (slot_name && *slot_name_given && !*slot_name)
+	if (!opts->slot_name && IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
+		IsSet(specified_opts, SUBOPT_SLOT_NAME))
 	{
-		if (enabled && *enabled_given && *enabled)
+		if (opts->enabled && IsSet(supported_opts, SUBOPT_ENABLED) &&
+			IsSet(specified_opts, SUBOPT_ENABLED))
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 			/*- translator: both %s are strings of the form "option = value" */
 					 errmsg("%s and %s are mutually exclusive options",
 							"slot_name = NONE", "enabled = true")));
 
-		if (create_slot && create_slot_given && *create_slot)
+		if (opts->create_slot && IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
+			IsSet(specified_opts, SUBOPT_CREATE_SLOT))
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 					 errmsg("%s and %s are mutually exclusive options",
 							"slot_name = NONE", "create_slot = true")));
 
-		if (enabled && !*enabled_given && *enabled)
+		if (opts->enabled && IsSet(supported_opts, SUBOPT_ENABLED) &&
+			!IsSet(specified_opts, SUBOPT_ENABLED))
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 			/*- translator: both %s are strings of the form "option = value" */
 					 errmsg("subscription with %s must also set %s",
 							"slot_name = NONE", "enabled = false")));
 
-		if (create_slot && !create_slot_given && *create_slot)
+		if (opts->create_slot && IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
+			!IsSet(specified_opts, SUBOPT_CREATE_SLOT))
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 					 errmsg("subscription with %s must also set %s",
 							"slot_name = NONE", "create_slot = false")));
 	}
+
+	opts->specified_opts = specified_opts;
 }
 
 /*
@@ -331,37 +371,22 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	Datum		values[Natts_pg_subscription];
 	Oid			owner = GetUserId();
 	HeapTuple	tup;
-	bool		connect;
-	bool		enabled_given;
-	bool		enabled;
-	bool		copy_data;
-	bool		streaming;
-	bool		streaming_given;
-	char	   *synchronous_commit;
 	char	   *conninfo;
-	char	   *slotname;
-	bool		slotname_given;
-	bool		binary;
-	bool		binary_given;
 	char		originname[NAMEDATALEN];
-	bool		create_slot;
 	List	   *publications;
+	bits32		supported_opts = SUBOPT_NONE;
+	SubOpts 	opts = {0};
+
+	supported_opts |= (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
+					   SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |  SUBOPT_BINARY |
+					   SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_STREAMING);
 
 	/*
 	 * Parse and check options.
 	 *
 	 * Connection and publication should not be specified here.
 	 */
-	parse_subscription_options(stmt->options,
-							   &connect,
-							   &enabled_given, &enabled,
-							   &create_slot,
-							   &slotname_given, &slotname,
-							   &copy_data,
-							   &synchronous_commit,
-							   NULL,	/* no "refresh" */
-							   &binary_given, &binary,
-							   &streaming_given, &streaming);
+	parse_subscription_options(stmt->options, supported_opts, &opts);
 
 	/*
 	 * Since creating a replication slot is not transactional, rolling back
@@ -369,7 +394,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	 * CREATE SUBSCRIPTION inside a transaction block if creating a
 	 * replication slot.
 	 */
-	if (create_slot)
+	if (opts.create_slot)
 		PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
 
 	if (!superuser())
@@ -399,12 +424,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 						stmt->subname)));
 	}
 
-	if (!slotname_given && slotname == NULL)
-		slotname = stmt->subname;
+	if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
+		 opts.slot_name == NULL)
+		opts.slot_name = stmt->subname;
 
 	/* The default for synchronous_commit of subscriptions is off. */
-	if (synchronous_commit == NULL)
-		synchronous_commit = "off";
+	if (opts.synchronous_commit == NULL)
+		opts.synchronous_commit = "off";
 
 	conninfo = stmt->conninfo;
 	publications = stmt->publication;
@@ -426,18 +452,18 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	values[Anum_pg_subscription_subname - 1] =
 		DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
 	values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
-	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
-	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
-	values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming);
+	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
+	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
+	values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
-	if (slotname)
+	if (opts.slot_name)
 		values[Anum_pg_subscription_subslotname - 1] =
-			DirectFunctionCall1(namein, CStringGetDatum(slotname));
+			DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
 	else
 		nulls[Anum_pg_subscription_subslotname - 1] = true;
 	values[Anum_pg_subscription_subsynccommit - 1] =
-		CStringGetTextDatum(synchronous_commit);
+		CStringGetTextDatum(opts.synchronous_commit);
 	values[Anum_pg_subscription_subpublications - 1] =
 		publicationListToArray(publications);
 
@@ -456,7 +482,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	 * Connect to remote side to execute requested commands and fetch table
 	 * info.
 	 */
-	if (connect)
+	if (opts.connect)
 	{
 		char	   *err;
 		WalReceiverConn *wrconn;
@@ -477,7 +503,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 			 * Set sync state based on if we were asked to do data copy or
 			 * not.
 			 */
-			table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+			table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
 
 			/*
 			 * Get the table list from publisher and build local table status
@@ -504,15 +530,15 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 			 * won't use the initial snapshot for anything, so no need to
 			 * export it.
 			 */
-			if (create_slot)
+			if (opts.create_slot)
 			{
-				Assert(slotname);
+				Assert(opts.slot_name);
 
-				walrcv_create_slot(wrconn, slotname, false,
+				walrcv_create_slot(wrconn, opts.slot_name, false,
 								   CRS_NOEXPORT_SNAPSHOT, NULL);
 				ereport(NOTICE,
 						(errmsg("created replication slot \"%s\" on publisher",
-								slotname)));
+								opts.slot_name)));
 			}
 		}
 		PG_FINALLY();
@@ -529,7 +555,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 
 	table_close(rel, RowExclusiveLock);
 
-	if (enabled)
+	if (opts.enabled)
 		ApplyLauncherWakeupAtCommit();
 
 	ObjectAddressSet(myself, SubscriptionRelationId, subid);
@@ -764,6 +790,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 	bool		update_tuple = false;
 	Subscription *sub;
 	Form_pg_subscription form;
+	bits32		supported_opts = SUBOPT_NONE;
+	SubOpts 	opts = {0};
 
 	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
 
@@ -799,59 +827,46 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 	{
 		case ALTER_SUBSCRIPTION_OPTIONS:
 			{
-				char	   *slotname;
-				bool		slotname_given;
-				char	   *synchronous_commit;
-				bool		binary_given;
-				bool		binary;
-				bool		streaming_given;
-				bool		streaming;
-
-				parse_subscription_options(stmt->options,
-										   NULL,	/* no "connect" */
-										   NULL, NULL,	/* no "enabled" */
-										   NULL,	/* no "create_slot" */
-										   &slotname_given, &slotname,
-										   NULL,	/* no "copy_data" */
-										   &synchronous_commit,
-										   NULL,	/* no "refresh" */
-										   &binary_given, &binary,
-										   &streaming_given, &streaming);
-
-				if (slotname_given)
+				supported_opts |= (SUBOPT_SLOT_NAME | SUBOPT_BINARY |
+								   SUBOPT_SYNCHRONOUS_COMMIT |
+								   SUBOPT_STREAMING);
+
+				parse_subscription_options(stmt->options, supported_opts, &opts);
+
+				if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
 				{
-					if (sub->enabled && !slotname)
+					if (sub->enabled && !opts.slot_name)
 						ereport(ERROR,
 								(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 								 errmsg("cannot set %s for enabled subscription",
 										"slot_name = NONE")));
 
-					if (slotname)
+					if (opts.slot_name)
 						values[Anum_pg_subscription_subslotname - 1] =
-							DirectFunctionCall1(namein, CStringGetDatum(slotname));
+							DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
 					else
 						nulls[Anum_pg_subscription_subslotname - 1] = true;
 					replaces[Anum_pg_subscription_subslotname - 1] = true;
 				}
 
-				if (synchronous_commit)
+				if (opts.synchronous_commit)
 				{
 					values[Anum_pg_subscription_subsynccommit - 1] =
-						CStringGetTextDatum(synchronous_commit);
+						CStringGetTextDatum(opts.synchronous_commit);
 					replaces[Anum_pg_subscription_subsynccommit - 1] = true;
 				}
 
-				if (binary_given)
+				if (IsSet(opts.specified_opts, SUBOPT_BINARY))
 				{
 					values[Anum_pg_subscription_subbinary - 1] =
-						BoolGetDatum(binary);
+						BoolGetDatum(opts.binary);
 					replaces[Anum_pg_subscription_subbinary - 1] = true;
 				}
 
-				if (streaming_given)
+				if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
 				{
 					values[Anum_pg_subscription_substream - 1] =
-						BoolGetDatum(streaming);
+						BoolGetDatum(opts.streaming);
 					replaces[Anum_pg_subscription_substream - 1] = true;
 				}
 
@@ -861,31 +876,22 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 
 		case ALTER_SUBSCRIPTION_ENABLED:
 			{
-				bool		enabled,
-							enabled_given;
-
-				parse_subscription_options(stmt->options,
-										   NULL,	/* no "connect" */
-										   &enabled_given, &enabled,
-										   NULL,	/* no "create_slot" */
-										   NULL, NULL,	/* no "slot_name" */
-										   NULL,	/* no "copy_data" */
-										   NULL,	/* no "synchronous_commit" */
-										   NULL,	/* no "refresh" */
-										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no streaming */
-				Assert(enabled_given);
-
-				if (!sub->slotname && enabled)
+
+				supported_opts |= SUBOPT_ENABLED;
+
+				parse_subscription_options(stmt->options, supported_opts, &opts);
+				Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
+
+				if (!sub->slotname && opts.enabled)
 					ereport(ERROR,
 							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 							 errmsg("cannot enable subscription that does not have a slot name")));
 
 				values[Anum_pg_subscription_subenabled - 1] =
-					BoolGetDatum(enabled);
+					BoolGetDatum(opts.enabled);
 				replaces[Anum_pg_subscription_subenabled - 1] = true;
 
-				if (enabled)
+				if (opts.enabled)
 					ApplyLauncherWakeupAtCommit();
 
 				update_tuple = true;
@@ -906,19 +912,10 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 
 		case ALTER_SUBSCRIPTION_SET_PUBLICATION:
 			{
-				bool		copy_data;
-				bool		refresh;
-
-				parse_subscription_options(stmt->options,
-										   NULL,	/* no "connect" */
-										   NULL, NULL,	/* no "enabled" */
-										   NULL,	/* no "create_slot" */
-										   NULL, NULL,	/* no "slot_name" */
-										   &copy_data,
-										   NULL,	/* no "synchronous_commit" */
-										   &refresh,
-										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no "streaming" */
+				supported_opts |= (SUBOPT_COPY_DATA | SUBOPT_REFRESH);
+
+				parse_subscription_options(stmt->options, supported_opts, &opts);
+
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(stmt->publication);
 				replaces[Anum_pg_subscription_subpublications - 1] = true;
@@ -926,7 +923,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				update_tuple = true;
 
 				/* Refresh if user asked us to. */
-				if (refresh)
+				if (opts.refresh)
 				{
 					if (!sub->enabled)
 						ereport(ERROR,
@@ -939,33 +936,25 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 					/* Make sure refresh sees the new list of publications. */
 					sub->publications = stmt->publication;
 
-					AlterSubscription_refresh(sub, copy_data);
+					AlterSubscription_refresh(sub, opts.copy_data);
 				}
 
 				break;
 			}
 
 		case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
+			supported_opts |= SUBOPT_COPY_DATA;
+			/* FALL THRU */
 		case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
 			{
-				bool		isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
-				bool		copy_data = false;
-				bool		refresh;
 				List	   *publist;
 
-				parse_subscription_options(stmt->options,
-										   NULL,	/* no "connect" */
-										   NULL, NULL,	/* no "enabled" */
-										   NULL,	/* no "create_slot" */
-										   NULL, NULL,	/* no "slot_name" */
-										   isadd ? &copy_data : NULL,	/* for drop, no
-																		 * "copy_data" */
-										   NULL,	/* no "synchronous_commit" */
-										   &refresh,
-										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no "streaming" */
+				supported_opts |= SUBOPT_REFRESH;
 
-				publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
+				publist = merge_publications(sub->publications, stmt->publication,
+											 stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION ? true : false,
+											 stmt->subname);
+				parse_subscription_options(stmt->options, supported_opts, &opts);
 
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(publist);
@@ -974,7 +963,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				update_tuple = true;
 
 				/* Refresh if user asked us to. */
-				if (refresh)
+				if (opts.refresh)
 				{
 					if (!sub->enabled)
 						ereport(ERROR,
@@ -987,7 +976,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 					/* Only refresh the added/dropped list of publications. */
 					sub->publications = stmt->publication;
 
-					AlterSubscription_refresh(sub, copy_data);
+					AlterSubscription_refresh(sub, opts.copy_data);
 				}
 
 				break;
@@ -995,27 +984,18 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 
 		case ALTER_SUBSCRIPTION_REFRESH:
 			{
-				bool		copy_data;
+				supported_opts |= SUBOPT_COPY_DATA;
 
 				if (!sub->enabled)
 					ereport(ERROR,
 							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 							 errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
 
-				parse_subscription_options(stmt->options,
-										   NULL,	/* no "connect" */
-										   NULL, NULL,	/* no "enabled" */
-										   NULL,	/* no "create_slot" */
-										   NULL, NULL,	/* no "slot_name" */
-										   &copy_data,
-										   NULL,	/* no "synchronous_commit" */
-										   NULL,	/* no "refresh" */
-										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no "streaming" */
+				parse_subscription_options(stmt->options, supported_opts, &opts);
 
 				PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
 
-				AlterSubscription_refresh(sub, copy_data);
+				AlterSubscription_refresh(sub, opts.copy_data);
 
 				break;
 			}
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 64c06cf952..a72d53a272 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2511,6 +2511,7 @@ StringInfoData
 StripnullState
 SubLink
 SubLinkType
+SubOpts
 SubPlan
 SubPlanState
 SubRemoveRels
-- 
2.25.1

