From 223509362254d6962a336aeeb957b07f2d237a5c Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Tue, 7 Apr 2020 15:35:47 -0400
Subject: [PATCH 8/8] Changed binary and changed to format and use existing
 codes to get rid of one array\ Dynamically allocate the arrays to avoid
 allocating max columns sized arrays Fixed numerous issues with checking for
 validity

---
 .../libpqwalreceiver/libpqwalreceiver.c       | 39 ++++++++-----
 src/backend/replication/logical/proto.c       | 56 +++++++++++--------
 src/backend/replication/logical/worker.c      | 30 +++++-----
 src/backend/replication/pgoutput/pgoutput.c   | 35 +++++++-----
 src/include/pg_config_manual.h                |  2 +-
 src/include/replication/logicalproto.h        |  8 +--
 6 files changed, 99 insertions(+), 71 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 9130d645b6..08313fa2a5 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -425,36 +425,49 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 		pfree(pubnames_str);
 		if (options->proto.logical.binary) {
 			appendStringInfo(&cmd, ", binary 'true'");
-			appendStringInfo(&cmd, ", sizeof_datum %zu", sizeof(Datum));
-			appendStringInfo(&cmd, ", sizeof_int %zu", sizeof(int));
-			appendStringInfo(&cmd, ", sizeof_long %zu", sizeof(long));
-			appendStringInfo(&cmd, ", bigendian %d",
+			appendStringInfo(&cmd, ", sizeof_datum '%zu'", sizeof(Datum));
+			appendStringInfo(&cmd, ", sizeof_int '%zu'", sizeof(int));
+			appendStringInfo(&cmd, ", sizeof_long '%zu'", sizeof(long));
+			appendStringInfo(&cmd, ", bigendian '%d'",
 #ifdef WORDS_BIGENDIAN
 								 true
 #else
 								 false
 #endif
 								 );
-			appendStringInfo(&cmd, ", float4_byval %d",
+			appendStringInfo(&cmd, ", float4_byval '%d'",
+#if PG_VERSION_NUM >= 130000
+								true
+#else
 #ifdef USE_FLOAT4_BYVAL
-								 true
+								true
 #else
-								 false
+								false
 #endif
-								 );
-				appendStringInfo(&cmd, ", float8_byval %d",
+#endif
+								);
+				appendStringInfo(&cmd, ", float8_byval '%d'",
 #ifdef USE_FLOAT8_BYVAL
-								 true
+								true
 #else
-								 false
+								false
 #endif
-								 );
-				appendStringInfo(&cmd, ", integer_datetimes %d",
+								);
+				appendStringInfo(&cmd, ", integer_datetimes '%d'",
+
+/* integer date times are always enabled in version 10 and up */
+
+#if PG_VERSION_NUM >= 100000
+								true
+#else
+
 #ifdef USE_INTEGER_DATETIMES
 								 true
 #else
 								 false
 #endif
+#endif
+
 								 );
 		}
 		appendStringInfoChar(&cmd, ')');
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index d4c6283a17..73148f39f3 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -521,10 +521,12 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
 	/* Get number of attributes */
 	natts = pq_getmsgint(in, 2);
 
-	memset(tuple->changed, 0, sizeof(tuple->changed));
+	tuple->values = palloc(natts * sizeof(StringInfoData *));
+
+	/* default is unchanged */
+	tuple->format = palloc(natts * sizeof(char));
+	memset(tuple->format, 't', natts * sizeof(char));
 
-	/* default is text */
-	memset(tuple->binary, 0, sizeof(tuple->binary));
 
 	/* Read the data */
 	for (i = 0; i < natts; i++)
@@ -536,45 +538,51 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
 		switch (kind)
 		{
 			case 'n':			/* null */
-				tuple->values[i].len = 0;
-				tuple->values[i].data = NULL;
-				tuple->changed[i] = true;
-				break;
+				{
+					tuple->values[i] = (StringInfoData *)NULL;
+					tuple->format[i] = 't';
+					break;
+				}
 			case 'u':			/* unchanged column */
-				/* we don't receive the value of an unchanged column */
-				tuple->values[i].len = 0;
-				tuple->values[i].data = NULL;
-				break;
+				{
+					/* we don't receive the value of an unchanged column */
+					tuple->values[i] = (StringInfoData *)NULL;
+					tuple->format[i] = 'u'; /* be explicit */
+					break;
+				}
 			case 'b':			/* binary formatted value */
 				{
 					int len;
-					tuple->changed[i] = true;
-					tuple->binary[i] = true;
+					StringInfoData *value = palloc(sizeof(StringInfoData));
+					tuple->format[i] = 'b';
 
 					len = pq_getmsgint(in, 4); /* read length */
 
-					tuple->values[i].data = palloc(len + 1);
+					value->data = palloc(len + 1);
 					/* and data */
 
-					pq_copymsgbytes(in, tuple->values[i].data, len);
-					tuple->values[i].len = len;
-					tuple->values[i].cursor = 0;
-					tuple->values[i].maxlen = len;
+					pq_copymsgbytes(in, value->data, len);
+					value->len = len;
+					value->cursor = 0;
+					value->maxlen = len;
 					/* not strictly necessary but the docs say it is required */
-					tuple->values[i].data[len] = '\0';
+					value->data[len] = '\0';
+					tuple->values[i] = value;
 					break;
 				}
 			case 't':			/* text formatted value */
 				{
 					int len;
-					tuple->changed[i] = true;
+					StringInfoData *value = palloc(sizeof(StringInfoData));
+					tuple->format[i] = 't';
 					len = pq_getmsgint(in, 4);	/* read length */
 
 					/* and data */
-					tuple->values[i].data = palloc(len + 1);
-					pq_copymsgbytes(in, tuple->values[i].data, len);
-					tuple->values[i].data[len] = '\0';
-					tuple->values[i].len = len;
+					value->data = palloc(len + 1);
+					pq_copymsgbytes(in, value->data, len);
+					value->len = len;
+					value->data[len] = '\0';
+					tuple->values[i] = value;
 				}
 				break;
 			default:
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 58ada925ee..a46c97c188 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -341,28 +341,28 @@ slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
 		int			remoteattnum = rel->attrmap->attnums[i];
 
 		if (!att->attisdropped && remoteattnum >= 0 &&
-			tupleData->values[remoteattnum].data != NULL)
+			tupleData->values[remoteattnum] != NULL)
 		{
 
 			errarg.local_attnum = i;
 			errarg.remote_attnum = remoteattnum;
 
-			if (tupleData->binary[remoteattnum])
+			if (tupleData->format[remoteattnum] == 'b')
 			{
 				Oid typreceive;
 				Oid typioparam;
 
 				getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
 
-				int cursor = tupleData->values[remoteattnum].cursor;
+				int cursor = tupleData->values[remoteattnum]->cursor;
 				slot->tts_values[i] =
-					OidReceiveFunctionCall(typreceive, &tupleData->values[remoteattnum],
+					OidReceiveFunctionCall(typreceive, tupleData->values[remoteattnum],
 										   typioparam, att->atttypmod);
 				/*
 				 * Do not advance the cursor in case we need to re-read this
 				 * This saves us from pushing all of this type logic into proto.c
 				 */
-				tupleData->values[remoteattnum].cursor = cursor;
+				tupleData->values[remoteattnum]->cursor = cursor;
 
 			}
 			else
@@ -372,7 +372,7 @@ slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
 
 				getTypeInputInfo(att->atttypid, &typinput, &typioparam);
 				slot->tts_values[i] =
-					OidInputFunctionCall(typinput, tupleData->values[remoteattnum].data,
+					OidInputFunctionCall(typinput, tupleData->values[remoteattnum]->data,
 										 typioparam, att->atttypmod);
 			}
 			slot->tts_isnull[i] = false;
@@ -456,16 +456,16 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
 		if (remoteattnum < 0)
 			continue;
 
-		if (!tupleData->changed[remoteattnum])
+		if (tupleData->format[remoteattnum] =='u')
 			continue;
 
-		if (tupleData->values[remoteattnum].data != NULL)
+		if (tupleData->values[remoteattnum] != NULL)
 		{
 
 			errarg.local_attnum = i;
 			errarg.remote_attnum = remoteattnum;
 
-			if (tupleData->binary[remoteattnum])
+			if (tupleData->format[remoteattnum] == 'b')
 			{
 				Oid typreceive;
 				Oid typioparam;
@@ -473,15 +473,15 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
 				getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
 
 
-				int cursor = tupleData->values[remoteattnum].cursor;
+				int cursor = tupleData->values[remoteattnum]->cursor;
 				slot->tts_values[i] =
-					OidReceiveFunctionCall(typreceive, &tupleData->values[remoteattnum],
+					OidReceiveFunctionCall(typreceive, tupleData->values[remoteattnum],
 										   typioparam, att->atttypmod);
 				/*
 				 * Do not advance the cursor in case we need to re-read this
 				 * This saves us from pushing all of this type logic into proto.c
 				 */
-				tupleData->values[remoteattnum].cursor = cursor;
+				tupleData->values[remoteattnum]->cursor = cursor;
 			}
 			else
 			{
@@ -490,7 +490,7 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
 
 				getTypeInputInfo(att->atttypid, &typinput, &typioparam);
 				slot->tts_values[i] =
-					OidInputFunctionCall(typinput, tupleData->values[remoteattnum].data,
+					OidInputFunctionCall(typinput, tupleData->values[remoteattnum]->data,
 										 typioparam, att->atttypmod);
 			}
 			slot->tts_isnull[i] = false;
@@ -808,7 +808,7 @@ apply_handle_update(StringInfo s)
 	target_rte = list_nth(estate->es_range_table, 0);
 	for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
 	{
-		if (newtup.changed[i])
+		if (newtup.format[i] != 'u')
 			target_rte->updatedCols = bms_add_member(target_rte->updatedCols,
 													 i + 1 - FirstLowInvalidHeapAttributeNumber);
 	}
@@ -870,7 +870,7 @@ apply_handle_update_internal(ResultRelInfo *relinfo,
 	{
 		/* Process and store remote tuple in the slot */
 		oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-		slot_modify_data(remoteslot, localslot,  rel, &newtup);
+		slot_modify_data(remoteslot, localslot,  relmapentry, newtup);
 		MemoryContextSwitchTo(oldctx);
 
 		EvalPlanQualSetSlot(&epqstate, remoteslot);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index fab6f75b49..050ccfaadb 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -18,6 +18,7 @@
 #include "replication/logicalproto.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
+#include "utils/builtins.h"
 #include "utils/int8.h"
 #include "utils/inval.h"
 #include "utils/memutils.h"
@@ -131,7 +132,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 			if (protocol_version_given)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("conflicting or redundant options")));
+						 errmsg("conflicting or redundant options %s already provided", defel->defname)));
 			protocol_version_given = true;
 
 			if (!scanint8(strVal(defel->arg), true, &parsed))
@@ -152,7 +153,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 			if (publication_names_given)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("conflicting or redundant options")));
+						errmsg("conflicting or redundant options %s already provided", defel->defname)));
 			publication_names_given = true;
 
 			if (!SplitIdentifierString(strVal(defel->arg), ',',
@@ -167,7 +168,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 			if (binary_option_given)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("conflicting or redundant options")));
+						errmsg("conflicting or redundant options %s already provided", defel->defname)));
 			binary_option_given = true;
 
 			if (!parse_bool(strVal(defel->arg), &parsed))
@@ -185,7 +186,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 					if (sizeof_datum_given)
 						ereport(ERROR,
 								(errcode(ERRCODE_SYNTAX_ERROR),
-								 errmsg("conflicting or redundant options")));
+								errmsg("conflicting or redundant options %s already provided", defel->defname)));
 					sizeof_datum_given = true;
 
 					if (!scanint8(strVal(defel->arg), true,  &datum_size))
@@ -199,7 +200,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 			if (sizeof_int_given)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("conflicting or redundant options")));
+						errmsg("conflicting or redundant options %s already provided", defel->defname)));
 			sizeof_int_given = true;
 
 			if (!scanint8(strVal(defel->arg), true,  &int_size))
@@ -213,8 +214,8 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 			if (sizeof_long_given)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("conflicting or redundant options")));
-			sizeof_int_given = true;
+						errmsg("conflicting or redundant options %s already provided", defel->defname)));
+			sizeof_long_given = true;
 
 			if (!scanint8(strVal(defel->arg), true, &long_size))
 				ereport(ERROR,
@@ -227,7 +228,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 			if (big_endian_given)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("conflicting or redundant options")));
+						errmsg("conflicting or redundant options %s already provided", defel->defname)));
 			big_endian_given = true;
 
 			if (!parse_bool(strVal(defel->arg), &bigendian))
@@ -241,7 +242,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 			if (float4_byval_given)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("conflicting or redundant options")));
+						errmsg("conflicting or redundant options %s already provided", defel->defname)));
 			float4_byval_given = true;
 
 			if (!parse_bool(strVal(defel->arg), &float4_byval))
@@ -255,7 +256,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 			if (float8_byval_given)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("conflicting or redundant options")));
+						errmsg("conflicting or redundant options %s already provided", defel->defname)));
 			float8_byval_given = true;
 
 			if (!parse_bool(strVal(defel->arg), &float8_byval))
@@ -263,13 +264,13 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						 errmsg("invalid float8_byval option")));
 		}
-		else if (strcmp(defel->defname, "integer_date_times") == 0)
+		else if (strcmp(defel->defname, "integer_datetimes") == 0)
 		{
 
 			if (integer_datetimes_given)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("conflicting or redundant options")));
+						errmsg("conflicting or redundant options %s already provided", defel->defname)));
 			integer_datetimes_given = true;
 
 			if (!parse_bool(strVal(defel->arg), &integer_datetimes))
@@ -312,17 +313,21 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 							errmsg("incompatible endianness")));
 		if( float4_byval !=
+#if PG_VERSION_NUM >= 130000
+				true
+#else
 #ifdef USE_FLOAT4_BYVAL
 				true
 #else
 				false
+#endif
 #endif
 				)
 			ereport(ERROR,
 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 					 errmsg("incompatible float4_byval")));
 		if( float8_byval !=
-#ifdef USE_FLOAT4_BYVAL
+#ifdef USE_FLOAT8_BYVAL
 						true
 #else
 						false
@@ -333,10 +338,14 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 							 errmsg("incompatible float8_byval")));
 
 		if ( integer_datetimes !=
+#if PG_VERSION_NUM >= 100000
+								true
+#else
 #ifdef USE_INTEGER_DATETIMES
 								 true
 #else
 								 false
+#endif
 #endif
 								 )
 					ereport(ERROR,
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index 32f739706d..3504a19728 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -336,7 +336,7 @@
  * Enable debugging print statements for WAL-related operations; see
  * also the wal_debug GUC var.
  */
-/* #define WAL_DEBUG */
+#define WAL_DEBUG
 
 /*
  * Enable tracing of resource consumption during sort operations;
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 85351c6093..b209af4cf2 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -31,13 +31,11 @@
 typedef struct LogicalRepTupleData
 {
 	/* column values */
-	StringInfoData	   values[MaxTupleAttributeNumber];
+	StringInfoData	   **values;
 
-	/* markers for binary */
-	bool 		binary[MaxTupleAttributeNumber];
+	/* markers for changed/unchanged/binary/text */
+	char 		*format;
 
-	/* markers for changed/unchanged column values: */
-	bool		changed[MaxTupleAttributeNumber];
 } LogicalRepTupleData;
 
 typedef uint32 LogicalRepRelId;
-- 
2.20.1 (Apple Git-117)

