From 74d625f4b533d2ced28268832d7c08799b1eac80 Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Mon, 11 Nov 2019 14:38:08 -0500
Subject: [PATCH 7/7] check that the subscriber is compatible with the
 publisher

---
 .../libpqwalreceiver/libpqwalreceiver.c       |  36 +++-
 src/backend/replication/pgoutput/pgoutput.c   | 179 ++++++++++++++++++
 2 files changed, 212 insertions(+), 3 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 9aec824a18..19e8070c44 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -400,7 +400,6 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 		char	   *pubnames_str;
 		List	   *pubnames;
 		char	   *pubnames_literal;
-		bool		binary;
 
 		appendStringInfoString(&cmd, " (");
 
@@ -422,9 +421,40 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 		appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
 		PQfreemem(pubnames_literal);
 		pfree(pubnames_str);
-		if (options->proto.logical.binary)
+		if (options->proto.logical.binary) {
 			appendStringInfo(&cmd, ", binary 'true'");
-
+			appendStringInfo(&cmd, ", sizeof_datum %zu", sizeof(Datum));
+			appendStringInfo(&cmd, ", sizeof_int %zu", sizeof(int));
+			appendStringInfo(&cmd, ", sizeof_long %zu", sizeof(long));
+			appendStringInfo(&cmd, ", bigendian %d",
+#ifdef WORDS_BIGENDIAN
+								 true
+#else
+								 false
+#endif
+								 );
+			appendStringInfo(&cmd, ", float4_byval %d",
+#ifdef USE_FLOAT4_BYVAL
+								 true
+#else
+								 false
+#endif
+								 );
+				appendStringInfo(&cmd, ", float8_byval %d",
+#ifdef USE_FLOAT8_BYVAL
+								 true
+#else
+								 false
+#endif
+								 );
+				appendStringInfo(&cmd, ", integer_datetimes %d",
+#ifdef USE_INTEGER_DATETIMES
+								 true
+#else
+								 false
+#endif
+								 );
+		}
 		appendStringInfoChar(&cmd, ')');
 	}
 	else
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index ee20a6535c..2c9a5b3eb0 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -97,10 +97,25 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 	bool		protocol_version_given = false;
 	bool		publication_names_given = false;
 	bool		binary_option_given = false;
+	bool		sizeof_int_given = false;
+	bool		sizeof_datum_given = false;
+	bool		sizeof_long_given = false;
+	bool		big_endian_given = false;
+	bool		float4_byval_given = false;
+	bool		float8_byval_given = false;
+	bool		integer_datetimes_given = false;
+	long		datum_size;
+	long 		int_size;
+	long		long_size;
+	bool		bigendian;
+	bool		float4_byval;
+	bool		float8_byval;
+	bool		integer_datetimes;
 
 	// default to false
 	*binary_basetypes = false;
 
+
 	foreach(lc, options)
 	{
 		DefElem    *defel = (DefElem *) lfirst(lc);
@@ -160,10 +175,174 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 									 errmsg("invalid binary option")));
 
 			*binary_basetypes = parsed;
+
+
+		}
+		else if (strcmp(defel->defname, "sizeof_datum") == 0)
+		{
+
+					if (sizeof_datum_given)
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("conflicting or redundant options")));
+					sizeof_datum_given = true;
+
+					if (!scanint8(strVal(defel->arg), true,  &datum_size))
+						ereport(ERROR,
+								(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+								 errmsg("invalid sizeof_datum option")));
+		}
+		else if (strcmp(defel->defname, "sizeof_int") == 0)
+		{
+
+			if (sizeof_int_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			sizeof_int_given = true;
+
+			if (!scanint8(strVal(defel->arg), true,  &int_size))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("invalid sizeof_int option")));
+		}
+		else if (strcmp(defel->defname, "sizeof_long") == 0)
+		{
+
+			if (sizeof_long_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			sizeof_int_given = true;
+
+			if (!scanint8(strVal(defel->arg), true, &long_size))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("invalid sizeof_long option")));
+		}
+		else if (strcmp(defel->defname, "bigendian") == 0)
+		{
+
+			if (big_endian_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			big_endian_given = true;
+
+			if (!parse_bool(strVal(defel->arg), &bigendian))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("invalid bigendian option")));
+		}
+		else if (strcmp(defel->defname, "float4_byval") == 0)
+		{
+
+			if (float4_byval_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			float4_byval_given = true;
+
+			if (!parse_bool(strVal(defel->arg), &float4_byval))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("invalid float4_byval option")));
+		}
+		else if (strcmp(defel->defname, "float8_byval") == 0)
+		{
+
+			if (float8_byval_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			float8_byval_given = true;
+
+			if (!parse_bool(strVal(defel->arg), &float8_byval))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("invalid float8_byval option")));
+		}
+		else if (strcmp(defel->defname, "integer_date_times") == 0)
+		{
+
+			if (integer_datetimes_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			integer_datetimes_given = true;
+
+			if (!parse_bool(strVal(defel->arg), &integer_datetimes))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("invalid integer_date_times option")));
 		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
+
+/*
+ * after we know that the subscriber is requesting binary check to make sure
+ * we are compatible with the subscriber.
+ */
+	if ( *binary_basetypes == true )
+	{
+		if (sizeof(Datum) != datum_size)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					errmsg("incompatible datum size")));
+
+		if (sizeof(int)  != int_size)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					errmsg("incompatible integer size")));
+
+		if (sizeof(long)  != long_size)
+					ereport(ERROR,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							errmsg("incompatible long size")));
+		if(
+#ifdef WORDS_BIGENDIAN
+			true
+#else
+			false
+#endif
+				!= bigendian)
+					ereport(ERROR,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							errmsg("incompatible endianness")));
+		if( float4_byval !=
+#ifdef USE_FLOAT4_BYVAL
+				true
+#else
+				false
+#endif
+				)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("incompatible float4_byval")));
+		if( float8_byval !=
+#ifdef USE_FLOAT4_BYVAL
+						true
+#else
+						false
+#endif
+						)
+					ereport(ERROR,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("incompatible float8_byval")));
+
+		if ( integer_datetimes !=
+#ifdef USE_INTEGER_DATETIMES
+								 true
+#else
+								 false
+#endif
+								 )
+					ereport(ERROR,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("incompatible integer_datetimes")));
+
+	}
 }
 
 /*
-- 
2.20.1 (Apple Git-117)

