From 190f43a30517ddb0b474052da9860b3fb427b22f Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Fri, 14 Jun 2019 15:39:47 -0400
Subject: [PATCH 2/8] add binary column to pg_subscription support create and
 alter subcription with binary option

---
 src/backend/catalog/system_views.sql          |  2 +-
 src/backend/commands/subscriptioncmds.c       | 39 ++++++++++---
 .../libpqwalreceiver/libpqwalreceiver.c       |  3 +
 src/backend/replication/logical/worker.c      |  1 +
 src/include/catalog/catversion.h              | 58 -------------------
 src/include/catalog/pg_subscription.h         |  4 ++
 src/include/replication/walreceiver.h         |  1 +
 7 files changed, 42 insertions(+), 66 deletions(-)
 delete mode 100644 src/include/catalog/catversion.h

diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 813ea8bfc3..0bee6728d6 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1122,7 +1122,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 
 -- All columns of pg_subscription except subconninfo are readable.
 REVOKE ALL ON pg_subscription FROM public;
-GRANT SELECT (subdbid, subname, subowner, subenabled, subslotname, subpublications)
+GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, subslotname, subpublications)
     ON pg_subscription TO public;
 
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 7f156673f7..e0ea6cd413 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -59,7 +59,7 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 						   bool *enabled, bool *create_slot,
 						   bool *slot_name_given, char **slot_name,
 						   bool *copy_data, char **synchronous_commit,
-						   bool *refresh)
+						   bool *refresh, bool *binary_given, bool *binary)
 {
 	ListCell   *lc;
 	bool		connect_given = false;
@@ -90,6 +90,12 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 		*synchronous_commit = NULL;
 	if (refresh)
 		*refresh = true;
+	if (binary)
+	{
+		*binary_given = false;
+		/* not all versions of pgoutput will understand this option default to false */
+		*binary = false;
+	}
 
 	/* Parse options */
 	foreach(lc, options)
@@ -175,6 +181,11 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 			refresh_given = true;
 			*refresh = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "binary") == 0 && binary)
+		{
+			*binary_given = true;
+			*binary = defGetBoolean(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -324,8 +335,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	bool		slotname_given;
 	char		originname[NAMEDATALEN];
 	bool		create_slot;
-	List	   *publications;
+	bool		binary;
+	bool		binary_given;
 
+	List	   *publications;
 	/*
 	 * Parse and check options.
 	 *
@@ -334,7 +347,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	parse_subscription_options(stmt->options, &connect, &enabled_given,
 							   &enabled, &create_slot, &slotname_given,
 							   &slotname, &copy_data, &synchronous_commit,
-							   NULL);
+							   NULL, &binary_given, &binary);
 
 	/*
 	 * Since creating a replication slot is not transactional, rolling back
@@ -400,6 +413,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 		DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
 	values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
+	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (slotname)
@@ -669,10 +683,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				char	   *slotname;
 				bool		slotname_given;
 				char	   *synchronous_commit;
+				bool		binary_given;
+				bool		binary;
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
 										   NULL, &slotname_given, &slotname,
-										   NULL, &synchronous_commit, NULL);
+										   NULL, &synchronous_commit, NULL,
+										   &binary_given, &binary);
 
 				if (slotname_given)
 				{
@@ -697,6 +714,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 					replaces[Anum_pg_subscription_subsynccommit - 1] = true;
 				}
 
+				if (binary_given)
+				{
+				values[Anum_pg_subscription_subbinary - 1] =
+					BoolGetDatum(binary);
+					replaces[Anum_pg_subscription_subbinary - 1] = true;
+				}
+
 				update_tuple = true;
 				break;
 			}
@@ -708,7 +732,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL,
 										   &enabled_given, &enabled, NULL,
-										   NULL, NULL, NULL, NULL, NULL);
+										   NULL, NULL, NULL, NULL, NULL,
+										   NULL, NULL);
 				Assert(enabled_given);
 
 				if (!sub->slotname && enabled)
@@ -746,7 +771,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
 										   NULL, NULL, NULL, &copy_data,
-										   NULL, &refresh);
+										   NULL, &refresh, NULL, NULL);
 
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(stmt->publication);
@@ -783,7 +808,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
 										   NULL, NULL, NULL, &copy_data,
-										   NULL, NULL);
+										   NULL, NULL, NULL, NULL);
 
 				AlterSubscription_refresh(sub, copy_data);
 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index e4fd1f9bb6..2a9d966ec1 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -402,6 +402,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 		char	   *pubnames_str;
 		List	   *pubnames;
 		char	   *pubnames_literal;
+		bool		binary;
 
 		appendStringInfoString(&cmd, " (");
 
@@ -423,6 +424,8 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 		appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
 		PQfreemem(pubnames_literal);
 		pfree(pubnames_str);
+		if (options->proto.logical.binary)
+			appendStringInfo(&cmd, ", binary 'true'");
 
 		appendStringInfoChar(&cmd, ')');
 	}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 049a3b3392..c10414b16d 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1866,6 +1866,7 @@ ApplyWorkerMain(Datum main_arg)
 	options.slotname = myslotname;
 	options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
 	options.proto.logical.publication_names = MySubscription->publications;
+	options.proto.logical.binary = MySubscription->binary;
 
 	/* Start normal logical streaming replication. */
 	walrcv_startstreaming(wrconn, &options);
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
deleted file mode 100644
index 6d91fa2bde..0000000000
--- a/src/include/catalog/catversion.h
+++ /dev/null
@@ -1,58 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * catversion.h
- *	  "Catalog version number" for PostgreSQL.
- *
- * The catalog version number is used to flag incompatible changes in
- * the PostgreSQL system catalogs.  Whenever anyone changes the format of
- * a system catalog relation, or adds, deletes, or modifies standard
- * catalog entries in such a way that an updated backend wouldn't work
- * with an old database (or vice versa), the catalog version number
- * should be changed.  The version number stored in pg_control by initdb
- * is checked against the version number compiled into the backend at
- * startup time, so that a backend can refuse to run in an incompatible
- * database.
- *
- * The point of this feature is to provide a finer grain of compatibility
- * checking than is possible from looking at the major version number
- * stored in PG_VERSION.  It shouldn't matter to end users, but during
- * development cycles we usually make quite a few incompatible changes
- * to the contents of the system catalogs, and we don't want to bump the
- * major version number for each one.  What we can do instead is bump
- * this internal version number.  This should save some grief for
- * developers who might otherwise waste time tracking down "bugs" that
- * are really just code-vs-database incompatibilities.
- *
- * The rule for developers is: if you commit a change that requires
- * an initdb, you should update the catalog version number (as well as
- * notifying the pgsql-hackers mailing list, which has been the
- * informal practice for a long time).
- *
- * The catalog version number is placed here since modifying files in
- * include/catalog is the most common kind of initdb-forcing change.
- * But it could be used to protect any kind of incompatible change in
- * database contents or layout, such as altering tuple headers.
- *
- *
- * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
- * Portions Copyright (c) 1994, Regents of the University of California
- *
- * src/include/catalog/catversion.h
- *
- *-------------------------------------------------------------------------
- */
-#ifndef CATVERSION_H
-#define CATVERSION_H
-
-/*
- * We could use anything we wanted for version numbers, but I recommend
- * following the "YYYYMMDDN" style often used for DNS zone serial numbers.
- * YYYYMMDD are the date of the change, and N is the number of the change
- * on that day.  (Hopefully we'll never commit ten independent sets of
- * catalog changes on the same day...)
- */
-
-/*							yyyymmddN */
-#define CATALOG_VERSION_NO	202004022
-
-#endif
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0a756d42d8..100789a52f 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -48,6 +48,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subenabled;		/* True if the subscription is enabled (the
 								 * worker should be running) */
 
+	bool		subbinary;		/* True if the subscription wants the
+								 * output plugin to send data in binary */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -73,6 +76,7 @@ typedef struct Subscription
 	char	   *name;			/* Name of the subscription */
 	Oid			owner;			/* Oid of the subscription owner */
 	bool		enabled;		/* Indicates if the subscription is enabled */
+	bool		binary;			/* Indicates if the subscription wants data in binary format */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index cf3e43128c..13f38b34d6 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -168,6 +168,7 @@ typedef struct
 		{
 			uint32		proto_version;	/* Logical protocol version */
 			List	   *publication_names;	/* String list of publications */
+			bool		binary;				/* Ask publisher output plugin to use binary */
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
-- 
2.20.1 (Apple Git-117)

