From edd68ea91610563030b32cd0ef26793454ccfb36 Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Tue, 11 Jun 2019 15:35:11 -0400
Subject: [PATCH 1/7] First pass at working code without subscription options

---
 src/backend/replication/logical/proto.c     | 108 ++++++++++-------
 src/backend/replication/logical/worker.c    | 121 ++++++++++++++------
 src/backend/replication/pgoutput/pgoutput.c |  35 +++++-
 src/include/replication/logicalproto.h      |  20 ++--
 src/include/replication/pgoutput.h          |   1 +
 5 files changed, 198 insertions(+), 87 deletions(-)

diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index e7df47de3e..8859aa8e79 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -16,9 +16,11 @@
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_type.h"
 #include "libpq/pqformat.h"
+#include "replication/logicalrelation.h"
 #include "replication/logicalproto.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
+#include "utils/rel.h"
 #include "utils/syscache.h"
 
 /*
@@ -31,7 +33,7 @@
 
 static void logicalrep_write_attrs(StringInfo out, Relation rel);
 static void logicalrep_write_tuple(StringInfo out, Relation rel,
-								   HeapTuple tuple);
+								   HeapTuple tuple, bool binary_basetypes);
 
 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@@ -139,7 +141,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
  * Write INSERT to the output stream.
  */
 void
-logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
+logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary_basetypes)
 {
 	pq_sendbyte(out, 'I');		/* action INSERT */
 
@@ -151,7 +153,7 @@ logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
 	pq_sendint32(out, RelationGetRelid(rel));
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newtuple);
+	logicalrep_write_tuple(out, rel, newtuple, binary_basetypes);
 }
 
 /*
@@ -159,14 +161,10 @@ logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
  *
  * Fills the new tuple.
  */
-LogicalRepRelId
+void
 logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
 {
 	char		action;
-	LogicalRepRelId relid;
-
-	/* read the relation id */
-	relid = pq_getmsgint(in, 4);
 
 	action = pq_getmsgbyte(in);
 	if (action != 'N')
@@ -175,7 +173,6 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
 
 	logicalrep_read_tuple(in, newtup);
 
-	return relid;
 }
 
 /*
@@ -183,7 +180,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
  */
 void
 logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
-						HeapTuple newtuple)
+						HeapTuple newtuple, bool binary_basetypes)
 {
 	pq_sendbyte(out, 'U');		/* action UPDATE */
 
@@ -200,26 +197,22 @@ logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
 			pq_sendbyte(out, 'O');	/* old tuple follows */
 		else
 			pq_sendbyte(out, 'K');	/* old key follows */
-		logicalrep_write_tuple(out, rel, oldtuple);
+		logicalrep_write_tuple(out, rel, oldtuple, binary_basetypes);
 	}
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newtuple);
+	logicalrep_write_tuple(out, rel, newtuple, binary_basetypes);
 }
 
 /*
  * Read UPDATE from stream.
  */
-LogicalRepRelId
+void
 logicalrep_read_update(StringInfo in, bool *has_oldtuple,
 					   LogicalRepTupleData *oldtup,
 					   LogicalRepTupleData *newtup)
 {
 	char		action;
-	LogicalRepRelId relid;
-
-	/* read the relation id */
-	relid = pq_getmsgint(in, 4);
 
 	/* read and verify action */
 	action = pq_getmsgbyte(in);
@@ -245,14 +238,13 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
 
 	logicalrep_read_tuple(in, newtup);
 
-	return relid;
 }
 
 /*
  * Write DELETE to the output stream.
  */
 void
-logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
+logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary_basetypes)
 {
 	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@@ -268,7 +260,7 @@ logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
 	else
 		pq_sendbyte(out, 'K');	/* old key follows */
 
-	logicalrep_write_tuple(out, rel, oldtuple);
+	logicalrep_write_tuple(out, rel, oldtuple, binary_basetypes);
 }
 
 /*
@@ -276,14 +268,10 @@ logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
  *
  * Fills the old tuple.
  */
-LogicalRepRelId
+void
 logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
 {
 	char		action;
-	LogicalRepRelId relid;
-
-	/* read the relation id */
-	relid = pq_getmsgint(in, 4);
 
 	/* read and verify action */
 	action = pq_getmsgbyte(in);
@@ -292,7 +280,6 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
 
 	logicalrep_read_tuple(in, oldtup);
 
-	return relid;
 }
 
 /*
@@ -441,7 +428,7 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
  * Write a tuple to the outputstream, in the most efficient format possible.
  */
 static void
-logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
+logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary_basetypes)
 {
 	TupleDesc	desc;
 	Datum		values[MaxTupleAttributeNumber];
@@ -457,6 +444,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
 			continue;
 		nliveatts++;
 	}
+
 	pq_sendint16(out, nliveatts);
 
 	/* try to allocate enough memory from the get-go */
@@ -492,12 +480,31 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
 			elog(ERROR, "cache lookup failed for type %u", att->atttypid);
 		typclass = (Form_pg_type) GETSTRUCT(typtup);
 
-		pq_sendbyte(out, 't');	/* 'text' data follows */
 
-		outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
-		pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
-		pfree(outputstr);
+		if (binary_basetypes &&
+				 OidIsValid(typclass->typreceive) &&
+				 (att->atttypid < FirstNormalObjectId || typclass->typtype != 'c') &&
+				 (att->atttypid < FirstNormalObjectId || typclass->typelem == InvalidOid))
+		{
+			bytea	   *outputbytes;
+			int			len;
+			pq_sendbyte(out, 'b');	/* binary send/recv data follows */
+
+			outputbytes = OidSendFunctionCall(typclass->typsend,
+											  values[i]);
 
+			len = VARSIZE(outputbytes) - VARHDRSZ;
+			pq_sendint(out, len, 4); /* length */
+			pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
+			pfree(outputbytes);
+		}
+		else
+		{
+			pq_sendbyte(out, 't');	/* 'text' data follows */
+			outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
+			pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
+			pfree(outputstr);
+		}
 		ReleaseSysCache(typtup);
 	}
 }
@@ -518,6 +525,9 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
 
 	memset(tuple->changed, 0, sizeof(tuple->changed));
 
+	/* default is text */
+	memset(tuple->binary, 0, sizeof(tuple->binary));
+
 	/* Read the data */
 	for (i = 0; i < natts; i++)
 	{
@@ -528,25 +538,43 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
 		switch (kind)
 		{
 			case 'n':			/* null */
-				tuple->values[i] = NULL;
+				tuple->values[i].len = 0;
+				tuple->values[i].data = NULL;
 				tuple->changed[i] = true;
 				break;
 			case 'u':			/* unchanged column */
 				/* we don't receive the value of an unchanged column */
-				tuple->values[i] = NULL;
+				tuple->values[i].len = 0;
+				tuple->values[i].data = NULL;
 				break;
-			case 't':			/* text formatted value */
+			case 'b':			/* binary formatted value */
 				{
-					int			len;
-
 					tuple->changed[i] = true;
+					tuple->binary[i] = true;
+
+					int len = pq_getmsgint(in, 4); /* read length */
 
-					len = pq_getmsgint(in, 4);	/* read length */
+					tuple->values[i].data = palloc(len + 1);
+					/* and data */
+
+					pq_copymsgbytes(in, tuple->values[i].data, len);
+					tuple->values[i].len = len;
+					tuple->values[i].cursor = 0;
+					tuple->values[i].maxlen = len;
+					/* not strictly necessary but the docs say it is required */
+					tuple->values[i].data[len] = '\0';
+					break;
+				}
+			case 't':			/* text formatted value */
+				{
+					tuple->changed[i] = true;
+					int len = pq_getmsgint(in, 4);	/* read length */
 
 					/* and data */
-					tuple->values[i] = palloc(len + 1);
-					pq_copymsgbytes(in, tuple->values[i], len);
-					tuple->values[i][len] = '\0';
+					tuple->values[i].data = palloc(len + 1);
+					pq_copymsgbytes(in, tuple->values[i].data, len);
+					tuple->values[i].data[len] = '\0';
+					tuple->values[i].len = len;
 				}
 				break;
 			default:
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ff62303638..cc505f8c06 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -296,13 +296,12 @@ slot_store_error_callback(void *arg)
 }
 
 /*
- * Store data in C string form into slot.
- * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
- * use better.
+ * Store data into slot.
+ * Data can be either text or binary transfer format
  */
 static void
-slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
-					char **values)
+slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
+					LogicalRepTupleData *tupleData)
 {
 	int			natts = slot->tts_tupleDescriptor->natts;
 	int			i;
@@ -327,18 +326,40 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
 		int			remoteattnum = rel->attrmap[i];
 
 		if (!att->attisdropped && remoteattnum >= 0 &&
-			values[remoteattnum] != NULL)
+			tupleData->values[remoteattnum].data != NULL)
 		{
-			Oid			typinput;
-			Oid			typioparam;
 
 			errarg.local_attnum = i;
 			errarg.remote_attnum = remoteattnum;
 
-			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
-			slot->tts_values[i] =
-				OidInputFunctionCall(typinput, values[remoteattnum],
-									 typioparam, att->atttypmod);
+			if (tupleData->binary[remoteattnum])
+			{
+				Oid typreceive;
+				Oid typioparam;
+
+				getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
+
+				int cursor = tupleData->values[remoteattnum].cursor;
+				slot->tts_values[i] =
+					OidReceiveFunctionCall(typreceive, &tupleData->values[remoteattnum],
+										   typioparam, att->atttypmod);
+				/*
+				 * Do not advance the cursor in case we need to re-read this
+				 * This saves us from pushing all of this type logic into proto.c
+				 */
+				tupleData->values[remoteattnum].cursor = cursor;
+
+			}
+			else
+			{
+				Oid			typinput;
+				Oid			typioparam;
+
+				getTypeInputInfo(att->atttypid, &typinput, &typioparam);
+				slot->tts_values[i] =
+					OidInputFunctionCall(typinput, tupleData->values[remoteattnum].data,
+										 typioparam, att->atttypmod);
+			}
 			slot->tts_isnull[i] = false;
 
 			errarg.local_attnum = -1;
@@ -363,14 +384,14 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
 }
 
 /*
- * Modify slot with user data provided as C strings.
+ * Modify slot with user data provided.
  * This is somewhat similar to heap_modify_tuple but also calls the type
- * input function on the user data as the input is the text representation
- * of the types.
+ * input function on the user data as the input is either text or binary transfer
+ * format
  */
 static void
-slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
-					 char **values, bool *replaces)
+slot_modify_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
+					LogicalRepTupleData *tupleData)
 {
 	int			natts = slot->tts_tupleDescriptor->natts;
 	int			i;
@@ -398,21 +419,43 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
 		if (remoteattnum < 0)
 			continue;
 
-		if (!replaces[remoteattnum])
+		if (!tupleData->changed[remoteattnum])
 			continue;
 
-		if (values[remoteattnum] != NULL)
+		if (tupleData->values[remoteattnum].data != NULL)
 		{
-			Oid			typinput;
-			Oid			typioparam;
 
 			errarg.local_attnum = i;
 			errarg.remote_attnum = remoteattnum;
 
-			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
-			slot->tts_values[i] =
-				OidInputFunctionCall(typinput, values[remoteattnum],
-									 typioparam, att->atttypmod);
+			if (tupleData->binary[remoteattnum])
+			{
+				Oid typreceive;
+				Oid typioparam;
+
+				getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
+
+
+				int cursor = tupleData->values[remoteattnum].cursor;
+				slot->tts_values[i] =
+					OidReceiveFunctionCall(typreceive, &tupleData->values[remoteattnum],
+										   typioparam, att->atttypmod);
+				/*
+				 * Do not advance the cursor in case we need to re-read this
+				 * This saves us from pushing all of this type logic into proto.c
+				 */
+				tupleData->values[remoteattnum].cursor = cursor;
+			}
+			else
+			{
+				Oid			typinput;
+				Oid			typioparam;
+
+				getTypeInputInfo(att->atttypid, &typinput, &typioparam);
+				slot->tts_values[i] =
+					OidInputFunctionCall(typinput, tupleData->values[remoteattnum].data,
+										 typioparam, att->atttypmod);
+			}
 			slot->tts_isnull[i] = false;
 
 			errarg.local_attnum = -1;
@@ -576,8 +619,12 @@ apply_handle_insert(StringInfo s)
 
 	ensure_transaction();
 
-	relid = logicalrep_read_insert(s, &newtup);
+	/* read the relation id */
+	relid = pq_getmsgint(s, 4);
 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
+
+	logicalrep_read_insert(s, &newtup);
+
 	if (!should_apply_changes_for_rel(rel))
 	{
 		/*
@@ -599,7 +646,7 @@ apply_handle_insert(StringInfo s)
 
 	/* Process and store remote tuple in the slot */
 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	slot_store_cstrings(remoteslot, rel, newtup.values);
+	slot_store_data(remoteslot, rel, &newtup);
 	slot_fill_defaults(rel, estate, remoteslot);
 	MemoryContextSwitchTo(oldctx);
 
@@ -679,9 +726,12 @@ apply_handle_update(StringInfo s)
 
 	ensure_transaction();
 
-	relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
-								   &newtup);
+	/* read the relation id */
+	relid = pq_getmsgint(s, 4);
 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
+
+	logicalrep_read_update(s, &has_oldtup, &oldtup,
+								   &newtup);
 	if (!should_apply_changes_for_rel(rel))
 	{
 		/*
@@ -709,8 +759,8 @@ apply_handle_update(StringInfo s)
 
 	/* Build the search tuple. */
 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	slot_store_cstrings(remoteslot, rel,
-						has_oldtup ? oldtup.values : newtup.values);
+	slot_store_data(remoteslot, rel,
+						has_oldtup ? &oldtup : &newtup);
 	MemoryContextSwitchTo(oldctx);
 
 	/*
@@ -741,7 +791,7 @@ apply_handle_update(StringInfo s)
 		/* Process and store remote tuple in the slot */
 		oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 		ExecCopySlot(remoteslot, localslot);
-		slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
+		slot_modify_data(remoteslot, rel, &newtup);
 		MemoryContextSwitchTo(oldctx);
 
 		EvalPlanQualSetSlot(&epqstate, remoteslot);
@@ -799,8 +849,11 @@ apply_handle_delete(StringInfo s)
 
 	ensure_transaction();
 
-	relid = logicalrep_read_delete(s, &oldtup);
+	/* read the relation id */
+	relid = pq_getmsgint(s, 4);
 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
+
+	logicalrep_read_delete(s, &oldtup);
 	if (!should_apply_changes_for_rel(rel))
 	{
 		/*
@@ -828,7 +881,7 @@ apply_handle_delete(StringInfo s)
 
 	/* Find the tuple using the replica identity index. */
 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	slot_store_cstrings(remoteslot, rel, oldtup.values);
+	slot_store_data(remoteslot, rel, &oldtup);
 	MemoryContextSwitchTo(oldctx);
 
 	/*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9c08757fca..ee20a6535c 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -23,6 +23,7 @@
 
 #include "utils/inval.h"
 #include "utils/int8.h"
+#include "utils/builtins.h"
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
@@ -90,11 +91,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 
 static void
 parse_output_parameters(List *options, uint32 *protocol_version,
-						List **publication_names)
+						List **publication_names, bool *binary_basetypes)
 {
 	ListCell   *lc;
 	bool		protocol_version_given = false;
 	bool		publication_names_given = false;
+	bool		binary_option_given = false;
+
+	// default to false
+	*binary_basetypes = false;
 
 	foreach(lc, options)
 	{
@@ -140,6 +145,22 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 						(errcode(ERRCODE_INVALID_NAME),
 						 errmsg("invalid publication_names syntax")));
 		}
+		else if (strcmp(defel->defname, "binary") == 0 )
+		{
+			bool parsed;
+			if (binary_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			binary_option_given = true;
+
+			if (!parse_bool(strVal(defel->arg), &parsed))
+							ereport(ERROR,
+									(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+									 errmsg("invalid binary option")));
+
+			*binary_basetypes = parsed;
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -174,7 +195,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		/* Parse the params and ERROR if we see any we don't recognize */
 		parse_output_parameters(ctx->output_plugin_options,
 								&data->protocol_version,
-								&data->publication_names);
+								&data->publication_names,
+								&data->binary_basetypes);
 
 		/* Check if we support requested protocol */
 		if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
@@ -346,7 +368,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_INSERT:
 			OutputPluginPrepareWrite(ctx, true);
 			logicalrep_write_insert(ctx->out, relation,
-									&change->data.tp.newtuple->tuple);
+									&change->data.tp.newtuple->tuple,
+									data->binary_basetypes);
 			OutputPluginWrite(ctx, true);
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
@@ -356,7 +379,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 				OutputPluginPrepareWrite(ctx, true);
 				logicalrep_write_update(ctx->out, relation, oldtuple,
-										&change->data.tp.newtuple->tuple);
+										&change->data.tp.newtuple->tuple,
+										data->binary_basetypes);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
@@ -365,7 +389,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			{
 				OutputPluginPrepareWrite(ctx, true);
 				logicalrep_write_delete(ctx->out, relation,
-										&change->data.tp.oldtuple->tuple);
+										&change->data.tp.oldtuple->tuple,
+										data->binary_basetypes);
 				OutputPluginWrite(ctx, true);
 			}
 			else
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 3fc430af01..4d1b7e8c4f 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -30,8 +30,12 @@
 /* Tuple coming via logical replication. */
 typedef struct LogicalRepTupleData
 {
-	/* column values in text format, or NULL for a null value: */
-	char	   *values[MaxTupleAttributeNumber];
+	/* column values */
+	StringInfoData	   values[MaxTupleAttributeNumber];
+
+	/* markers for binary */
+	bool 		binary[MaxTupleAttributeNumber];
+
 	/* markers for changed/unchanged column values: */
 	bool		changed[MaxTupleAttributeNumber];
 } LogicalRepTupleData;
@@ -86,16 +90,16 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin,
 									XLogRecPtr origin_lsn);
 extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
 extern void logicalrep_write_insert(StringInfo out, Relation rel,
-									HeapTuple newtuple);
-extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
+									HeapTuple newtuple, bool binary_basetypes);
+extern void logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
-									HeapTuple newtuple);
-extern LogicalRepRelId logicalrep_read_update(StringInfo in,
+									HeapTuple newtuple, bool binary_basetypes);
+extern void logicalrep_read_update(StringInfo in,
 											  bool *has_oldtuple, LogicalRepTupleData *oldtup,
 											  LogicalRepTupleData *newtup);
 extern void logicalrep_write_delete(StringInfo out, Relation rel,
-									HeapTuple oldtuple);
-extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
+									HeapTuple oldtuple, bool binary_basetypes);
+extern void logicalrep_read_delete(StringInfo in,
 											  LogicalRepTupleData *oldtup);
 extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[],
 									  bool cascade, bool restart_seqs);
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 8870721bcd..933038f385 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -25,6 +25,7 @@ typedef struct PGOutputData
 
 	List	   *publication_names;
 	List	   *publications;
+	bool	   binary_basetypes;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
-- 
2.20.1 (Apple Git-117)

