From b73cb8f15c1965a5328fecf13728b4c215b9ef1c Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Tue, 1 Jun 2021 07:46:44 -0700
Subject: [PATCH] WIP refactor parse_subscription_options

---
 src/backend/commands/subscriptioncmds.c | 489 ++++++++++++------------
 src/include/commands/subscriptioncmds.h |  32 ++
 2 files changed, 270 insertions(+), 251 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 8aa6de1785..dd782d5ee6 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -60,160 +60,165 @@ static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname,
  * accommodate that.
  */
 static void
-parse_subscription_options(List *options,
-						   bool *connect,
-						   bool *enabled_given, bool *enabled,
-						   bool *create_slot,
-						   bool *slot_name_given, char **slot_name,
-						   bool *copy_data,
-						   char **synchronous_commit,
-						   bool *refresh,
-						   bool *binary_given, bool *binary,
-						   bool *streaming_given, bool *streaming)
+parse_subscription_options(List *stmt_options, SubOpts *opts)
 {
 	ListCell   *lc;
-	bool		connect_given = false;
-	bool		create_slot_given = false;
-	bool		copy_data_given = false;
-	bool		refresh_given = false;
+	bits32		supported_opts;
+	bits32		specified_opts;
+	SubOptVals  *vals;
 
-	/* If connect is specified, the others also need to be. */
-	Assert(!connect || (enabled && create_slot && copy_data));
+	supported_opts = opts->supported_opts;
+	vals = &opts->vals;
+	specified_opts = SUBOPT_NONE;
 
-	if (connect)
-		*connect = true;
-	if (enabled)
-	{
-		*enabled_given = false;
-		*enabled = true;
-	}
-	if (create_slot)
-		*create_slot = true;
-	if (slot_name)
-	{
-		*slot_name_given = false;
-		*slot_name = NULL;
-	}
-	if (copy_data)
-		*copy_data = true;
-	if (synchronous_commit)
-		*synchronous_commit = NULL;
-	if (refresh)
-		*refresh = true;
-	if (binary)
-	{
-		*binary_given = false;
-		*binary = false;
-	}
-	if (streaming)
-	{
-		*streaming_given = false;
-		*streaming = false;
-	}
+	Assert(supported_opts != SUBOPT_NONE);
+
+	/* If connect option is supported, the others also need to be. */
+	Assert((supported_opts & SUBOPT_CONNECT) == 0 ||
+		   ((supported_opts & SUBOPT_ENABLED) != 0 &&
+			(supported_opts & SUBOPT_CREATE_SLOT) != 0 &&
+			(supported_opts & SUBOPT_COPY_DATA) != 0));
+
+	/* Set default values for the supported options. */
+	if ((supported_opts & SUBOPT_CONNECT) != 0)
+		vals->connect = true;
+
+	if ((supported_opts & SUBOPT_ENABLED) != 0)
+		vals->enabled = true;
+
+	if ((supported_opts & SUBOPT_CREATE_SLOT) != 0)
+		vals->create_slot = true;
+
+	if ((supported_opts & SUBOPT_SLOT_NAME) != 0)
+		vals->slot_name = NULL;
+
+	if ((supported_opts & SUBOPT_COPY_DATA) != 0)
+		vals->copy_data = true;
+
+	if ((supported_opts & SUBOPT_SYNCHRONOUS_COMMIT) != 0)
+		vals->synchronous_commit = NULL;
+
+	if ((supported_opts & SUBOPT_REFRESH) != 0)
+		vals->refresh = true;
+
+	if ((supported_opts & SUBOPT_BINARY) != 0)
+		vals->binary = false;
+
+	if ((supported_opts & SUBOPT_STREAMING) != 0)
+		vals->streaming = false;
 
 	/* Parse options */
-	foreach(lc, options)
+	foreach(lc, stmt_options)
 	{
 		DefElem    *defel = (DefElem *) lfirst(lc);
 
-		if (strcmp(defel->defname, "connect") == 0 && connect)
+		if ((supported_opts & SUBOPT_CONNECT) != 0 &&
+			strcmp(defel->defname, "connect") == 0)
 		{
-			if (connect_given)
+			if ((specified_opts & SUBOPT_CONNECT) != 0)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			connect_given = true;
-			*connect = defGetBoolean(defel);
+			specified_opts |= SUBOPT_CONNECT;
+			vals->connect = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "enabled") == 0 && enabled)
+		else if ((supported_opts & SUBOPT_ENABLED) != 0 &&
+				 strcmp(defel->defname, "enabled") == 0)
 		{
-			if (*enabled_given)
+			if ((specified_opts & SUBOPT_ENABLED) != 0)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			*enabled_given = true;
-			*enabled = defGetBoolean(defel);
+			specified_opts |= SUBOPT_ENABLED;
+			vals->enabled = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
+		else if ((supported_opts & SUBOPT_CREATE_SLOT) != 0 &&
+				 strcmp(defel->defname, "create_slot") == 0)
 		{
-			if (create_slot_given)
+			if ((specified_opts & SUBOPT_CREATE_SLOT) != 0)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			create_slot_given = true;
-			*create_slot = defGetBoolean(defel);
+			specified_opts |= SUBOPT_CREATE_SLOT;
+			vals->create_slot = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
+		else if ((supported_opts & SUBOPT_SLOT_NAME) != 0 &&
+				 strcmp(defel->defname, "slot_name") == 0)
 		{
-			if (*slot_name_given)
+			if ((specified_opts & SUBOPT_SLOT_NAME) != 0)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			*slot_name_given = true;
-			*slot_name = defGetString(defel);
+			specified_opts |= SUBOPT_SLOT_NAME;
+			vals->slot_name = defGetString(defel);
 
 			/* Setting slot_name = NONE is treated as no slot name. */
-			if (strcmp(*slot_name, "none") == 0)
-				*slot_name = NULL;
+			if (strcmp(vals->slot_name, "none") == 0)
+				vals->slot_name = NULL;
 		}
-		else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
+		else if ((supported_opts & SUBOPT_COPY_DATA) != 0 &&
+				 strcmp(defel->defname, "copy_data") == 0)
 		{
-			if (copy_data_given)
+			if ((specified_opts & SUBOPT_COPY_DATA) != 0)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			copy_data_given = true;
-			*copy_data = defGetBoolean(defel);
+			specified_opts |= SUBOPT_COPY_DATA;
+			vals->copy_data = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
-				 synchronous_commit)
+		else if ((supported_opts & SUBOPT_SYNCHRONOUS_COMMIT) != 0 &&
+				 strcmp(defel->defname, "synchronous_commit") == 0)
 		{
-			if (*synchronous_commit)
+			if ((specified_opts & SUBOPT_SYNCHRONOUS_COMMIT) != 0)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			*synchronous_commit = defGetString(defel);
+			specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
+			vals->synchronous_commit = defGetString(defel);
 
 			/* Test if the given value is valid for synchronous_commit GUC. */
-			(void) set_config_option("synchronous_commit", *synchronous_commit,
+			(void) set_config_option("synchronous_commit", vals->synchronous_commit,
 									 PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
 									 false, 0, false);
 		}
-		else if (strcmp(defel->defname, "refresh") == 0 && refresh)
+		else if ((supported_opts & SUBOPT_REFRESH) != 0 &&
+				 strcmp(defel->defname, "refresh") == 0)
 		{
-			if (refresh_given)
+			if ((specified_opts & SUBOPT_REFRESH) != 0)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			refresh_given = true;
-			*refresh = defGetBoolean(defel);
+			specified_opts |= SUBOPT_REFRESH;
+			vals->refresh = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "binary") == 0 && binary)
+		else if ((supported_opts & SUBOPT_BINARY) != 0 &&
+				 strcmp(defel->defname, "binary") == 0)
 		{
-			if (*binary_given)
+			if ((specified_opts & SUBOPT_BINARY) != 0)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			*binary_given = true;
-			*binary = defGetBoolean(defel);
+			specified_opts |= SUBOPT_BINARY;
+			vals->binary = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "streaming") == 0 && streaming)
+		else if ((supported_opts & SUBOPT_STREAMING) != 0 &&
+				 strcmp(defel->defname, "streaming") == 0)
 		{
-			if (*streaming_given)
+			if ((specified_opts & SUBOPT_STREAMING) != 0)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			*streaming_given = true;
-			*streaming = defGetBoolean(defel);
+			specified_opts |= SUBOPT_STREAMING;
+			vals->streaming = defGetBoolean(defel);
 		}
 		else
 			ereport(ERROR,
@@ -225,66 +230,83 @@ parse_subscription_options(List *options,
 	 * We've been explicitly asked to not connect, that requires some
 	 * additional processing.
 	 */
-	if (connect && !*connect)
+	if ((supported_opts & SUBOPT_CONNECT) != 0 &&
+		!vals->connect)
 	{
+		char *incompat_opt = NULL;
+
 		/* Check for incompatible options from the user. */
-		if (enabled && *enabled_given && *enabled)
+		if ((supported_opts & SUBOPT_ENABLED) != 0 &&
+			(specified_opts & SUBOPT_ENABLED) != 0 &&
+			 vals->enabled)
+			incompat_opt = "enabled = true";
+		else if ((supported_opts & SUBOPT_CREATE_SLOT) != 0 &&
+				 (specified_opts & SUBOPT_CREATE_SLOT) != 0 &&
+				  vals->create_slot)
+			incompat_opt = "create_slot = true";
+		else if ((supported_opts & SUBOPT_COPY_DATA) != 0 &&
+				 (specified_opts & SUBOPT_COPY_DATA) != 0 &&
+				  vals->copy_data)
+			incompat_opt = "copy_data = true";
+
+		if (incompat_opt)
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 			/*- translator: both %s are strings of the form "option = value" */
 					 errmsg("%s and %s are mutually exclusive options",
-							"connect = false", "enabled = true")));
-
-		if (create_slot && create_slot_given && *create_slot)
-			ereport(ERROR,
-					(errcode(ERRCODE_SYNTAX_ERROR),
-					 errmsg("%s and %s are mutually exclusive options",
-							"connect = false", "create_slot = true")));
-
-		if (copy_data && copy_data_given && *copy_data)
-			ereport(ERROR,
-					(errcode(ERRCODE_SYNTAX_ERROR),
-					 errmsg("%s and %s are mutually exclusive options",
-							"connect = false", "copy_data = true")));
+							"connect = false", incompat_opt)));
 
 		/* Change the defaults of other options. */
-		*enabled = false;
-		*create_slot = false;
-		*copy_data = false;
+		vals->enabled = false;
+		vals->create_slot = false;
+		vals->copy_data = false;
 	}
 
 	/*
 	 * Do additional checking for disallowed combination when slot_name = NONE
 	 * was used.
 	 */
-	if (slot_name && *slot_name_given && !*slot_name)
+	if ((supported_opts & SUBOPT_SLOT_NAME) != 0 &&
+		(specified_opts & SUBOPT_SLOT_NAME) != 0 &&
+		 !vals->slot_name)
 	{
-		if (enabled && *enabled_given && *enabled)
+		char *incompat_opt = NULL;
+		char *required_opt = NULL;
+
+		if ((supported_opts & SUBOPT_ENABLED) != 0 &&
+			(specified_opts & SUBOPT_ENABLED) != 0 &&
+			 vals->enabled)
+			incompat_opt = "enabled = true";
+		else if ((supported_opts & SUBOPT_CREATE_SLOT) != 0 &&
+				 (specified_opts & SUBOPT_CREATE_SLOT) != 0 &&
+				  vals->create_slot)
+			incompat_opt = "create_slot = true";
+
+		if (incompat_opt)
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 			/*- translator: both %s are strings of the form "option = value" */
 					 errmsg("%s and %s are mutually exclusive options",
-							"slot_name = NONE", "enabled = true")));
-
-		if (create_slot && create_slot_given && *create_slot)
-			ereport(ERROR,
-					(errcode(ERRCODE_SYNTAX_ERROR),
-					 errmsg("%s and %s are mutually exclusive options",
-							"slot_name = NONE", "create_slot = true")));
-
-		if (enabled && !*enabled_given && *enabled)
+							"slot_name = NONE", incompat_opt)));
+
+		if ((supported_opts & SUBOPT_ENABLED) != 0 &&
+			(specified_opts & SUBOPT_ENABLED) == 0 &&
+			 vals->enabled)
+			required_opt = "enabled = false";
+		else if ((supported_opts & SUBOPT_CREATE_SLOT) != 0 &&
+				 (specified_opts & SUBOPT_CREATE_SLOT) == 0 &&
+				  vals->create_slot)
+			required_opt = "create_slot = false";
+
+		if (required_opt)
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 			/*- translator: both %s are strings of the form "option = value" */
 					 errmsg("subscription with %s must also set %s",
-							"slot_name = NONE", "enabled = false")));
-
-		if (create_slot && !create_slot_given && *create_slot)
-			ereport(ERROR,
-					(errcode(ERRCODE_SYNTAX_ERROR),
-					 errmsg("subscription with %s must also set %s",
-							"slot_name = NONE", "create_slot = false")));
+							"slot_name = NONE", required_opt)));
 	}
+
+	opts->specified_opts = specified_opts;
 }
 
 /*
@@ -331,37 +353,28 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	Datum		values[Natts_pg_subscription];
 	Oid			owner = GetUserId();
 	HeapTuple	tup;
-	bool		connect;
-	bool		enabled_given;
-	bool		enabled;
-	bool		copy_data;
-	bool		streaming;
-	bool		streaming_given;
-	char	   *synchronous_commit;
 	char	   *conninfo;
-	char	   *slotname;
-	bool		slotname_given;
-	bool		binary;
-	bool		binary_given;
 	char		originname[NAMEDATALEN];
-	bool		create_slot;
 	List	   *publications;
+	SubOpts 	opts;
+
+	MemSet(&opts, 0, sizeof(SubOpts));
+
+	opts.supported_opts |= SUBOPT_CONNECT;
+	opts.supported_opts |= SUBOPT_ENABLED;
+	opts.supported_opts |= SUBOPT_CREATE_SLOT;
+	opts.supported_opts |= SUBOPT_SLOT_NAME;
+	opts.supported_opts |= SUBOPT_COPY_DATA;
+	opts.supported_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
+	opts.supported_opts |= SUBOPT_BINARY;
+	opts.supported_opts |= SUBOPT_STREAMING;
 
 	/*
 	 * Parse and check options.
 	 *
 	 * Connection and publication should not be specified here.
 	 */
-	parse_subscription_options(stmt->options,
-							   &connect,
-							   &enabled_given, &enabled,
-							   &create_slot,
-							   &slotname_given, &slotname,
-							   &copy_data,
-							   &synchronous_commit,
-							   NULL,	/* no "refresh" */
-							   &binary_given, &binary,
-							   &streaming_given, &streaming);
+	parse_subscription_options(stmt->options, &opts);
 
 	/*
 	 * Since creating a replication slot is not transactional, rolling back
@@ -369,7 +382,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	 * CREATE SUBSCRIPTION inside a transaction block if creating a
 	 * replication slot.
 	 */
-	if (create_slot)
+	if (opts.vals.create_slot)
 		PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
 
 	if (!superuser())
@@ -399,12 +412,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 						stmt->subname)));
 	}
 
-	if (!slotname_given && slotname == NULL)
-		slotname = stmt->subname;
+	if ((opts.specified_opts & SUBOPT_SLOT_NAME) == 0 &&
+		 opts.vals.slot_name == NULL)
+		opts.vals.slot_name = stmt->subname;
 
 	/* The default for synchronous_commit of subscriptions is off. */
-	if (synchronous_commit == NULL)
-		synchronous_commit = "off";
+	if (opts.vals.synchronous_commit == NULL)
+		opts.vals.synchronous_commit = "off";
 
 	conninfo = stmt->conninfo;
 	publications = stmt->publication;
@@ -426,18 +440,18 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	values[Anum_pg_subscription_subname - 1] =
 		DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
 	values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
-	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
-	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
-	values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming);
+	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.vals.enabled);
+	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.vals.binary);
+	values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.vals.streaming);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
-	if (slotname)
+	if (opts.vals.slot_name)
 		values[Anum_pg_subscription_subslotname - 1] =
-			DirectFunctionCall1(namein, CStringGetDatum(slotname));
+			DirectFunctionCall1(namein, CStringGetDatum(opts.vals.slot_name));
 	else
 		nulls[Anum_pg_subscription_subslotname - 1] = true;
 	values[Anum_pg_subscription_subsynccommit - 1] =
-		CStringGetTextDatum(synchronous_commit);
+		CStringGetTextDatum(opts.vals.synchronous_commit);
 	values[Anum_pg_subscription_subpublications - 1] =
 		publicationListToArray(publications);
 
@@ -456,7 +470,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	 * Connect to remote side to execute requested commands and fetch table
 	 * info.
 	 */
-	if (connect)
+	if (opts.vals.connect)
 	{
 		char	   *err;
 		WalReceiverConn *wrconn;
@@ -476,7 +490,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 			 * Set sync state based on if we were asked to do data copy or
 			 * not.
 			 */
-			table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+			table_state = opts.vals.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
 
 			/*
 			 * Get the table list from publisher and build local table status
@@ -503,15 +517,15 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 			 * won't use the initial snapshot for anything, so no need to
 			 * export it.
 			 */
-			if (create_slot)
+			if (opts.vals.create_slot)
 			{
-				Assert(slotname);
+				Assert(opts.vals.slot_name);
 
-				walrcv_create_slot(wrconn, slotname, false,
+				walrcv_create_slot(wrconn, opts.vals.slot_name, false,
 								   CRS_NOEXPORT_SNAPSHOT, NULL);
 				ereport(NOTICE,
 						(errmsg("created replication slot \"%s\" on publisher",
-								slotname)));
+								opts.vals.slot_name)));
 			}
 		}
 		PG_FINALLY();
@@ -528,7 +542,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 
 	table_close(rel, RowExclusiveLock);
 
-	if (enabled)
+	if (opts.vals.enabled)
 		ApplyLauncherWakeupAtCommit();
 
 	ObjectAddressSet(myself, SubscriptionRelationId, subid);
@@ -797,59 +811,51 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 	{
 		case ALTER_SUBSCRIPTION_OPTIONS:
 			{
-				char	   *slotname;
-				bool		slotname_given;
-				char	   *synchronous_commit;
-				bool		binary_given;
-				bool		binary;
-				bool		streaming_given;
-				bool		streaming;
-
-				parse_subscription_options(stmt->options,
-										   NULL,	/* no "connect" */
-										   NULL, NULL,	/* no "enabled" */
-										   NULL,	/* no "create_slot" */
-										   &slotname_given, &slotname,
-										   NULL,	/* no "copy_data" */
-										   &synchronous_commit,
-										   NULL,	/* no "refresh" */
-										   &binary_given, &binary,
-										   &streaming_given, &streaming);
-
-				if (slotname_given)
+				SubOpts 	opts;
+
+				MemSet(&opts, 0, sizeof(SubOpts));
+
+				opts.supported_opts |= SUBOPT_SLOT_NAME;
+				opts.supported_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
+				opts.supported_opts |= SUBOPT_BINARY;
+				opts.supported_opts |= SUBOPT_STREAMING;
+
+				parse_subscription_options(stmt->options, &opts);
+
+				if ((opts.specified_opts & SUBOPT_SLOT_NAME) != 0)
 				{
-					if (sub->enabled && !slotname)
+					if (sub->enabled && !opts.vals.slot_name)
 						ereport(ERROR,
 								(errcode(ERRCODE_SYNTAX_ERROR),
 								 errmsg("cannot set %s for enabled subscription",
 										"slot_name = NONE")));
 
-					if (slotname)
+					if (opts.vals.slot_name)
 						values[Anum_pg_subscription_subslotname - 1] =
-							DirectFunctionCall1(namein, CStringGetDatum(slotname));
+							DirectFunctionCall1(namein, CStringGetDatum(opts.vals.slot_name));
 					else
 						nulls[Anum_pg_subscription_subslotname - 1] = true;
 					replaces[Anum_pg_subscription_subslotname - 1] = true;
 				}
 
-				if (synchronous_commit)
+				if (opts.vals.synchronous_commit)
 				{
 					values[Anum_pg_subscription_subsynccommit - 1] =
-						CStringGetTextDatum(synchronous_commit);
+						CStringGetTextDatum(opts.vals.synchronous_commit);
 					replaces[Anum_pg_subscription_subsynccommit - 1] = true;
 				}
 
-				if (binary_given)
+				if ((opts.specified_opts & SUBOPT_BINARY) != 0)
 				{
 					values[Anum_pg_subscription_subbinary - 1] =
-						BoolGetDatum(binary);
+						BoolGetDatum(opts.vals.binary);
 					replaces[Anum_pg_subscription_subbinary - 1] = true;
 				}
 
-				if (streaming_given)
+				if ((opts.specified_opts & SUBOPT_STREAMING) != 0)
 				{
 					values[Anum_pg_subscription_substream - 1] =
-						BoolGetDatum(streaming);
+						BoolGetDatum(opts.vals.streaming);
 					replaces[Anum_pg_subscription_substream - 1] = true;
 				}
 
@@ -859,31 +865,25 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 
 		case ALTER_SUBSCRIPTION_ENABLED:
 			{
-				bool		enabled,
-							enabled_given;
-
-				parse_subscription_options(stmt->options,
-										   NULL,	/* no "connect" */
-										   &enabled_given, &enabled,
-										   NULL,	/* no "create_slot" */
-										   NULL, NULL,	/* no "slot_name" */
-										   NULL,	/* no "copy_data" */
-										   NULL,	/* no "synchronous_commit" */
-										   NULL,	/* no "refresh" */
-										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no streaming */
-				Assert(enabled_given);
-
-				if (!sub->slotname && enabled)
+				SubOpts 	opts;
+
+				MemSet(&opts, 0, sizeof(SubOpts));
+
+				opts.supported_opts |= SUBOPT_ENABLED;
+
+				parse_subscription_options(stmt->options, &opts);
+				Assert((opts.specified_opts & SUBOPT_ENABLED) != 0);
+
+				if (!sub->slotname && opts.vals.enabled)
 					ereport(ERROR,
 							(errcode(ERRCODE_SYNTAX_ERROR),
 							 errmsg("cannot enable subscription that does not have a slot name")));
 
 				values[Anum_pg_subscription_subenabled - 1] =
-					BoolGetDatum(enabled);
+					BoolGetDatum(opts.vals.enabled);
 				replaces[Anum_pg_subscription_subenabled - 1] = true;
 
-				if (enabled)
+				if (opts.vals.enabled)
 					ApplyLauncherWakeupAtCommit();
 
 				update_tuple = true;
@@ -904,19 +904,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 
 		case ALTER_SUBSCRIPTION_SET_PUBLICATION:
 			{
-				bool		copy_data;
-				bool		refresh;
-
-				parse_subscription_options(stmt->options,
-										   NULL,	/* no "connect" */
-										   NULL, NULL,	/* no "enabled" */
-										   NULL,	/* no "create_slot" */
-										   NULL, NULL,	/* no "slot_name" */
-										   &copy_data,
-										   NULL,	/* no "synchronous_commit" */
-										   &refresh,
-										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no "streaming" */
+				SubOpts 	opts;
+
+				MemSet(&opts, 0, sizeof(SubOpts));
+
+				opts.supported_opts |= SUBOPT_COPY_DATA;
+				opts.supported_opts |= SUBOPT_REFRESH;
+
+				parse_subscription_options(stmt->options, &opts);
+
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(stmt->publication);
 				replaces[Anum_pg_subscription_subpublications - 1] = true;
@@ -924,7 +920,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				update_tuple = true;
 
 				/* Refresh if user asked us to. */
-				if (refresh)
+				if (opts.vals.refresh)
 				{
 					if (!sub->enabled)
 						ereport(ERROR,
@@ -937,7 +933,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 					/* Make sure refresh sees the new list of publications. */
 					sub->publications = stmt->publication;
 
-					AlterSubscription_refresh(sub, copy_data);
+					AlterSubscription_refresh(sub, opts.vals.copy_data);
 				}
 
 				break;
@@ -947,23 +943,19 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 		case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
 			{
 				bool		isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
-				bool		copy_data;
-				bool		refresh;
 				List	   *publist;
+				SubOpts 	opts;
+
+				MemSet(&opts, 0, sizeof(SubOpts));
+
+				opts.supported_opts |= SUBOPT_REFRESH;
+
+				if (isadd)
+					opts.supported_opts |= SUBOPT_COPY_DATA;
 
 				publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
 
-				parse_subscription_options(stmt->options,
-										   NULL,	/* no "connect" */
-										   NULL, NULL,	/* no "enabled" */
-										   NULL,	/* no "create_slot" */
-										   NULL, NULL,	/* no "slot_name" */
-										   isadd ? &copy_data : NULL,	/* for drop, no
-																		 * "copy_data" */
-										   NULL,	/* no "synchronous_commit" */
-										   &refresh,
-										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no "streaming" */
+				parse_subscription_options(stmt->options, &opts);
 
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(publist);
@@ -972,7 +964,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				update_tuple = true;
 
 				/* Refresh if user asked us to. */
-				if (refresh)
+				if (opts.vals.refresh)
 				{
 					if (!sub->enabled)
 						ereport(ERROR,
@@ -985,7 +977,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 					/* Only refresh the added/dropped list of publications. */
 					sub->publications = stmt->publication;
 
-					AlterSubscription_refresh(sub, copy_data);
+					AlterSubscription_refresh(sub, opts.vals.copy_data);
 				}
 
 				break;
@@ -993,27 +985,22 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 
 		case ALTER_SUBSCRIPTION_REFRESH:
 			{
-				bool		copy_data;
+				SubOpts 	opts;
+
+				MemSet(&opts, 0, sizeof(SubOpts));
+
+				opts.supported_opts |= SUBOPT_COPY_DATA;
 
 				if (!sub->enabled)
 					ereport(ERROR,
 							(errcode(ERRCODE_SYNTAX_ERROR),
 							 errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
 
-				parse_subscription_options(stmt->options,
-										   NULL,	/* no "connect" */
-										   NULL, NULL,	/* no "enabled" */
-										   NULL,	/* no "create_slot" */
-										   NULL, NULL,	/* no "slot_name" */
-										   &copy_data,
-										   NULL,	/* no "synchronous_commit" */
-										   NULL,	/* no "refresh" */
-										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no "streaming" */
+				parse_subscription_options(stmt->options, &opts);
 
 				PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
 
-				AlterSubscription_refresh(sub, copy_data);
+				AlterSubscription_refresh(sub, opts.vals.copy_data);
 
 				break;
 			}
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index 3b926f35d7..59461f7b6f 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -26,4 +26,36 @@ extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel);
 extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
 extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
 
+typedef struct SubOptVals
+{
+	bool connect;
+	bool enabled;
+	bool create_slot;
+	char *slot_name;
+	bool copy_data;
+	char *synchronous_commit;
+	bool refresh;
+	bool binary;
+	bool streaming;
+} SubOptVals;
+
+/* options for CREATE/ALTER SUBSCRIPTION  */
+typedef struct SubOpts
+{
+	bits32    supported_opts; /* bitmask of supported SUBOPT_* */
+	bits32    specified_opts; /* bitmask of user specified SUBOPT_* */
+	SubOptVals vals;
+} SubOpts;
+
+#define SUBOPT_NONE 0x00000000
+#define SUBOPT_CONNECT 0x00000001
+#define SUBOPT_ENABLED 0x00000002
+#define SUBOPT_CREATE_SLOT 0x00000004
+#define SUBOPT_SLOT_NAME 0x00000008
+#define SUBOPT_COPY_DATA 0x00000010
+#define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
+#define SUBOPT_REFRESH 0x00000040
+#define SUBOPT_BINARY 0x00000080
+#define SUBOPT_STREAMING 0x00000100
+
 #endif							/* SUBSCRIPTIONCMDS_H */
-- 
2.25.1

