From 0908f9d4a490010ffa2e3f69170c34406f83fffe Mon Sep 17 00:00:00 2001
From: Melih Mutlu <m.melihmutlu@gmail.com>
Date: Mon, 8 Aug 2022 14:14:44 +0300
Subject: [PATCH] Allow logical replication to copy table in binary

If binary option is enabled in a subscription, copy tables in binary
format during table synchronization.

Without this patch, table are copied in text format even if the
subscription is created with binary option. When binary format is
beneficial to use, allowing the subscription to copy tables in binary in
table sync phase reduces the time spent on copy.

Discussion: https://postgr.es/m/CAGPVpCQvAziCLknEnygY0v1-KBtg%2BOm-9JHJYZOnNPKFJPompw%40mail.gmail.com
---
 doc/src/sgml/catalogs.sgml                  |   3 +-
 doc/src/sgml/logical-replication.sgml       |   9 +-
 doc/src/sgml/ref/alter_subscription.sgml    |   4 +-
 doc/src/sgml/ref/create_subscription.sgml   |  21 +-
 src/backend/catalog/pg_subscription.c       |   1 +
 src/backend/commands/subscriptioncmds.c     |  66 ++++-
 src/backend/replication/logical/tablesync.c |  13 +-
 src/bin/pg_dump/pg_dump.c                   |  16 +-
 src/bin/pg_dump/pg_dump.h                   |   1 +
 src/bin/psql/describe.c                     |   8 +-
 src/bin/psql/tab-complete.c                 |   5 +-
 src/include/catalog/pg_subscription.h       |  13 +
 src/include/commands/subscriptioncmds.h     |   1 +
 src/test/regress/expected/subscription.out  | 144 +++++-----
 src/test/subscription/t/002_types.pl        | 291 ++++++++++++++++----
 src/test/subscription/t/014_binary.pl       |  28 +-
 16 files changed, 472 insertions(+), 152 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index c1e4048054..bf8a4c6fd8 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7906,7 +7906,8 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        <structfield>subbinary</structfield> <type>bool</type>
       </para>
       <para>
-       If true, the subscription will request that the publisher send data
+       If true, the subscription will copy the initial data to synchronize
+       relations in binary format and request that the publisher send data
        in binary format
       </para></entry>
      </row>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 1bd5660c87..9540917ea3 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -241,10 +241,11 @@
    types of the columns do not need to match, as long as the text
    representation of the data can be converted to the target type.  For
    example, you can replicate from a column of type <type>integer</type> to a
-   column of type <type>bigint</type>.  The target table can also have
-   additional columns not provided by the published table.  Any such columns
-   will be filled with the default value as specified in the definition of the
-   target table.
+   column of type <type>bigint</type>.  Replication in binary format is type
+   specific and does not allow to replicate data between different types according
+   to its restrictions.  The target table can also have additional columns not provided
+   by the published table.  Any such columns will be filled with the default
+   value as specified in the definition of the target table.
   </para>
 
   <sect2 id="logical-replication-subscription-slot">
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index ad93553a1d..050155d428 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -213,8 +213,8 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
       <literal>binary</literal>, <literal>streaming</literal>,
-      <literal>disable_on_error</literal>, and
-      <literal>origin</literal>.
+      <literal>disable_on_error</literal>, <literal>origin</literal>
+      and <literal>copy_format</literal>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index eba72c6af6..9e757ae7de 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -189,8 +189,9 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
         <term><literal>binary</literal> (<type>boolean</type>)</term>
         <listitem>
          <para>
-          Specifies whether the subscription will request the publisher to
-          send the data in binary format (as opposed to text).
+          Specifies whether the subscription will copy the initial data to
+          synchronize relations in binary format and also request the publisher
+          to send the data in binary format too (as opposed to text).
           The default is <literal>false</literal>.
           Even when this option is enabled, only data types having
           binary send and receive functions will be transferred in binary.
@@ -349,6 +350,22 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
+
+      <varlistentry>
+        <term><literal>copy_format</literal> (<type>string</type>)</term>
+        <listitem>
+         <para>
+          Specifies the format in which pre-existing data on the publisher will
+          copied to the subscriber. Supported formats are 
+          <literal>text</literal> and <literal>binary</literal>. The default is
+          <literal>text</literal>.
+
+          <literal>binary</literal> format can be selected only if
+          <literal>binary</literal> is set to <literal>true</literal>. Note
+          that data will not be copied if <literal>copy_data = false</literal>.  
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist></para>
 
     </listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a56ae311c3..da078acefe 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
 	sub->disableonerr = subform->subdisableonerr;
+	sub->copyformat = subform->subcopyformat;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 464db6d247..0ec45466f5 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -66,6 +66,7 @@
 #define SUBOPT_DISABLE_ON_ERR		0x00000400
 #define SUBOPT_LSN					0x00000800
 #define SUBOPT_ORIGIN				0x00001000
+#define SUBOPT_COPY_FORMAT			0x00002000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -90,6 +91,7 @@ typedef struct SubOpts
 	bool		disableonerr;
 	char	   *origin;
 	XLogRecPtr	lsn;
+	char	    copy_format;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -146,6 +148,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->disableonerr = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+	if (IsSet(supported_opts, SUBOPT_COPY_FORMAT))
+		opts->copy_format = LOGICALREP_COPY_AS_TEXT;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -324,6 +328,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_LSN;
 			opts->lsn = lsn;
 		}
+		else if (IsSet(supported_opts, SUBOPT_COPY_FORMAT) &&
+				 strcmp(defel->defname, "copy_format") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_COPY_FORMAT))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_COPY_FORMAT;
+			opts->copy_format = defGetCopyFormat(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -404,6 +417,16 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 								"slot_name = NONE", "create_slot = false")));
 		}
 	}
+
+	if (!opts->binary &&
+		opts->copy_format == LOGICALREP_COPY_AS_BINARY)
+	{
+		ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("%s and %s are mutually exclusive options",
+								"binary = false", "copy_format = binary")));
+	}
+
 }
 
 /*
@@ -560,7 +583,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
-					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN);
+					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN |
+					  SUBOPT_COPY_FORMAT);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -649,6 +673,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		publicationListToArray(publications);
 	values[Anum_pg_subscription_suborigin - 1] =
 		CStringGetTextDatum(opts.origin);
+	values[Anum_pg_subscription_subcopyformat - 1] = CharGetDatum(opts.copy_format);
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -1054,7 +1079,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
-								  SUBOPT_ORIGIN);
+								  SUBOPT_ORIGIN | SUBOPT_COPY_FORMAT);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1118,6 +1143,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_suborigin - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_COPY_FORMAT))
+				{
+					values[Anum_pg_subscription_subcopyformat - 1] =
+						CharGetDatum(opts.copy_format);
+					replaces[Anum_pg_subscription_subcopyformat - 1] = true;
+				}
+
 				update_tuple = true;
 				break;
 			}
@@ -2195,3 +2227,33 @@ defGetStreamingMode(DefElem *def)
 					def->defname)));
 	return LOGICALREP_STREAM_OFF;	/* keep compiler quiet */
 }
+
+/*
+ * Extract the copy format value from a DefElem.
+ */
+char
+defGetCopyFormat(DefElem *def)
+{
+	char	   *sval;
+
+	/*
+	 * If no parameter value given, set it to text format.
+	 */
+	if (!def->arg)
+		return LOGICALREP_COPY_AS_TEXT;
+
+	/*
+	 * Currently supported formats are "text" and "binary".
+	 */
+	sval = defGetString(def);
+	if (pg_strcasecmp(sval, "text") == 0)
+		return LOGICALREP_COPY_AS_TEXT;
+	if (pg_strcasecmp(sval, "binary") == 0)
+		return LOGICALREP_COPY_AS_BINARY;
+
+	ereport(ERROR,
+			(errcode(ERRCODE_SYNTAX_ERROR),
+			 errmsg("%s value should be either \"text\" or \"binary\"",
+					def->defname)));
+	return LOGICALREP_COPY_AS_TEXT;	/* keep compiler quiet */
+}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 07eea504ba..62c866c9eb 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -101,6 +101,7 @@
 #include "catalog/pg_type.h"
 #include "commands/copy.h"
 #include "miscadmin.h"
+#include "nodes/makefuncs.h"
 #include "parser/parse_relation.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
@@ -1090,6 +1091,7 @@ copy_table(Relation rel)
 	CopyFromState cstate;
 	List	   *attnamelist;
 	ParseState *pstate;
+	List 	   *options = NIL;
 
 	/* Get the publisher relation info. */
 	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
@@ -1168,6 +1170,15 @@ copy_table(Relation rel)
 
 		appendStringInfoString(&cmd, ") TO STDOUT");
 	}
+
+	/* If the publisher is v16 or later, specify the format to copy data. */
+	if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000)
+	{
+		char *format = MySubscription->copyformat == LOGICALREP_COPY_AS_BINARY ? "binary" : "text";
+		appendStringInfo(&cmd, "  WITH (FORMAT %s)", format);
+		options = lappend(options, makeDefElem("format", (Node *) makeString(format), -1));
+	}
+
 	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
 	pfree(cmd.data);
 	if (res->status != WALRCV_OK_COPY_OUT)
@@ -1184,7 +1195,7 @@ copy_table(Relation rel)
 										 NULL, false, false);
 
 	attnamelist = make_copy_attnamelist(relmapentry);
-	cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, NIL);
+	cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
 
 	/* Do the copy */
 	(void) CopyFrom(cstate);
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 527c7651ab..2b4e4c52db 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4494,6 +4494,7 @@ getSubscriptions(Archive *fout)
 	int			i_subsynccommit;
 	int			i_subpublications;
 	int			i_subbinary;
+	int			i_subcopyformat;
 	int			i,
 				ntups;
 
@@ -4546,9 +4547,14 @@ getSubscriptions(Archive *fout)
 						  LOGICALREP_TWOPHASE_STATE_DISABLED);
 
 	if (fout->remoteVersion >= 160000)
-		appendPQExpBufferStr(query, " s.suborigin\n");
+		appendPQExpBufferStr(query, " s.suborigin,\n");
 	else
-		appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY);
+		appendPQExpBuffer(query, " '%s' AS suborigin,\n", LOGICALREP_ORIGIN_ANY);
+
+	if (fout->remoteVersion >= 160000)
+		appendPQExpBufferStr(query, " s.subcopyformat\n");
+	else
+		appendPQExpBuffer(query, " '%c' AS subcopyformat\n", LOGICALREP_COPY_AS_TEXT);
 
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n"
@@ -4576,6 +4582,7 @@ getSubscriptions(Archive *fout)
 	i_subtwophasestate = PQfnumber(res, "subtwophasestate");
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 	i_suborigin = PQfnumber(res, "suborigin");
+	i_subcopyformat = PQfnumber(res, "subcopyformat");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4606,6 +4613,8 @@ getSubscriptions(Archive *fout)
 		subinfo[i].subdisableonerr =
 			pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
 		subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
+		subinfo[i].subcopyformat =
+			pg_strdup(PQgetvalue(res, i, i_subcopyformat));
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4687,6 +4696,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
 		appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
+	if (strcmp(subinfo->subcopyformat, "t") != 0)
+		appendPQExpBuffer(query, ", copy_format = %s", subinfo->subcopyformat);
+
 	appendPQExpBufferStr(query, ");\n");
 
 	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index e7cbd8d7ed..f6b7dbcd97 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -662,6 +662,7 @@ typedef struct _SubscriptionInfo
 	char	   *suborigin;
 	char	   *subsynccommit;
 	char	   *subpublications;
+	char	   *subcopyformat;
 } SubscriptionInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index c8a0bb7b3a..b941dc1e88 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6472,7 +6472,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false, false, false, false, false, false, false};
+	false, false, false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6543,6 +6543,12 @@ describeSubscriptions(const char *pattern, bool verbose)
 			appendPQExpBuffer(&buf,
 							  ", subskiplsn AS \"%s\"\n",
 							  gettext_noop("Skip LSN"));
+
+		/* Copy format is only supported in v16 and higher */
+		if (pset.sversion >= 160000)
+			appendPQExpBuffer(&buf,
+							  ", subcopyformat AS \"%s\"\n",
+							  gettext_noop("Copy Format"));
 	}
 
 	/* Only display subscriptions in current database. */
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 5e1882eaea..acc178780c 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1926,7 +1926,7 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
 		COMPLETE_WITH("binary", "disable_on_error", "origin", "slot_name",
-					  "streaming", "synchronous_commit");
+					  "streaming", "synchronous_commit", "copy_format");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
 		COMPLETE_WITH("lsn");
@@ -3269,7 +3269,8 @@ psql_completion(const char *text, int start, int end)
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
 					  "disable_on_error", "enabled", "origin", "slot_name",
-					  "streaming", "synchronous_commit", "two_phase");
+					  "streaming", "synchronous_commit", "two_phase",
+					  "copy_format");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index b0f2a1705d..fe74cf2cce 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -88,6 +88,7 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subdisableonerr;	/* True if a worker error should cause the
 									 * subscription to be disabled */
 
+	char		subcopyformat BKI_DEFAULT(LOGICALREP_COPY_AS_TEXT); /* Copy format */
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -137,6 +138,8 @@ typedef struct Subscription
 	List	   *publications;	/* List of publication names to subscribe to */
 	char	   *origin;			/* Only publish data originating from the
 								 * specified origin */
+	char		copyformat;		/* Copy format for subscriptions with binary
+								 * option enabled */
 } Subscription;
 
 /* Disallow streaming in-progress transactions. */
@@ -154,6 +157,16 @@ typedef struct Subscription
  */
 #define LOGICALREP_STREAM_PARALLEL 'p'
 
+/*
+ * Copy data as text. This is the default.
+ */
+#define LOGICALREP_COPY_AS_TEXT 't'
+
+/*
+ * Copy data as binary if the binary options is enabled in the subscription.
+ */
+#define LOGICALREP_COPY_AS_BINARY 'b'
+
 extern Subscription *GetSubscription(Oid subid, bool missing_ok);
 extern void FreeSubscription(Subscription *sub);
 extern void DisableSubscription(Oid subid);
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index 214dc6c29e..7937defea5 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -27,5 +27,6 @@ extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
 extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
 
 extern char defGetStreamingMode(DefElem *def);
+extern char defGetCopyFormat(DefElem *def);
 
 #endif							/* SUBSCRIPTIONCMDS_H */
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 3f99b14394..cf8bdec5dc 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -114,18 +114,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+ regress_testsub4
-                                                                                         List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
 \dRs+ regress_testsub4
-                                                                                         List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub3;
@@ -143,10 +143,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -163,10 +163,10 @@ ERROR:  unrecognized subscription parameter: "create_slot"
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/12345
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/12345  | t
 (1 row)
 
 -- ok - with lsn = NONE
@@ -175,10 +175,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/0
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/0      | t
 (1 row)
 
 BEGIN;
@@ -210,10 +210,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                               List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | local              | dbname=regress_doesnotexist2 | 0/0
+                                                                                                      List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN | Copy Format 
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------+-------------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | local              | dbname=regress_doesnotexist2 | 0/0      | t
 (1 row)
 
 -- rename back to keep the rest simple
@@ -247,19 +247,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -271,27 +271,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 -- fail - publication already exists
@@ -306,10 +306,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                        List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 -- fail - publication used more than once
@@ -324,10 +324,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -363,10 +363,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -375,10 +375,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -388,10 +388,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -404,18 +404,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN | Copy Format 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    | off                | dbname=regress_doesnotexist | 0/0      | t
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/subscription/t/002_types.pl b/src/test/subscription/t/002_types.pl
index 6b5853b80b..1be2f3c304 100644
--- a/src/test/subscription/t/002_types.pl
+++ b/src/test/subscription/t/002_types.pl
@@ -19,6 +19,11 @@ my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init(allows_streaming => 'logical');
 $node_subscriber->start;
 
+# Create binary subscriber node
+my $node_subscriber_binary = PostgreSQL::Test::Cluster->new('subscriber_binary');
+$node_subscriber_binary->init(allows_streaming => 'logical');
+$node_subscriber_binary->start;
+
 # Create some preexisting content on publisher
 my $ddl = qq(
 	CREATE EXTENSION hstore WITH SCHEMA public;
@@ -104,6 +109,100 @@ my $ddl = qq(
 # Setup structure on both nodes
 $node_publisher->safe_psql('postgres', $ddl);
 $node_subscriber->safe_psql('postgres', $ddl);
+$node_subscriber_binary->safe_psql('postgres', $ddl);
+
+# Insert initial test data
+$node_publisher->safe_psql(
+	'postgres', qq(
+	-- test_tbl_one_array_col
+	INSERT INTO tst_one_array (a, b) VALUES
+		(1, '{1, 2, 3}'),
+		(2, '{2, 3, 1}');
+
+	-- test_tbl_arrays
+	INSERT INTO tst_arrays (a, b, c, d) VALUES
+		('{1, 2, 3}', '{"a", "b", "c"}', '{1.1, 2.2, 3.3}', '{"1 day", "2 days", "3 days"}'),
+		('{2, 3, 1}', '{"b", "c", "a"}', '{2.2, 3.3, 1.1}', '{"2 minutes", "3 minutes", "1 minute"}');
+
+	-- test_tbl_single_enum
+	INSERT INTO tst_one_enum (a, b) VALUES
+		(1, 'a'),
+		(2, 'b');
+
+	-- test_tbl_enums
+	INSERT INTO tst_enums (a, b) VALUES
+		('a', '{b, c}'),
+		('b', '{c, a}');
+
+	-- test_tbl_single_composites
+	INSERT INTO tst_one_comp (a, b) VALUES
+		(1, ROW(1.0, 'a', 1)),
+		(2, ROW(2.0, 'b', 2));
+
+	-- test_tbl_composites
+	INSERT INTO tst_comps (a, b) VALUES
+		(ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_basic_t]),
+		(ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_basic_t]);
+
+	-- test_tbl_composite_with_enums
+	INSERT INTO tst_comp_enum (a, b) VALUES
+		(1, ROW(1.0, 'a', 1)),
+		(2, ROW(2.0, 'b', 2));
+
+	-- test_tbl_composite_with_enums_array
+	INSERT INTO tst_comp_enum_array (a, b) VALUES
+		(ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t]),
+		(ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_enum_t]);
+
+	-- test_tbl_composite_with_single_enums_array_in_composite
+	INSERT INTO tst_comp_one_enum_array (a, b) VALUES
+		(1, ROW(1.0, '{a, b, c}', 1)),
+		(2, ROW(2.0, '{a, b, c}', 2));
+
+	-- test_tbl_composite_with_enums_array_in_composite
+	INSERT INTO tst_comp_enum_what (a, b) VALUES
+		(ROW(1.0, '{a, b, c}', 1), ARRAY[ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t]),
+		(ROW(2.0, '{b, c, a}', 2), ARRAY[ROW(2, '{b, c, a}', 1)::tst_comp_enum_array_t]);
+
+	-- test_tbl_mixed_composites
+	INSERT INTO tst_comp_mix_array (a, b) VALUES
+		(ROW(
+			ROW(1,'a',1),
+			ARRAY[ROW(1,'a',1)::tst_comp_basic_t, ROW(2,'b',2)::tst_comp_basic_t],
+			'a',
+			'{a,b,NULL,c}'),
+		ARRAY[
+			ROW(
+				ROW(1,'a',1),
+				ARRAY[
+					ROW(1,'a',1)::tst_comp_basic_t,
+					ROW(2,'b',2)::tst_comp_basic_t,
+					NULL
+					],
+				'a',
+				'{a,b,c}'
+				)::tst_comp_mix_t
+			]
+		);
+
+	-- test_tbl_range
+	INSERT INTO tst_range (a, b) VALUES
+		(1, '[1, 10]'),
+		(2, '[2, 20]');
+
+	-- test_tbl_range_array
+	INSERT INTO tst_range_array (a, b, c) VALUES
+		(1, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), '{"[1,2]", "[10,20]"}'),
+		(2, tstzrange('Sat Aug 02 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[2,3]", "[20,30]"}');
+
+	-- tst_hstore
+	INSERT INTO tst_hstore (a, b) VALUES
+		(1, '"a"=>"1"'),
+		(2, '"zzz"=>"foo"');
+
+	-- tst_dom_constr
+	INSERT INTO tst_dom_constr VALUES (10);
+));
 
 # Setup logical replication
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
@@ -113,89 +212,126 @@ $node_publisher->safe_psql('postgres',
 $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)"
 );
+$node_subscriber_binary->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_binary CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_binary_slot, binary = true, copy_format = 'binary')"
+);
 
 # Wait for initial sync to finish as well
 $node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+$node_subscriber_binary->wait_for_subscription_sync($node_publisher, 'tap_sub_binary');
 
-# Insert initial test data
+my $sync_check =  qq(
+	SET timezone = '+2';
+	SELECT a, b FROM tst_one_array ORDER BY a;
+	SELECT a, b, c, d FROM tst_arrays ORDER BY a;
+	SELECT a, b FROM tst_one_enum ORDER BY a;
+	SELECT a, b FROM tst_enums ORDER BY a;
+	SELECT a, b FROM tst_one_comp ORDER BY a;
+	SELECT a, b FROM tst_comps ORDER BY a;
+	SELECT a, b FROM tst_comp_enum ORDER BY a;
+	SELECT a, b FROM tst_comp_enum_array ORDER BY a;
+	SELECT a, b FROM tst_comp_one_enum_array ORDER BY a;
+	SELECT a, b FROM tst_comp_enum_what ORDER BY a;
+	SELECT a, b FROM tst_comp_mix_array ORDER BY a;
+	SELECT a, b FROM tst_range ORDER BY a;
+	SELECT a, b, c FROM tst_range_array ORDER BY a;
+	SELECT a, b FROM tst_hstore ORDER BY a;
+);
+
+# Check the synced data on subscribers
+my $result = $node_subscriber->safe_psql('postgres', $sync_check);
+my $result_binary = $node_subscriber_binary->safe_psql('postgres', $sync_check);
+
+my $sync_result = '1|{1,2,3}
+2|{2,3,1}
+{1,2,3}|{a,b,c}|{1.1,2.2,3.3}|{"1 day","2 days","3 days"}
+{2,3,1}|{b,c,a}|{2.2,3.3,1.1}|{00:02:00,00:03:00,00:01:00}
+1|a
+2|b
+a|{b,c}
+b|{c,a}
+1|(1,a,1)
+2|(2,b,2)
+(1,a,1)|{"(1,a,1)"}
+(2,b,2)|{"(2,b,2)"}
+1|(1,a,1)
+2|(2,b,2)
+(1,a,1)|{"(1,a,1)"}
+(2,b,2)|{"(2,b,2)"}
+1|(1,"{a,b,c}",1)
+2|(2,"{a,b,c}",2)
+(1,"{a,b,c}",1)|{"(1,\"{a,b,c}\",1)"}
+(2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"}
+("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
+1|[1,11)
+2|[2,21)
+1|["2014-08-04 00:00:00+02",infinity)|{"[1,3)","[10,21)"}
+2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"}
+1|"a"=>"1"
+2|"zzz"=>"foo"';
+
+is( $result, $sync_result, 'check initial sync on subscriber');
+is( $result_binary, $sync_result, 'check initial sync on subscriber in binary');
+
+# Insert test data for update to apply changes
 $node_publisher->safe_psql(
 	'postgres', qq(
 	-- test_tbl_one_array_col
 	INSERT INTO tst_one_array (a, b) VALUES
-		(1, '{1, 2, 3}'),
-		(2, '{2, 3, 1}'),
 		(3, '{3, 2, 1}'),
 		(4, '{4, 3, 2}'),
 		(5, '{5, NULL, 3}');
 
 	-- test_tbl_arrays
 	INSERT INTO tst_arrays (a, b, c, d) VALUES
-		('{1, 2, 3}', '{"a", "b", "c"}', '{1.1, 2.2, 3.3}', '{"1 day", "2 days", "3 days"}'),
-		('{2, 3, 1}', '{"b", "c", "a"}', '{2.2, 3.3, 1.1}', '{"2 minutes", "3 minutes", "1 minute"}'),
 		('{3, 1, 2}', '{"c", "a", "b"}', '{3.3, 1.1, 2.2}', '{"3 years", "1 year", "2 years"}'),
 		('{4, 1, 2}', '{"d", "a", "b"}', '{4.4, 1.1, 2.2}', '{"4 years", "1 year", "2 years"}'),
 		('{5, NULL, NULL}', '{"e", NULL, "b"}', '{5.5, 1.1, NULL}', '{"5 years", NULL, NULL}');
 
 	-- test_tbl_single_enum
 	INSERT INTO tst_one_enum (a, b) VALUES
-		(1, 'a'),
-		(2, 'b'),
 		(3, 'c'),
 		(4, 'd'),
 		(5, NULL);
 
 	-- test_tbl_enums
 	INSERT INTO tst_enums (a, b) VALUES
-		('a', '{b, c}'),
-		('b', '{c, a}'),
 		('c', '{b, a}'),
 		('d', '{c, b}'),
 		('e', '{d, NULL}');
 
 	-- test_tbl_single_composites
 	INSERT INTO tst_one_comp (a, b) VALUES
-		(1, ROW(1.0, 'a', 1)),
-		(2, ROW(2.0, 'b', 2)),
 		(3, ROW(3.0, 'c', 3)),
 		(4, ROW(4.0, 'd', 4)),
 		(5, ROW(NULL, NULL, 5));
 
 	-- test_tbl_composites
 	INSERT INTO tst_comps (a, b) VALUES
-		(ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_basic_t]),
-		(ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_basic_t]),
 		(ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_basic_t]),
 		(ROW(4.0, 'd', 4), ARRAY[ROW(4, 'd', 3)::tst_comp_basic_t]),
 		(ROW(5.0, 'e', NULL), ARRAY[NULL, ROW(5, NULL, 5)::tst_comp_basic_t]);
 
 	-- test_tbl_composite_with_enums
 	INSERT INTO tst_comp_enum (a, b) VALUES
-		(1, ROW(1.0, 'a', 1)),
-		(2, ROW(2.0, 'b', 2)),
 		(3, ROW(3.0, 'c', 3)),
 		(4, ROW(4.0, 'd', 4)),
 		(5, ROW(NULL, 'e', NULL));
 
 	-- test_tbl_composite_with_enums_array
 	INSERT INTO tst_comp_enum_array (a, b) VALUES
-		(ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t]),
-		(ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_enum_t]),
 		(ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_enum_t]),
 		(ROW(4.0, 'd', 3), ARRAY[ROW(3, 'd', 3)::tst_comp_enum_t]),
 		(ROW(5.0, 'e', 3), ARRAY[ROW(3, 'e', 3)::tst_comp_enum_t, NULL]);
 
 	-- test_tbl_composite_with_single_enums_array_in_composite
 	INSERT INTO tst_comp_one_enum_array (a, b) VALUES
-		(1, ROW(1.0, '{a, b, c}', 1)),
-		(2, ROW(2.0, '{a, b, c}', 2)),
 		(3, ROW(3.0, '{a, b, c}', 3)),
 		(4, ROW(4.0, '{c, b, d}', 4)),
 		(5, ROW(5.0, '{NULL, e, NULL}', 5));
 
 	-- test_tbl_composite_with_enums_array_in_composite
 	INSERT INTO tst_comp_enum_what (a, b) VALUES
-		(ROW(1.0, '{a, b, c}', 1), ARRAY[ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t]),
-		(ROW(2.0, '{b, c, a}', 2), ARRAY[ROW(2, '{b, c, a}', 1)::tst_comp_enum_array_t]),
 		(ROW(3.0, '{c, a, b}', 1), ARRAY[ROW(3, '{c, a, b}', 1)::tst_comp_enum_array_t]),
 		(ROW(4.0, '{c, b, d}', 4), ARRAY[ROW(4, '{c, b, d}', 4)::tst_comp_enum_array_t]),
 		(ROW(5.0, '{c, NULL, b}', NULL), ARRAY[ROW(5, '{c, e, b}', 1)::tst_comp_enum_array_t]);
@@ -203,10 +339,10 @@ $node_publisher->safe_psql(
 	-- test_tbl_mixed_composites
 	INSERT INTO tst_comp_mix_array (a, b) VALUES
 		(ROW(
-			ROW(1,'a',1),
-			ARRAY[ROW(1,'a',1)::tst_comp_basic_t, ROW(2,'b',2)::tst_comp_basic_t],
-			'a',
-			'{a,b,NULL,c}'),
+			ROW(2,'b',2),
+			ARRAY[ROW(2,'b',2)::tst_comp_basic_t, ROW(3,'c',3)::tst_comp_basic_t],
+			'b',
+			'{b,c,NULL,d}'),
 		ARRAY[
 			ROW(
 				ROW(1,'a',1),
@@ -223,36 +359,29 @@ $node_publisher->safe_psql(
 
 	-- test_tbl_range
 	INSERT INTO tst_range (a, b) VALUES
-		(1, '[1, 10]'),
-		(2, '[2, 20]'),
 		(3, '[3, 30]'),
 		(4, '[4, 40]'),
 		(5, '[5, 50]');
 
 	-- test_tbl_range_array
 	INSERT INTO tst_range_array (a, b, c) VALUES
-		(1, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), '{"[1,2]", "[10,20]"}'),
-		(2, tstzrange('Sat Aug 02 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[2,3]", "[20,30]"}'),
 		(3, tstzrange('Fri Aug 01 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[3,4]"}'),
 		(4, tstzrange('Thu Jul 31 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[4,5]", NULL, "[40,50]"}'),
 		(5, NULL, NULL);
 
 	-- tst_hstore
 	INSERT INTO tst_hstore (a, b) VALUES
-		(1, '"a"=>"1"'),
-		(2, '"zzz"=>"foo"'),
 		(3, '"123"=>"321"'),
 		(4, '"yellow horse"=>"moaned"');
 
 	-- tst_dom_constr
-	INSERT INTO tst_dom_constr VALUES (10);
+	INSERT INTO tst_dom_constr VALUES (11);
 ));
 
 $node_publisher->wait_for_catchup('tap_sub');
+$node_publisher->wait_for_catchup('tap_sub_binary');
 
-# Check the data on subscriber
-my $result = $node_subscriber->safe_psql(
-	'postgres', qq(
+my $initial_check =  qq(
 	SET timezone = '+2';
 	SELECT a, b FROM tst_one_array ORDER BY a;
 	SELECT a, b, c, d FROM tst_arrays ORDER BY a;
@@ -268,9 +397,13 @@ my $result = $node_subscriber->safe_psql(
 	SELECT a, b FROM tst_range ORDER BY a;
 	SELECT a, b, c FROM tst_range_array ORDER BY a;
 	SELECT a, b FROM tst_hstore ORDER BY a;
-));
+);
+
+# Check the data on subscribers
+$result = $node_subscriber->safe_psql('postgres', $initial_check);
+$result_binary = $node_subscriber_binary->safe_psql('postgres', $initial_check);
 
-is( $result, '1|{1,2,3}
+my $initial_result = '1|{1,2,3}
 2|{2,3,1}
 3|{3,2,1}
 4|{4,3,2}
@@ -321,6 +454,7 @@ e|{d,NULL}
 (4,"{c,b,d}",4)|{"(4,\"{c,b,d}\",4)"}
 (5,"{c,NULL,b}",)|{"(5,\"{c,e,b}\",1)"}
 ("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
+("(2,b,2)","{""(2,b,2)"",""(3,c,3)""}",b,"{b,c,NULL,d}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
 1|[1,11)
 2|[2,21)
 3|[3,31)
@@ -334,8 +468,10 @@ e|{d,NULL}
 1|"a"=>"1"
 2|"zzz"=>"foo"
 3|"123"=>"321"
-4|"yellow horse"=>"moaned"',
-	'check replicated inserts on subscriber');
+4|"yellow horse"=>"moaned"';
+
+is( $result, $initial_result, 'check replicated inserts on subscriber');
+is( $result_binary, $initial_result, 'check replicated inserts on subscriber in binary');
 
 # Run batch of updates
 $node_publisher->safe_psql(
@@ -370,10 +506,9 @@ $node_publisher->safe_psql(
 ));
 
 $node_publisher->wait_for_catchup('tap_sub');
+$node_publisher->wait_for_catchup('tap_sub_binary');
 
-# Check the data on subscriber
-$result = $node_subscriber->safe_psql(
-	'postgres', qq(
+my $update_check =  qq(
 	SET timezone = '+2';
 	SELECT a, b FROM tst_one_array ORDER BY a;
 	SELECT a, b, c, d FROM tst_arrays ORDER BY a;
@@ -389,9 +524,13 @@ $result = $node_subscriber->safe_psql(
 	SELECT a, b FROM tst_range ORDER BY a;
 	SELECT a, b, c FROM tst_range_array ORDER BY a;
 	SELECT a, b FROM tst_hstore ORDER BY a;
-));
+);
+
+# Check the data on subscribers
+$result = $node_subscriber->safe_psql('postgres', $update_check);
+$result_binary = $node_subscriber_binary->safe_psql('postgres', $update_check);
 
-is( $result, '1|{4,5,6}
+my $update_result = '1|{4,5,6}
 2|{2,3,1}
 3|{3,2,1}
 4|{4,5,6,1}
@@ -442,6 +581,7 @@ e|{e,d}
 (4,"{c,b,d}",4)|{"(5,\"{a,b,c}\",5)"}
 (5,"{c,NULL,b}",)|{"(5,\"{a,b,c}\",5)"}
 ("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")",NULL}
+("(2,b,2)","{""(2,b,2)"",""(3,c,3)""}",b,"{b,c,NULL,d}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
 1|[100,1001)
 2|[2,21)
 3|[3,31)
@@ -455,8 +595,10 @@ e|{e,d}
 1|"updated"=>"value"
 2|"updated"=>"value"
 3|"also"=>"updated"
-4|"yellow horse"=>"moaned"',
-	'check replicated updates on subscriber');
+4|"yellow horse"=>"moaned"';
+
+is( $result, $update_result, 'check replicated updates on subscriber');
+is( $result_binary, $update_result, 'check replicated updates on subscriber in binary');
 
 # Run batch of deletes
 $node_publisher->safe_psql(
@@ -490,10 +632,9 @@ $node_publisher->safe_psql(
 ));
 
 $node_publisher->wait_for_catchup('tap_sub');
+$node_publisher->wait_for_catchup('tap_sub_binary');
 
-# Check the data on subscriber
-$result = $node_subscriber->safe_psql(
-	'postgres', qq(
+my $delete_check =  qq(
 	SET timezone = '+2';
 	SELECT a, b FROM tst_one_array ORDER BY a;
 	SELECT a, b, c, d FROM tst_arrays ORDER BY a;
@@ -509,9 +650,13 @@ $result = $node_subscriber->safe_psql(
 	SELECT a, b FROM tst_range ORDER BY a;
 	SELECT a, b, c FROM tst_range_array ORDER BY a;
 	SELECT a, b FROM tst_hstore ORDER BY a;
-));
+);
+
+# Check the data on subscribers
+$result = $node_subscriber->safe_psql('postgres', $delete_check);
+$result_binary = $node_subscriber_binary->safe_psql('postgres', $delete_check);
 
-is( $result, '3|{3,2,1}
+my $delete_result = '3|{3,2,1}
 4|{4,5,6,1}
 5|{4,5,6,1}
 {3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{"3 years","1 year","2 years"}
@@ -540,26 +685,56 @@ e|{e,d}
 (2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"}
 (4,"{c,b,d}",4)|{"(5,\"{a,b,c}\",5)"}
 (5,"{c,NULL,b}",)|{"(5,\"{a,b,c}\",5)"}
+("(2,b,2)","{""(2,b,2)"",""(3,c,3)""}",b,"{b,c,NULL,d}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
 2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"}
 3|["2014-08-01 00:00:00+02","2014-08-04 00:00:00+02")|{"[3,5)"}
 2|"updated"=>"value"
 3|"also"=>"updated"
-4|"yellow horse"=>"moaned"',
-	'check replicated deletes on subscriber');
+4|"yellow horse"=>"moaned"';
+
+is( $result, $delete_result, 'check replicated deletes on subscriber');
+is( $result_binary, $delete_result, 'check replicated deletes on subscriber in binary');
 
 # Test a domain with a constraint backed by a SQL-language function,
 # which needs an active snapshot in order to operate.
 $node_publisher->safe_psql('postgres',
-	"INSERT INTO tst_dom_constr VALUES (11)");
+	"INSERT INTO tst_dom_constr VALUES (12)");
 
 $node_publisher->wait_for_catchup('tap_sub');
+$node_publisher->wait_for_catchup('tap_sub_binary');
+
+my $domain_check = qq(SELECT sum(a) FROM tst_dom_constr);
+
+$result = $node_subscriber->safe_psql('postgres', $domain_check);
+$result_binary = $node_subscriber->safe_psql('postgres', $domain_check);
 
-$result =
-  $node_subscriber->safe_psql('postgres',
-	"SELECT sum(a) FROM tst_dom_constr");
-is($result, '21', 'sql-function constraint on domain');
+is( $result, '33', 'sql-function constraint on domain');
+is( $result_binary, '33', 'sql-function constraint on domain');
+
+# Test whether subscriber fails in case of column type mismatch
+$ddl = qq(
+	CREATE TABLE tst_binary_mismatch (a int);
+	INSERT INTO tst_binary_mismatch VALUES (1);
+);
+$node_publisher->safe_psql('postgres', $ddl);
+
+$ddl = qq(
+	CREATE TABLE tst_binary_mismatch (a bigint);
+);
+$node_subscriber->safe_psql('postgres', $ddl);
+$node_subscriber_binary->safe_psql('postgres', $ddl);
+
+$node_subscriber->safe_psql('postgres', 'ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION');
+$node_subscriber_binary->safe_psql('postgres', 'ALTER SUBSCRIPTION tap_sub_binary REFRESH PUBLICATION');
+
+# Binary enabled subscription should fail
+$node_subscriber_binary->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? insufficient data left in message/);
+
+# Binary disabled subscription should succeed
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
 
 $node_subscriber->stop('fast');
+$node_subscriber_binary->stop('fast');
 $node_publisher->stop('fast');
 
 done_testing();
diff --git a/src/test/subscription/t/014_binary.pl b/src/test/subscription/t/014_binary.pl
index e53e23da3e..04ad6de08f 100644
--- a/src/test/subscription/t/014_binary.pl
+++ b/src/test/subscription/t/014_binary.pl
@@ -36,6 +36,16 @@ my $ddl = qq(
 $node_publisher->safe_psql('postgres', $ddl);
 $node_subscriber->safe_psql('postgres', $ddl);
 
+# Insert some content and make sure it's synced to subscriber
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO public.test_arrays (a, b, c) VALUES
+		('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}');
+
+	INSERT INTO public.test_numerical (a, b, c, d) VALUES
+		(1, 1.2, 1.3, 10);
+	));
+
 # Configure logical replication
 $node_publisher->safe_psql('postgres',
 	"CREATE PUBLICATION tpub FOR ALL TABLES");
@@ -48,27 +58,35 @@ $node_subscriber->safe_psql('postgres',
 # Ensure nodes are in sync with each other
 $node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');
 
+my $result = $node_subscriber->safe_psql('postgres',
+	"SELECT a, b, c, d FROM test_numerical ORDER BY a;
+	SELECT a, b, c FROM test_arrays ORDER BY a;");
+
+is( $result, '1|1.2|1.3|10
+{1,2,3}|{1.1,1.2,1.3}|{one,two,three}', 'check syned data on subscriber');
+
 # Insert some content and make sure it's replicated across
 $node_publisher->safe_psql(
 	'postgres', qq(
 	INSERT INTO public.test_arrays (a, b, c) VALUES
-		('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'),
 		('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}');
 
 	INSERT INTO public.test_numerical (a, b, c, d) VALUES
-		(1, 1.2, 1.3, 10),
 		(2, 2.2, 2.3, 20),
 		(3, 3.2, 3.3, 30);
 	));
 
 $node_publisher->wait_for_catchup('tsub');
 
-my $result = $node_subscriber->safe_psql('postgres',
-	"SELECT a, b, c, d FROM test_numerical ORDER BY a");
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT a, b, c, d FROM test_numerical ORDER BY a;
+	SELECT a, b, c FROM test_arrays ORDER BY a;");
 
 is( $result, '1|1.2|1.3|10
 2|2.2|2.3|20
-3|3.2|3.3|30', 'check replicated data on subscriber');
+3|3.2|3.3|30
+{1,2,3}|{1.1,1.2,1.3}|{one,two,three}
+{3,1,2}|{1.3,1.1,1.2}|{three,one,two}', 'check replicated data on subscriber');
 
 # Test updates as well
 $node_publisher->safe_psql(
-- 
2.25.1

