From 7c00681b05302dc117e227192f68f4bf2a26de84 Mon Sep 17 00:00:00 2001
From: Emre Hasegeli <emre@hasegeli.com>
Date: Mon, 13 Mar 2023 16:54:18 +0100
Subject: [PATCH v02 1/2] pgoutput: Improve error messages for options

Author: Emre Hasegeli <emre@hasegeli.com>
Reviewed-by: Peter Smith <smithpb2250@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://www.postgresql.org/message-id/flat/CAE2gYzwdwtUbs-tPSV-QBwgTubiyGD2ZGsSnAVsDfAGGLDrGOA%40mail.gmail.com
---
 src/backend/replication/pgoutput/pgoutput.c | 65 ++++++++++++++-------
 1 file changed, 43 insertions(+), 22 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index f9ed1083df..0370db972a 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -267,21 +267,21 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->stream_abort_cb = pgoutput_stream_abort;
 	cb->stream_commit_cb = pgoutput_stream_commit;
 	cb->stream_change_cb = pgoutput_change;
 	cb->stream_message_cb = pgoutput_message;
 	cb->stream_truncate_cb = pgoutput_truncate;
 	/* transaction streaming - two-phase commit */
 	cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
 }
 
 static void
-parse_output_parameters(List *options, PGOutputData *data)
+parse_output_options(List *options, PGOutputData *data)
 {
 	ListCell   *lc;
 	bool		protocol_version_given = false;
 	bool		publication_names_given = false;
 	bool		binary_option_given = false;
 	bool		messages_option_given = false;
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
 	bool		origin_option_given = false;
 
@@ -289,30 +289,32 @@ parse_output_parameters(List *options, PGOutputData *data)
 	data->streaming = LOGICALREP_STREAM_OFF;
 	data->messages = false;
 	data->two_phase = false;
 
 	foreach(lc, options)
 	{
 		DefElem    *defel = (DefElem *) lfirst(lc);
 
 		Assert(defel->arg == NULL || IsA(defel->arg, String));
 
-		/* Check each param, whether or not we recognize it */
+		/* Check each option, whether or not we recognize it */
 		if (strcmp(defel->defname, "proto_version") == 0)
 		{
 			unsigned long parsed;
 			char	   *endptr;
 
 			if (protocol_version_given)
 				ereport(ERROR,
-						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("conflicting or redundant options")));
+						errcode(ERRCODE_SYNTAX_ERROR),
+						/* translator: %s is a pgoutput option */
+						errmsg("conflicting or redundant option: %s",
+							   "proto_version"));
 			protocol_version_given = true;
 
 			errno = 0;
 			parsed = strtoul(strVal(defel->arg), &endptr, 10);
 			if (errno != 0 || *endptr != '\0')
 				ereport(ERROR,
 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						 errmsg("invalid proto_version")));
 
 			if (parsed > PG_UINT32_MAX)
@@ -320,93 +322,117 @@ parse_output_parameters(List *options, PGOutputData *data)
 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						 errmsg("proto_version \"%s\" out of range",
 								strVal(defel->arg))));
 
 			data->protocol_version = (uint32) parsed;
 		}
 		else if (strcmp(defel->defname, "publication_names") == 0)
 		{
 			if (publication_names_given)
 				ereport(ERROR,
-						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("conflicting or redundant options")));
+						errcode(ERRCODE_SYNTAX_ERROR),
+						/* translator: %s is a pgoutput option */
+						errmsg("conflicting or redundant option: %s",
+							   "publication_names"));
 			publication_names_given = true;
 
 			if (!SplitIdentifierString(strVal(defel->arg), ',',
 									   &data->publication_names))
 				ereport(ERROR,
 						(errcode(ERRCODE_INVALID_NAME),
 						 errmsg("invalid publication_names syntax")));
 		}
 		else if (strcmp(defel->defname, "binary") == 0)
 		{
 			if (binary_option_given)
 				ereport(ERROR,
-						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("conflicting or redundant options")));
+						errcode(ERRCODE_SYNTAX_ERROR),
+						/* translator: %s is a pgoutput option */
+						errmsg("conflicting or redundant option: %s",
+							   "binary"));
 			binary_option_given = true;
 
 			data->binary = defGetBoolean(defel);
 		}
 		else if (strcmp(defel->defname, "messages") == 0)
 		{
 			if (messages_option_given)
 				ereport(ERROR,
-						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("conflicting or redundant options")));
+						errcode(ERRCODE_SYNTAX_ERROR),
+						/* translator: %s is a pgoutput option */
+						errmsg("conflicting or redundant option: %s",
+							   "messages"));
 			messages_option_given = true;
 
 			data->messages = defGetBoolean(defel);
 		}
 		else if (strcmp(defel->defname, "streaming") == 0)
 		{
 			if (streaming_given)
 				ereport(ERROR,
-						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("conflicting or redundant options")));
+						errcode(ERRCODE_SYNTAX_ERROR),
+						/* translator: %s is a pgoutput option */
+						errmsg("conflicting or redundant option: %s",
+							   "streaming"));
 			streaming_given = true;
 
 			data->streaming = defGetStreamingMode(defel);
 		}
 		else if (strcmp(defel->defname, "two_phase") == 0)
 		{
 			if (two_phase_option_given)
 				ereport(ERROR,
-						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("conflicting or redundant options")));
+						errcode(ERRCODE_SYNTAX_ERROR),
+						/* translator: %s is a pgoutput option */
+						errmsg("conflicting or redundant option: %s",
+							   "two_phase"));
 			two_phase_option_given = true;
 
 			data->two_phase = defGetBoolean(defel);
 		}
 		else if (strcmp(defel->defname, "origin") == 0)
 		{
 			char	   *origin;
 
 			if (origin_option_given)
 				ereport(ERROR,
 						errcode(ERRCODE_SYNTAX_ERROR),
-						errmsg("conflicting or redundant options"));
+						/* translator: %s is a pgoutput option */
+						errmsg("conflicting or redundant option: %s",
+							   "origin"));
 			origin_option_given = true;
 
 			origin = defGetString(defel);
 			if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
 				data->publish_no_origin = true;
 			else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
 				data->publish_no_origin = false;
 			else
 				ereport(ERROR,
 						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						errmsg("unrecognized origin value: \"%s\"", origin));
 		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
+
+	/* Check required options */
+	if (!protocol_version_given)
+		ereport(ERROR,
+				errcode(ERRCODE_SYNTAX_ERROR),
+				/* translator: %s is a pgoutput option */
+				errmsg("missing pgoutput option: %s", "proto_version"));
+	if (!publication_names_given)
+		ereport(ERROR,
+				errcode(ERRCODE_SYNTAX_ERROR),
+				/* translator: %s is a pgoutput option */
+				errmsg("missing pgoutput option: %s", "publication_names"));
 }
 
 /*
  * Initialize this plugin
  */
 static void
 pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 				 bool is_init)
 {
 	PGOutputData *data = palloc0(sizeof(PGOutputData));
@@ -426,41 +452,36 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	/* This plugin uses binary protocol. */
 	opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
 
 	/*
 	 * This is replication start and not slot initialization.
 	 *
 	 * Parse and validate options passed by the client.
 	 */
 	if (!is_init)
 	{
-		/* Parse the params and ERROR if we see any we don't recognize */
-		parse_output_parameters(ctx->output_plugin_options, data);
+		/* Parse the options and ERROR if we see any we don't recognize */
+		parse_output_options(ctx->output_plugin_options, data);
 
 		/* Check if we support requested protocol */
 		if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
 			ereport(ERROR,
 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 					 errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
 							data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
 
 		if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
 			ereport(ERROR,
 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 					 errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
 							data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
 
-		if (data->publication_names == NIL)
-			ereport(ERROR,
-					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					 errmsg("publication_names parameter missing")));
-
 		/*
 		 * Decide whether to enable streaming. It is disabled by default, in
 		 * which case we just update the flag in decoding context. Otherwise
 		 * we only allow it with sufficient version of the protocol, and when
 		 * the output plugin supports it.
 		 */
 		if (data->streaming == LOGICALREP_STREAM_OFF)
 			ctx->streaming = false;
 		else if (data->streaming == LOGICALREP_STREAM_ON &&
 				 data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
-- 
2.39.3 (Apple Git-145)

