From 4b83abadc1aa81d5972ee6978573b4c565f4a3c4 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Thu, 1 Jul 2021 15:51:58 +0000
Subject: [PATCH v11] Refactor function parse_subscription_options.

Instead of using multiple parameters in parse_subscription_options
function signature, use the struct SubOpts that encapsulate all the
subscription options and their values. It will be useful for future work
where we need to add other options in the subscription. Also, use bitmaps
to pass the supported and retrieve the specified options much like the way
it is done in the commit a3dc926009.

Author: Bharath Rupireddy
Reviewed-By: Peter Smith, Amit Kapila, Alvaro Herrera
Discussion: https://postgr.es/m/CALj2ACXtoQczfNsDQWobypVvHbX2DtgEHn8DawS0eGFwuo72kw@mail.gmail.com
---
 src/backend/commands/subscriptioncmds.c | 439 +++++++++++-------------
 src/tools/pgindent/typedefs.list        |   1 +
 2 files changed, 207 insertions(+), 233 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b862e59f1d..eb88d877a5 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -46,6 +46,41 @@
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 
+/*
+ * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
+ * command.
+ */
+#define SUBOPT_CONNECT				0x00000001
+#define SUBOPT_ENABLED				0x00000002
+#define SUBOPT_CREATE_SLOT			0x00000004
+#define SUBOPT_SLOT_NAME			0x00000008
+#define SUBOPT_COPY_DATA			0x00000010
+#define SUBOPT_SYNCHRONOUS_COMMIT	0x00000020
+#define SUBOPT_REFRESH				0x00000040
+#define SUBOPT_BINARY				0x00000080
+#define SUBOPT_STREAMING			0x00000100
+
+/* check if the 'val' has 'bits' set */
+#define IsSet(val, bits)  (((val) & (bits)) == (bits))
+
+/*
+ * Structure to hold a bitmap representing the user-provided CREATE/ALTER
+ * SUBSCRIPTION command options and the parsed/default values of each of them.
+ */
+typedef struct SubOpts
+{
+	bits32		specified_opts;
+	char	   *slot_name;
+	char	   *synchronous_commit;
+	bool		connect;
+	bool		enabled;
+	bool		create_slot;
+	bool		copy_data;
+	bool		refresh;
+	bool		binary;
+	bool		streaming;
+} SubOpts;
+
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
@@ -56,164 +91,151 @@ static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname,
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
  *
  * Since not all options can be specified in both commands, this function
- * will report an error on options if the target output pointer is NULL to
- * accommodate that.
+ * will report an error if mutually exclusive options are specified.
+ *
+ * Caller is expected to have cleared 'opts'.
  */
 static void
-parse_subscription_options(List *options,
-						   bool *connect,
-						   bool *enabled_given, bool *enabled,
-						   bool *create_slot,
-						   bool *slot_name_given, char **slot_name,
-						   bool *copy_data,
-						   char **synchronous_commit,
-						   bool *refresh,
-						   bool *binary_given, bool *binary,
-						   bool *streaming_given, bool *streaming)
+parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *opts)
 {
 	ListCell   *lc;
-	bool		connect_given = false;
-	bool		create_slot_given = false;
-	bool		copy_data_given = false;
-	bool		refresh_given = false;
-
-	/* If connect is specified, the others also need to be. */
-	Assert(!connect || (enabled && create_slot && copy_data));
 
-	if (connect)
-		*connect = true;
-	if (enabled)
-	{
-		*enabled_given = false;
-		*enabled = true;
-	}
-	if (create_slot)
-		*create_slot = true;
-	if (slot_name)
-	{
-		*slot_name_given = false;
-		*slot_name = NULL;
-	}
-	if (copy_data)
-		*copy_data = true;
-	if (synchronous_commit)
-		*synchronous_commit = NULL;
-	if (refresh)
-		*refresh = true;
-	if (binary)
-	{
-		*binary_given = false;
-		*binary = false;
-	}
-	if (streaming)
-	{
-		*streaming_given = false;
-		*streaming = false;
-	}
+	/* caller must expect some option */
+	Assert(supported_opts != 0);
+
+	/* If connect option is supported, these others also need to be. */
+	Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
+		   IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
+				 SUBOPT_COPY_DATA));
+
+	/* Set default values for the boolean supported options. */
+	if (IsSet(supported_opts, SUBOPT_CONNECT))
+		opts->connect = true;
+	if (IsSet(supported_opts, SUBOPT_ENABLED))
+		opts->enabled = true;
+	if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
+		opts->create_slot = true;
+	if (IsSet(supported_opts, SUBOPT_COPY_DATA))
+		opts->copy_data = true;
+	if (IsSet(supported_opts, SUBOPT_REFRESH))
+		opts->refresh = true;
+	if (IsSet(supported_opts, SUBOPT_BINARY))
+		opts->binary = false;
+	if (IsSet(supported_opts, SUBOPT_STREAMING))
+		opts->streaming = false;
 
 	/* Parse options */
-	foreach(lc, options)
+	foreach(lc, stmt_options)
 	{
 		DefElem    *defel = (DefElem *) lfirst(lc);
 
-		if (strcmp(defel->defname, "connect") == 0 && connect)
+		if (IsSet(supported_opts, SUBOPT_CONNECT) &&
+			strcmp(defel->defname, "connect") == 0)
 		{
-			if (connect_given)
+			if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			connect_given = true;
-			*connect = defGetBoolean(defel);
+			opts->specified_opts |= SUBOPT_CONNECT;
+			opts->connect = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "enabled") == 0 && enabled)
+		else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
+				 strcmp(defel->defname, "enabled") == 0)
 		{
-			if (*enabled_given)
+			if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			*enabled_given = true;
-			*enabled = defGetBoolean(defel);
+			opts->specified_opts |= SUBOPT_ENABLED;
+			opts->enabled = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
+		else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
+				 strcmp(defel->defname, "create_slot") == 0)
 		{
-			if (create_slot_given)
+			if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			create_slot_given = true;
-			*create_slot = defGetBoolean(defel);
+			opts->specified_opts |= SUBOPT_CREATE_SLOT;
+			opts->create_slot = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
+		else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
+				 strcmp(defel->defname, "slot_name") == 0)
 		{
-			if (*slot_name_given)
+			if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			*slot_name_given = true;
-			*slot_name = defGetString(defel);
+			opts->specified_opts |= SUBOPT_SLOT_NAME;
+			opts->slot_name = defGetString(defel);
 
 			/* Setting slot_name = NONE is treated as no slot name. */
-			if (strcmp(*slot_name, "none") == 0)
-				*slot_name = NULL;
+			if (strcmp(opts->slot_name, "none") == 0)
+				opts->slot_name = NULL;
 		}
-		else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
+		else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
+				 strcmp(defel->defname, "copy_data") == 0)
 		{
-			if (copy_data_given)
+			if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			copy_data_given = true;
-			*copy_data = defGetBoolean(defel);
+			opts->specified_opts |= SUBOPT_COPY_DATA;
+			opts->copy_data = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
-				 synchronous_commit)
+		else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
+				 strcmp(defel->defname, "synchronous_commit") == 0)
 		{
-			if (*synchronous_commit)
+			if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			*synchronous_commit = defGetString(defel);
+			opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
+			opts->synchronous_commit = defGetString(defel);
 
 			/* Test if the given value is valid for synchronous_commit GUC. */
-			(void) set_config_option("synchronous_commit", *synchronous_commit,
+			(void) set_config_option("synchronous_commit", opts->synchronous_commit,
 									 PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
 									 false, 0, false);
 		}
-		else if (strcmp(defel->defname, "refresh") == 0 && refresh)
+		else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
+				 strcmp(defel->defname, "refresh") == 0)
 		{
-			if (refresh_given)
+			if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			refresh_given = true;
-			*refresh = defGetBoolean(defel);
+			opts->specified_opts |= SUBOPT_REFRESH;
+			opts->refresh = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "binary") == 0 && binary)
+		else if (IsSet(supported_opts, SUBOPT_BINARY) &&
+				 strcmp(defel->defname, "binary") == 0)
 		{
-			if (*binary_given)
+			if (IsSet(opts->specified_opts, SUBOPT_BINARY))
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			*binary_given = true;
-			*binary = defGetBoolean(defel);
+			opts->specified_opts |= SUBOPT_BINARY;
+			opts->binary = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "streaming") == 0 && streaming)
+		else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
+				 strcmp(defel->defname, "streaming") == 0)
 		{
-			if (*streaming_given)
+			if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			*streaming_given = true;
-			*streaming = defGetBoolean(defel);
+			opts->specified_opts |= SUBOPT_STREAMING;
+			opts->streaming = defGetBoolean(defel);
 		}
 		else
 			ereport(ERROR,
@@ -225,63 +247,81 @@ parse_subscription_options(List *options,
 	 * We've been explicitly asked to not connect, that requires some
 	 * additional processing.
 	 */
-	if (connect && !*connect)
+	if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
 	{
 		/* Check for incompatible options from the user. */
-		if (enabled && *enabled_given && *enabled)
+		if (opts->enabled &&
+			IsSet(supported_opts, SUBOPT_ENABLED) &&
+			IsSet(opts->specified_opts, SUBOPT_ENABLED))
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 			/*- translator: both %s are strings of the form "option = value" */
 					 errmsg("%s and %s are mutually exclusive options",
 							"connect = false", "enabled = true")));
 
-		if (create_slot && create_slot_given && *create_slot)
+		if (opts->create_slot &&
+			IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
+			IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 					 errmsg("%s and %s are mutually exclusive options",
 							"connect = false", "create_slot = true")));
 
-		if (copy_data && copy_data_given && *copy_data)
+		if (opts->copy_data &&
+			IsSet(supported_opts, SUBOPT_COPY_DATA) &&
+			IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 					 errmsg("%s and %s are mutually exclusive options",
 							"connect = false", "copy_data = true")));
 
 		/* Change the defaults of other options. */
-		*enabled = false;
-		*create_slot = false;
-		*copy_data = false;
+		opts->enabled = false;
+		opts->create_slot = false;
+		opts->copy_data = false;
 	}
 
 	/*
 	 * Do additional checking for disallowed combination when slot_name = NONE
 	 * was used.
 	 */
-	if (slot_name && *slot_name_given && !*slot_name)
+	if (!opts->slot_name &&
+		IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
+		IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
 	{
-		if (enabled && *enabled_given && *enabled)
+		if (opts->enabled &&
+			IsSet(supported_opts, SUBOPT_ENABLED) &&
+			IsSet(opts->specified_opts, SUBOPT_ENABLED))
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 			/*- translator: both %s are strings of the form "option = value" */
 					 errmsg("%s and %s are mutually exclusive options",
 							"slot_name = NONE", "enabled = true")));
 
-		if (create_slot && create_slot_given && *create_slot)
+		if (opts->create_slot &&
+			IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
+			IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
+			/*- translator: both %s are strings of the form "option = value" */
 					 errmsg("%s and %s are mutually exclusive options",
 							"slot_name = NONE", "create_slot = true")));
 
-		if (enabled && !*enabled_given && *enabled)
+		if (opts->enabled &&
+			IsSet(supported_opts, SUBOPT_ENABLED) &&
+			!IsSet(opts->specified_opts, SUBOPT_ENABLED))
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 			/*- translator: both %s are strings of the form "option = value" */
 					 errmsg("subscription with %s must also set %s",
 							"slot_name = NONE", "enabled = false")));
 
-		if (create_slot && !create_slot_given && *create_slot)
+		if (opts->create_slot &&
+			IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
+			!IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
+			/*- translator: both %s are strings of the form "option = value" */
 					 errmsg("subscription with %s must also set %s",
 							"slot_name = NONE", "create_slot = false")));
 	}
@@ -331,37 +371,22 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	Datum		values[Natts_pg_subscription];
 	Oid			owner = GetUserId();
 	HeapTuple	tup;
-	bool		connect;
-	bool		enabled_given;
-	bool		enabled;
-	bool		copy_data;
-	bool		streaming;
-	bool		streaming_given;
-	char	   *synchronous_commit;
 	char	   *conninfo;
-	char	   *slotname;
-	bool		slotname_given;
-	bool		binary;
-	bool		binary_given;
 	char		originname[NAMEDATALEN];
-	bool		create_slot;
 	List	   *publications;
+	bits32		supported_opts;
+	SubOpts		opts = {0};
 
 	/*
 	 * Parse and check options.
 	 *
 	 * Connection and publication should not be specified here.
 	 */
-	parse_subscription_options(stmt->options,
-							   &connect,
-							   &enabled_given, &enabled,
-							   &create_slot,
-							   &slotname_given, &slotname,
-							   &copy_data,
-							   &synchronous_commit,
-							   NULL,	/* no "refresh" */
-							   &binary_given, &binary,
-							   &streaming_given, &streaming);
+	supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
+					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
+					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
+					  SUBOPT_STREAMING);
+	parse_subscription_options(stmt->options, supported_opts, &opts);
 
 	/*
 	 * Since creating a replication slot is not transactional, rolling back
@@ -369,7 +394,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	 * CREATE SUBSCRIPTION inside a transaction block if creating a
 	 * replication slot.
 	 */
-	if (create_slot)
+	if (opts.create_slot)
 		PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
 
 	if (!superuser())
@@ -399,12 +424,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 						stmt->subname)));
 	}
 
-	if (!slotname_given && slotname == NULL)
-		slotname = stmt->subname;
+	if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
+		opts.slot_name == NULL)
+		opts.slot_name = stmt->subname;
 
 	/* The default for synchronous_commit of subscriptions is off. */
-	if (synchronous_commit == NULL)
-		synchronous_commit = "off";
+	if (opts.synchronous_commit == NULL)
+		opts.synchronous_commit = "off";
 
 	conninfo = stmt->conninfo;
 	publications = stmt->publication;
@@ -426,18 +452,18 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	values[Anum_pg_subscription_subname - 1] =
 		DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
 	values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
-	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
-	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
-	values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming);
+	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
+	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
+	values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
-	if (slotname)
+	if (opts.slot_name)
 		values[Anum_pg_subscription_subslotname - 1] =
-			DirectFunctionCall1(namein, CStringGetDatum(slotname));
+			DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
 	else
 		nulls[Anum_pg_subscription_subslotname - 1] = true;
 	values[Anum_pg_subscription_subsynccommit - 1] =
-		CStringGetTextDatum(synchronous_commit);
+		CStringGetTextDatum(opts.synchronous_commit);
 	values[Anum_pg_subscription_subpublications - 1] =
 		publicationListToArray(publications);
 
@@ -456,7 +482,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	 * Connect to remote side to execute requested commands and fetch table
 	 * info.
 	 */
-	if (connect)
+	if (opts.connect)
 	{
 		char	   *err;
 		WalReceiverConn *wrconn;
@@ -477,7 +503,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 			 * Set sync state based on if we were asked to do data copy or
 			 * not.
 			 */
-			table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+			table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
 
 			/*
 			 * Get the table list from publisher and build local table status
@@ -504,15 +530,15 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 			 * won't use the initial snapshot for anything, so no need to
 			 * export it.
 			 */
-			if (create_slot)
+			if (opts.create_slot)
 			{
-				Assert(slotname);
+				Assert(opts.slot_name);
 
-				walrcv_create_slot(wrconn, slotname, false,
+				walrcv_create_slot(wrconn, opts.slot_name, false,
 								   CRS_NOEXPORT_SNAPSHOT, NULL);
 				ereport(NOTICE,
 						(errmsg("created replication slot \"%s\" on publisher",
-								slotname)));
+								opts.slot_name)));
 			}
 		}
 		PG_FINALLY();
@@ -529,7 +555,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 
 	table_close(rel, RowExclusiveLock);
 
-	if (enabled)
+	if (opts.enabled)
 		ApplyLauncherWakeupAtCommit();
 
 	ObjectAddressSet(myself, SubscriptionRelationId, subid);
@@ -764,6 +790,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 	bool		update_tuple = false;
 	Subscription *sub;
 	Form_pg_subscription form;
+	bits32		supported_opts;
+	SubOpts		opts = {0};
 
 	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
 
@@ -799,59 +827,46 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 	{
 		case ALTER_SUBSCRIPTION_OPTIONS:
 			{
-				char	   *slotname;
-				bool		slotname_given;
-				char	   *synchronous_commit;
-				bool		binary_given;
-				bool		binary;
-				bool		streaming_given;
-				bool		streaming;
-
-				parse_subscription_options(stmt->options,
-										   NULL,	/* no "connect" */
-										   NULL, NULL,	/* no "enabled" */
-										   NULL,	/* no "create_slot" */
-										   &slotname_given, &slotname,
-										   NULL,	/* no "copy_data" */
-										   &synchronous_commit,
-										   NULL,	/* no "refresh" */
-										   &binary_given, &binary,
-										   &streaming_given, &streaming);
-
-				if (slotname_given)
+				supported_opts = (SUBOPT_SLOT_NAME |
+								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
+								  SUBOPT_STREAMING);
+
+				parse_subscription_options(stmt->options, supported_opts, &opts);
+
+				if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
 				{
-					if (sub->enabled && !slotname)
+					if (sub->enabled && !opts.slot_name)
 						ereport(ERROR,
 								(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 								 errmsg("cannot set %s for enabled subscription",
 										"slot_name = NONE")));
 
-					if (slotname)
+					if (opts.slot_name)
 						values[Anum_pg_subscription_subslotname - 1] =
-							DirectFunctionCall1(namein, CStringGetDatum(slotname));
+							DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
 					else
 						nulls[Anum_pg_subscription_subslotname - 1] = true;
 					replaces[Anum_pg_subscription_subslotname - 1] = true;
 				}
 
-				if (synchronous_commit)
+				if (opts.synchronous_commit)
 				{
 					values[Anum_pg_subscription_subsynccommit - 1] =
-						CStringGetTextDatum(synchronous_commit);
+						CStringGetTextDatum(opts.synchronous_commit);
 					replaces[Anum_pg_subscription_subsynccommit - 1] = true;
 				}
 
-				if (binary_given)
+				if (IsSet(opts.specified_opts, SUBOPT_BINARY))
 				{
 					values[Anum_pg_subscription_subbinary - 1] =
-						BoolGetDatum(binary);
+						BoolGetDatum(opts.binary);
 					replaces[Anum_pg_subscription_subbinary - 1] = true;
 				}
 
-				if (streaming_given)
+				if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
 				{
 					values[Anum_pg_subscription_substream - 1] =
-						BoolGetDatum(streaming);
+						BoolGetDatum(opts.streaming);
 					replaces[Anum_pg_subscription_substream - 1] = true;
 				}
 
@@ -861,31 +876,19 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 
 		case ALTER_SUBSCRIPTION_ENABLED:
 			{
-				bool		enabled,
-							enabled_given;
-
-				parse_subscription_options(stmt->options,
-										   NULL,	/* no "connect" */
-										   &enabled_given, &enabled,
-										   NULL,	/* no "create_slot" */
-										   NULL, NULL,	/* no "slot_name" */
-										   NULL,	/* no "copy_data" */
-										   NULL,	/* no "synchronous_commit" */
-										   NULL,	/* no "refresh" */
-										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no streaming */
-				Assert(enabled_given);
-
-				if (!sub->slotname && enabled)
+				parse_subscription_options(stmt->options, SUBOPT_ENABLED, &opts);
+				Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
+
+				if (!sub->slotname && opts.enabled)
 					ereport(ERROR,
 							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 							 errmsg("cannot enable subscription that does not have a slot name")));
 
 				values[Anum_pg_subscription_subenabled - 1] =
-					BoolGetDatum(enabled);
+					BoolGetDatum(opts.enabled);
 				replaces[Anum_pg_subscription_subenabled - 1] = true;
 
-				if (enabled)
+				if (opts.enabled)
 					ApplyLauncherWakeupAtCommit();
 
 				update_tuple = true;
@@ -906,19 +909,9 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 
 		case ALTER_SUBSCRIPTION_SET_PUBLICATION:
 			{
-				bool		copy_data;
-				bool		refresh;
-
-				parse_subscription_options(stmt->options,
-										   NULL,	/* no "connect" */
-										   NULL, NULL,	/* no "enabled" */
-										   NULL,	/* no "create_slot" */
-										   NULL, NULL,	/* no "slot_name" */
-										   &copy_data,
-										   NULL,	/* no "synchronous_commit" */
-										   &refresh,
-										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no "streaming" */
+				supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
+				parse_subscription_options(stmt->options, supported_opts, &opts);
+
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(stmt->publication);
 				replaces[Anum_pg_subscription_subpublications - 1] = true;
@@ -926,7 +919,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				update_tuple = true;
 
 				/* Refresh if user asked us to. */
-				if (refresh)
+				if (opts.refresh)
 				{
 					if (!sub->enabled)
 						ereport(ERROR,
@@ -939,7 +932,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 					/* Make sure refresh sees the new list of publications. */
 					sub->publications = stmt->publication;
 
-					AlterSubscription_refresh(sub, copy_data);
+					AlterSubscription_refresh(sub, opts.copy_data);
 				}
 
 				break;
@@ -948,25 +941,16 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 		case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
 		case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
 			{
-				bool		isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
-				bool		copy_data = false;
-				bool		refresh;
 				List	   *publist;
+				bool		isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
 
-				parse_subscription_options(stmt->options,
-										   NULL,	/* no "connect" */
-										   NULL, NULL,	/* no "enabled" */
-										   NULL,	/* no "create_slot" */
-										   NULL, NULL,	/* no "slot_name" */
-										   isadd ? &copy_data : NULL,	/* for drop, no
-																		 * "copy_data" */
-										   NULL,	/* no "synchronous_commit" */
-										   &refresh,
-										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no "streaming" */
+				supported_opts = SUBOPT_REFRESH;
+				if (isadd)
+					supported_opts |= SUBOPT_COPY_DATA;
 
-				publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
+				parse_subscription_options(stmt->options, supported_opts, &opts);
 
+				publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(publist);
 				replaces[Anum_pg_subscription_subpublications - 1] = true;
@@ -974,7 +958,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				update_tuple = true;
 
 				/* Refresh if user asked us to. */
-				if (refresh)
+				if (opts.refresh)
 				{
 					if (!sub->enabled)
 						ereport(ERROR,
@@ -987,7 +971,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 					/* Only refresh the added/dropped list of publications. */
 					sub->publications = stmt->publication;
 
-					AlterSubscription_refresh(sub, copy_data);
+					AlterSubscription_refresh(sub, opts.copy_data);
 				}
 
 				break;
@@ -995,27 +979,16 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 
 		case ALTER_SUBSCRIPTION_REFRESH:
 			{
-				bool		copy_data;
-
 				if (!sub->enabled)
 					ereport(ERROR,
 							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 							 errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
 
-				parse_subscription_options(stmt->options,
-										   NULL,	/* no "connect" */
-										   NULL, NULL,	/* no "enabled" */
-										   NULL,	/* no "create_slot" */
-										   NULL, NULL,	/* no "slot_name" */
-										   &copy_data,
-										   NULL,	/* no "synchronous_commit" */
-										   NULL,	/* no "refresh" */
-										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no "streaming" */
+				parse_subscription_options(stmt->options, SUBOPT_COPY_DATA, &opts);
 
 				PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
 
-				AlterSubscription_refresh(sub, copy_data);
+				AlterSubscription_refresh(sub, opts.copy_data);
 
 				break;
 			}
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 64c06cf952..a72d53a272 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2511,6 +2511,7 @@ StringInfoData
 StripnullState
 SubLink
 SubLinkType
+SubOpts
 SubPlan
 SubPlanState
 SubRemoveRels
-- 
2.25.1

