From dc4b220e3182629f7f720e2cc202ba6f0a1b0b7f Mon Sep 17 00:00:00 2001
From: Melih Mutlu <m.melihmutlu@gmail.com>
Date: Mon, 8 Aug 2022 14:14:44 +0300
Subject: [PATCH] Allow logical replication to copy table in binary

If binary option is enabled in a subscription, copy tables in binary
format during table synchronization.
---
 doc/src/sgml/logical-replication.sgml       |   9 +-
 doc/src/sgml/ref/create_subscription.sgml   |   5 +-
 src/backend/replication/logical/tablesync.c |  11 +-
 src/test/subscription/t/002_types.pl        | 287 ++++++++++++++++----
 src/test/subscription/t/014_binary.pl       |  28 +-
 5 files changed, 273 insertions(+), 67 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index f8756389a3..7ca3cb1ff0 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -241,10 +241,11 @@
    types of the columns do not need to match, as long as the text
    representation of the data can be converted to the target type.  For
    example, you can replicate from a column of type <type>integer</type> to a
-   column of type <type>bigint</type>.  The target table can also have
-   additional columns not provided by the published table.  Any such columns
-   will be filled with the default value as specified in the definition of the
-   target table.
+   column of type <type>bigint</type>.  Replication in binary format is type
+   specific and does not allow to replicate data between different types according
+   to its restrictions.  The target table can also have additional columns not provided
+   by the published table.  Any such columns will be filled with the default
+   value as specified in the definition of the target table.
   </para>
 
   <sect2 id="logical-replication-subscription-slot">
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index f9a1776380..b006eab4c2 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -189,8 +189,9 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
         <term><literal>binary</literal> (<type>boolean</type>)</term>
         <listitem>
          <para>
-          Specifies whether the subscription will request the publisher to
-          send the data in binary format (as opposed to text).
+          Specifies whether the subscription will copy the initial data to
+          synchronize relations in binary format and also request the publisher
+          to send the data in binary format too (as opposed to text).
           The default is <literal>false</literal>.
           Even when this option is enabled, only data types having
           binary send and receive functions will be transferred in binary.
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 94e813ac53..7ba877312d 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -101,6 +101,7 @@
 #include "catalog/pg_type.h"
 #include "commands/copy.h"
 #include "miscadmin.h"
+#include "nodes/makefuncs.h"
 #include "parser/parse_relation.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
@@ -1066,6 +1067,7 @@ copy_table(Relation rel)
 	CopyFromState cstate;
 	List	   *attnamelist;
 	ParseState *pstate;
+	List 	   *options = NIL;
 
 	/* Get the publisher relation info. */
 	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
@@ -1144,6 +1146,13 @@ copy_table(Relation rel)
 
 		appendStringInfoString(&cmd, ") TO STDOUT");
 	}
+
+	if (MySubscription->binary)
+	{
+		appendStringInfoString(&cmd, "  WITH (FORMAT binary)");
+		options = lappend(options, makeDefElem("format", (Node *) makeString("binary"), -1));
+	}
+
 	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
 	pfree(cmd.data);
 	if (res->status != WALRCV_OK_COPY_OUT)
@@ -1160,7 +1169,7 @@ copy_table(Relation rel)
 										 NULL, false, false);
 
 	attnamelist = make_copy_attnamelist(relmapentry);
-	cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, NIL);
+	cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
 
 	/* Do the copy */
 	(void) CopyFrom(cstate);
diff --git a/src/test/subscription/t/002_types.pl b/src/test/subscription/t/002_types.pl
index d6c6f49327..8c6be447d4 100644
--- a/src/test/subscription/t/002_types.pl
+++ b/src/test/subscription/t/002_types.pl
@@ -19,6 +19,11 @@ my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init(allows_streaming => 'logical');
 $node_subscriber->start;
 
+# Create binary subscriber node
+my $node_subscriber_binary = PostgreSQL::Test::Cluster->new('subscriber_binary');
+$node_subscriber_binary->init(allows_streaming => 'logical');
+$node_subscriber_binary->start;
+
 # Create some preexisting content on publisher
 my $ddl = qq(
 	CREATE EXTENSION hstore WITH SCHEMA public;
@@ -104,6 +109,100 @@ my $ddl = qq(
 # Setup structure on both nodes
 $node_publisher->safe_psql('postgres', $ddl);
 $node_subscriber->safe_psql('postgres', $ddl);
+$node_subscriber_binary->safe_psql('postgres', $ddl);
+
+# Insert initial test data
+$node_publisher->safe_psql(
+	'postgres', qq(
+	-- test_tbl_one_array_col
+	INSERT INTO tst_one_array (a, b) VALUES
+		(1, '{1, 2, 3}'),
+		(2, '{2, 3, 1}');
+
+	-- test_tbl_arrays
+	INSERT INTO tst_arrays (a, b, c, d) VALUES
+		('{1, 2, 3}', '{"a", "b", "c"}', '{1.1, 2.2, 3.3}', '{"1 day", "2 days", "3 days"}'),
+		('{2, 3, 1}', '{"b", "c", "a"}', '{2.2, 3.3, 1.1}', '{"2 minutes", "3 minutes", "1 minute"}');
+
+	-- test_tbl_single_enum
+	INSERT INTO tst_one_enum (a, b) VALUES
+		(1, 'a'),
+		(2, 'b');
+
+	-- test_tbl_enums
+	INSERT INTO tst_enums (a, b) VALUES
+		('a', '{b, c}'),
+		('b', '{c, a}');
+
+	-- test_tbl_single_composites
+	INSERT INTO tst_one_comp (a, b) VALUES
+		(1, ROW(1.0, 'a', 1)),
+		(2, ROW(2.0, 'b', 2));
+
+	-- test_tbl_composites
+	INSERT INTO tst_comps (a, b) VALUES
+		(ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_basic_t]),
+		(ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_basic_t]);
+
+	-- test_tbl_composite_with_enums
+	INSERT INTO tst_comp_enum (a, b) VALUES
+		(1, ROW(1.0, 'a', 1)),
+		(2, ROW(2.0, 'b', 2));
+
+	-- test_tbl_composite_with_enums_array
+	INSERT INTO tst_comp_enum_array (a, b) VALUES
+		(ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t]),
+		(ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_enum_t]);
+
+	-- test_tbl_composite_with_single_enums_array_in_composite
+	INSERT INTO tst_comp_one_enum_array (a, b) VALUES
+		(1, ROW(1.0, '{a, b, c}', 1)),
+		(2, ROW(2.0, '{a, b, c}', 2));
+
+	-- test_tbl_composite_with_enums_array_in_composite
+	INSERT INTO tst_comp_enum_what (a, b) VALUES
+		(ROW(1.0, '{a, b, c}', 1), ARRAY[ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t]),
+		(ROW(2.0, '{b, c, a}', 2), ARRAY[ROW(2, '{b, c, a}', 1)::tst_comp_enum_array_t]);
+
+	-- test_tbl_mixed_composites
+	INSERT INTO tst_comp_mix_array (a, b) VALUES
+		(ROW(
+			ROW(1,'a',1),
+			ARRAY[ROW(1,'a',1)::tst_comp_basic_t, ROW(2,'b',2)::tst_comp_basic_t],
+			'a',
+			'{a,b,NULL,c}'),
+		ARRAY[
+			ROW(
+				ROW(1,'a',1),
+				ARRAY[
+					ROW(1,'a',1)::tst_comp_basic_t,
+					ROW(2,'b',2)::tst_comp_basic_t,
+					NULL
+					],
+				'a',
+				'{a,b,c}'
+				)::tst_comp_mix_t
+			]
+		);
+
+	-- test_tbl_range
+	INSERT INTO tst_range (a, b) VALUES
+		(1, '[1, 10]'),
+		(2, '[2, 20]');
+
+	-- test_tbl_range_array
+	INSERT INTO tst_range_array (a, b, c) VALUES
+		(1, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), '{"[1,2]", "[10,20]"}'),
+		(2, tstzrange('Sat Aug 02 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[2,3]", "[20,30]"}');
+
+	-- tst_hstore
+	INSERT INTO tst_hstore (a, b) VALUES
+		(1, '"a"=>"1"'),
+		(2, '"zzz"=>"foo"');
+
+	-- tst_dom_constr
+	INSERT INTO tst_dom_constr VALUES (10);
+));
 
 # Setup logical replication
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
@@ -113,89 +212,126 @@ $node_publisher->safe_psql('postgres',
 $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)"
 );
+$node_subscriber_binary->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_binary CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_binary_slot, binary = true)"
+);
 
 # Wait for initial sync to finish as well
 $node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+$node_subscriber_binary->wait_for_subscription_sync($node_publisher, 'tap_sub_binary');
 
-# Insert initial test data
+my $sync_check =  qq(
+	SET timezone = '+2';
+	SELECT a, b FROM tst_one_array ORDER BY a;
+	SELECT a, b, c, d FROM tst_arrays ORDER BY a;
+	SELECT a, b FROM tst_one_enum ORDER BY a;
+	SELECT a, b FROM tst_enums ORDER BY a;
+	SELECT a, b FROM tst_one_comp ORDER BY a;
+	SELECT a, b FROM tst_comps ORDER BY a;
+	SELECT a, b FROM tst_comp_enum ORDER BY a;
+	SELECT a, b FROM tst_comp_enum_array ORDER BY a;
+	SELECT a, b FROM tst_comp_one_enum_array ORDER BY a;
+	SELECT a, b FROM tst_comp_enum_what ORDER BY a;
+	SELECT a, b FROM tst_comp_mix_array ORDER BY a;
+	SELECT a, b FROM tst_range ORDER BY a;
+	SELECT a, b, c FROM tst_range_array ORDER BY a;
+	SELECT a, b FROM tst_hstore ORDER BY a;
+);
+
+# Check the synced data on subscribers
+my $result = $node_subscriber->safe_psql('postgres', $sync_check);
+my $result_binary = $node_subscriber_binary->safe_psql('postgres', $sync_check);
+
+my $sync_result = '1|{1,2,3}
+2|{2,3,1}
+{1,2,3}|{a,b,c}|{1.1,2.2,3.3}|{"1 day","2 days","3 days"}
+{2,3,1}|{b,c,a}|{2.2,3.3,1.1}|{00:02:00,00:03:00,00:01:00}
+1|a
+2|b
+a|{b,c}
+b|{c,a}
+1|(1,a,1)
+2|(2,b,2)
+(1,a,1)|{"(1,a,1)"}
+(2,b,2)|{"(2,b,2)"}
+1|(1,a,1)
+2|(2,b,2)
+(1,a,1)|{"(1,a,1)"}
+(2,b,2)|{"(2,b,2)"}
+1|(1,"{a,b,c}",1)
+2|(2,"{a,b,c}",2)
+(1,"{a,b,c}",1)|{"(1,\"{a,b,c}\",1)"}
+(2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"}
+("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
+1|[1,11)
+2|[2,21)
+1|["2014-08-04 00:00:00+02",infinity)|{"[1,3)","[10,21)"}
+2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"}
+1|"a"=>"1"
+2|"zzz"=>"foo"';
+
+is( $result, $sync_result, 'check initial sync on subscriber');
+is( $result_binary, $sync_result, 'check initial sync on subscriber in binary');
+
+# Insert test data for update to apply changes
 $node_publisher->safe_psql(
 	'postgres', qq(
 	-- test_tbl_one_array_col
 	INSERT INTO tst_one_array (a, b) VALUES
-		(1, '{1, 2, 3}'),
-		(2, '{2, 3, 1}'),
 		(3, '{3, 2, 1}'),
 		(4, '{4, 3, 2}'),
 		(5, '{5, NULL, 3}');
 
 	-- test_tbl_arrays
 	INSERT INTO tst_arrays (a, b, c, d) VALUES
-		('{1, 2, 3}', '{"a", "b", "c"}', '{1.1, 2.2, 3.3}', '{"1 day", "2 days", "3 days"}'),
-		('{2, 3, 1}', '{"b", "c", "a"}', '{2.2, 3.3, 1.1}', '{"2 minutes", "3 minutes", "1 minute"}'),
 		('{3, 1, 2}', '{"c", "a", "b"}', '{3.3, 1.1, 2.2}', '{"3 years", "1 year", "2 years"}'),
 		('{4, 1, 2}', '{"d", "a", "b"}', '{4.4, 1.1, 2.2}', '{"4 years", "1 year", "2 years"}'),
 		('{5, NULL, NULL}', '{"e", NULL, "b"}', '{5.5, 1.1, NULL}', '{"5 years", NULL, NULL}');
 
 	-- test_tbl_single_enum
 	INSERT INTO tst_one_enum (a, b) VALUES
-		(1, 'a'),
-		(2, 'b'),
 		(3, 'c'),
 		(4, 'd'),
 		(5, NULL);
 
 	-- test_tbl_enums
 	INSERT INTO tst_enums (a, b) VALUES
-		('a', '{b, c}'),
-		('b', '{c, a}'),
 		('c', '{b, a}'),
 		('d', '{c, b}'),
 		('e', '{d, NULL}');
 
 	-- test_tbl_single_composites
 	INSERT INTO tst_one_comp (a, b) VALUES
-		(1, ROW(1.0, 'a', 1)),
-		(2, ROW(2.0, 'b', 2)),
 		(3, ROW(3.0, 'c', 3)),
 		(4, ROW(4.0, 'd', 4)),
 		(5, ROW(NULL, NULL, 5));
 
 	-- test_tbl_composites
 	INSERT INTO tst_comps (a, b) VALUES
-		(ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_basic_t]),
-		(ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_basic_t]),
 		(ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_basic_t]),
 		(ROW(4.0, 'd', 4), ARRAY[ROW(4, 'd', 3)::tst_comp_basic_t]),
 		(ROW(5.0, 'e', NULL), ARRAY[NULL, ROW(5, NULL, 5)::tst_comp_basic_t]);
 
 	-- test_tbl_composite_with_enums
 	INSERT INTO tst_comp_enum (a, b) VALUES
-		(1, ROW(1.0, 'a', 1)),
-		(2, ROW(2.0, 'b', 2)),
 		(3, ROW(3.0, 'c', 3)),
 		(4, ROW(4.0, 'd', 4)),
 		(5, ROW(NULL, 'e', NULL));
 
 	-- test_tbl_composite_with_enums_array
 	INSERT INTO tst_comp_enum_array (a, b) VALUES
-		(ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t]),
-		(ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_enum_t]),
 		(ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_enum_t]),
 		(ROW(4.0, 'd', 3), ARRAY[ROW(3, 'd', 3)::tst_comp_enum_t]),
 		(ROW(5.0, 'e', 3), ARRAY[ROW(3, 'e', 3)::tst_comp_enum_t, NULL]);
 
 	-- test_tbl_composite_with_single_enums_array_in_composite
 	INSERT INTO tst_comp_one_enum_array (a, b) VALUES
-		(1, ROW(1.0, '{a, b, c}', 1)),
-		(2, ROW(2.0, '{a, b, c}', 2)),
 		(3, ROW(3.0, '{a, b, c}', 3)),
 		(4, ROW(4.0, '{c, b, d}', 4)),
 		(5, ROW(5.0, '{NULL, e, NULL}', 5));
 
 	-- test_tbl_composite_with_enums_array_in_composite
 	INSERT INTO tst_comp_enum_what (a, b) VALUES
-		(ROW(1.0, '{a, b, c}', 1), ARRAY[ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t]),
-		(ROW(2.0, '{b, c, a}', 2), ARRAY[ROW(2, '{b, c, a}', 1)::tst_comp_enum_array_t]),
 		(ROW(3.0, '{c, a, b}', 1), ARRAY[ROW(3, '{c, a, b}', 1)::tst_comp_enum_array_t]),
 		(ROW(4.0, '{c, b, d}', 4), ARRAY[ROW(4, '{c, b, d}', 4)::tst_comp_enum_array_t]),
 		(ROW(5.0, '{c, NULL, b}', NULL), ARRAY[ROW(5, '{c, e, b}', 1)::tst_comp_enum_array_t]);
@@ -203,10 +339,10 @@ $node_publisher->safe_psql(
 	-- test_tbl_mixed_composites
 	INSERT INTO tst_comp_mix_array (a, b) VALUES
 		(ROW(
-			ROW(1,'a',1),
-			ARRAY[ROW(1,'a',1)::tst_comp_basic_t, ROW(2,'b',2)::tst_comp_basic_t],
-			'a',
-			'{a,b,NULL,c}'),
+			ROW(2,'b',2),
+			ARRAY[ROW(2,'b',2)::tst_comp_basic_t, ROW(3,'c',3)::tst_comp_basic_t],
+			'b',
+			'{b,c,NULL,d}'),
 		ARRAY[
 			ROW(
 				ROW(1,'a',1),
@@ -223,36 +359,29 @@ $node_publisher->safe_psql(
 
 	-- test_tbl_range
 	INSERT INTO tst_range (a, b) VALUES
-		(1, '[1, 10]'),
-		(2, '[2, 20]'),
 		(3, '[3, 30]'),
 		(4, '[4, 40]'),
 		(5, '[5, 50]');
 
 	-- test_tbl_range_array
 	INSERT INTO tst_range_array (a, b, c) VALUES
-		(1, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), '{"[1,2]", "[10,20]"}'),
-		(2, tstzrange('Sat Aug 02 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[2,3]", "[20,30]"}'),
 		(3, tstzrange('Fri Aug 01 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[3,4]"}'),
 		(4, tstzrange('Thu Jul 31 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[4,5]", NULL, "[40,50]"}'),
 		(5, NULL, NULL);
 
 	-- tst_hstore
 	INSERT INTO tst_hstore (a, b) VALUES
-		(1, '"a"=>"1"'),
-		(2, '"zzz"=>"foo"'),
 		(3, '"123"=>"321"'),
 		(4, '"yellow horse"=>"moaned"');
 
 	-- tst_dom_constr
-	INSERT INTO tst_dom_constr VALUES (10);
+	INSERT INTO tst_dom_constr VALUES (11);
 ));
 
 $node_publisher->wait_for_catchup('tap_sub');
+$node_publisher->wait_for_catchup('tap_sub_binary');
 
-# Check the data on subscriber
-my $result = $node_subscriber->safe_psql(
-	'postgres', qq(
+my $initial_check =  qq(
 	SET timezone = '+2';
 	SELECT a, b FROM tst_one_array ORDER BY a;
 	SELECT a, b, c, d FROM tst_arrays ORDER BY a;
@@ -268,9 +397,13 @@ my $result = $node_subscriber->safe_psql(
 	SELECT a, b FROM tst_range ORDER BY a;
 	SELECT a, b, c FROM tst_range_array ORDER BY a;
 	SELECT a, b FROM tst_hstore ORDER BY a;
-));
+);
+
+# Check the data on subscribers
+$result = $node_subscriber->safe_psql('postgres', $initial_check);
+$result_binary = $node_subscriber_binary->safe_psql('postgres', $initial_check);
 
-is( $result, '1|{1,2,3}
+my $initial_result = '1|{1,2,3}
 2|{2,3,1}
 3|{3,2,1}
 4|{4,3,2}
@@ -321,6 +454,7 @@ e|{d,NULL}
 (4,"{c,b,d}",4)|{"(4,\"{c,b,d}\",4)"}
 (5,"{c,NULL,b}",)|{"(5,\"{c,e,b}\",1)"}
 ("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
+("(2,b,2)","{""(2,b,2)"",""(3,c,3)""}",b,"{b,c,NULL,d}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
 1|[1,11)
 2|[2,21)
 3|[3,31)
@@ -334,8 +468,10 @@ e|{d,NULL}
 1|"a"=>"1"
 2|"zzz"=>"foo"
 3|"123"=>"321"
-4|"yellow horse"=>"moaned"',
-	'check replicated inserts on subscriber');
+4|"yellow horse"=>"moaned"';
+
+is( $result, $initial_result, 'check replicated inserts on subscriber');
+is( $result_binary, $initial_result, 'check replicated inserts on subscriber in binary');
 
 # Run batch of updates
 $node_publisher->safe_psql(
@@ -370,10 +506,9 @@ $node_publisher->safe_psql(
 ));
 
 $node_publisher->wait_for_catchup('tap_sub');
+$node_publisher->wait_for_catchup('tap_sub_binary');
 
-# Check the data on subscriber
-$result = $node_subscriber->safe_psql(
-	'postgres', qq(
+my $update_check =  qq(
 	SET timezone = '+2';
 	SELECT a, b FROM tst_one_array ORDER BY a;
 	SELECT a, b, c, d FROM tst_arrays ORDER BY a;
@@ -389,9 +524,13 @@ $result = $node_subscriber->safe_psql(
 	SELECT a, b FROM tst_range ORDER BY a;
 	SELECT a, b, c FROM tst_range_array ORDER BY a;
 	SELECT a, b FROM tst_hstore ORDER BY a;
-));
+);
+
+# Check the data on subscribers
+$result = $node_subscriber->safe_psql('postgres', $update_check);
+$result_binary = $node_subscriber_binary->safe_psql('postgres', $update_check);
 
-is( $result, '1|{4,5,6}
+my $update_result = '1|{4,5,6}
 2|{2,3,1}
 3|{3,2,1}
 4|{4,5,6,1}
@@ -442,6 +581,7 @@ e|{e,d}
 (4,"{c,b,d}",4)|{"(5,\"{a,b,c}\",5)"}
 (5,"{c,NULL,b}",)|{"(5,\"{a,b,c}\",5)"}
 ("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")",NULL}
+("(2,b,2)","{""(2,b,2)"",""(3,c,3)""}",b,"{b,c,NULL,d}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
 1|[100,1001)
 2|[2,21)
 3|[3,31)
@@ -455,8 +595,10 @@ e|{e,d}
 1|"updated"=>"value"
 2|"updated"=>"value"
 3|"also"=>"updated"
-4|"yellow horse"=>"moaned"',
-	'check replicated updates on subscriber');
+4|"yellow horse"=>"moaned"';
+
+is( $result, $update_result, 'check replicated updates on subscriber');
+is( $result_binary, $update_result, 'check replicated updates on subscriber in binary');
 
 # Run batch of deletes
 $node_publisher->safe_psql(
@@ -490,10 +632,9 @@ $node_publisher->safe_psql(
 ));
 
 $node_publisher->wait_for_catchup('tap_sub');
+$node_publisher->wait_for_catchup('tap_sub_binary');
 
-# Check the data on subscriber
-$result = $node_subscriber->safe_psql(
-	'postgres', qq(
+my $delete_check =  qq(
 	SET timezone = '+2';
 	SELECT a, b FROM tst_one_array ORDER BY a;
 	SELECT a, b, c, d FROM tst_arrays ORDER BY a;
@@ -509,9 +650,13 @@ $result = $node_subscriber->safe_psql(
 	SELECT a, b FROM tst_range ORDER BY a;
 	SELECT a, b, c FROM tst_range_array ORDER BY a;
 	SELECT a, b FROM tst_hstore ORDER BY a;
-));
+);
 
-is( $result, '3|{3,2,1}
+# Check the data on subscribers
+$result = $node_subscriber->safe_psql('postgres', $delete_check);
+$result_binary = $node_subscriber_binary->safe_psql('postgres', $delete_check);
+
+my $delete_result = '3|{3,2,1}
 4|{4,5,6,1}
 5|{4,5,6,1}
 {3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{"3 years","1 year","2 years"}
@@ -540,26 +685,58 @@ e|{e,d}
 (2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"}
 (4,"{c,b,d}",4)|{"(5,\"{a,b,c}\",5)"}
 (5,"{c,NULL,b}",)|{"(5,\"{a,b,c}\",5)"}
+("(2,b,2)","{""(2,b,2)"",""(3,c,3)""}",b,"{b,c,NULL,d}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
 2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"}
 3|["2014-08-01 00:00:00+02","2014-08-04 00:00:00+02")|{"[3,5)"}
 2|"updated"=>"value"
 3|"also"=>"updated"
-4|"yellow horse"=>"moaned"',
-	'check replicated deletes on subscriber');
+4|"yellow horse"=>"moaned"';
+
+is( $result, $delete_result, 'check replicated deletes on subscriber');
+is( $result_binary, $delete_result, 'check replicated deletes on subscriber in binary');
 
 # Test a domain with a constraint backed by a SQL-language function,
 # which needs an active snapshot in order to operate.
 $node_publisher->safe_psql('postgres',
-	"INSERT INTO tst_dom_constr VALUES (11)");
+	"INSERT INTO tst_dom_constr VALUES (12)");
 
 $node_publisher->wait_for_catchup('tap_sub');
+$node_publisher->wait_for_catchup('tap_sub_binary');
 
 $result =
   $node_subscriber->safe_psql('postgres',
 	"SELECT sum(a) FROM tst_dom_constr");
-is($result, '21', 'sql-function constraint on domain');
+is($result, '33', 'sql-function constraint on domain');
+
+$result_binary =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT sum(a) FROM tst_dom_constr");
+is($result_binary, '33', 'sql-function constraint on domain');
+
+# Test whether subscriber fails in case of column type mismatch
+$ddl = qq(
+	CREATE TABLE tst_binary_mismatch (a int);
+	INSERT INTO tst_binary_mismatch VALUES (1);
+);
+$node_publisher->safe_psql('postgres', $ddl);
+
+$ddl = qq(
+	CREATE TABLE tst_binary_mismatch (a bigint);
+);
+$node_subscriber->safe_psql('postgres', $ddl);
+$node_subscriber_binary->safe_psql('postgres', $ddl);
+
+$node_subscriber->safe_psql('postgres', 'ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION');
+$node_subscriber_binary->safe_psql('postgres', 'ALTER SUBSCRIPTION tap_sub_binary REFRESH PUBLICATION');
+
+# Binary enabled subscription should fail
+$node_subscriber_binary->wait_for_log("ERROR:  insufficient data left in message");
+
+# Binary disabled subscription should succeed
+$node_publisher->wait_for_catchup('tap_sub');
 
 $node_subscriber->stop('fast');
+$node_subscriber_binary->stop('fast');
 $node_publisher->stop('fast');
 
 done_testing();
diff --git a/src/test/subscription/t/014_binary.pl b/src/test/subscription/t/014_binary.pl
index 8d8b35721f..395efee1c7 100644
--- a/src/test/subscription/t/014_binary.pl
+++ b/src/test/subscription/t/014_binary.pl
@@ -36,6 +36,16 @@ my $ddl = qq(
 $node_publisher->safe_psql('postgres', $ddl);
 $node_subscriber->safe_psql('postgres', $ddl);
 
+# Insert some content and make sure it's synced to subscriber
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO public.test_arrays (a, b, c) VALUES
+		('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}');
+
+	INSERT INTO public.test_numerical (a, b, c, d) VALUES
+		(1, 1.2, 1.3, 10);
+	));
+
 # Configure logical replication
 $node_publisher->safe_psql('postgres',
 	"CREATE PUBLICATION tpub FOR ALL TABLES");
@@ -48,27 +58,35 @@ $node_subscriber->safe_psql('postgres',
 # Ensure nodes are in sync with each other
 $node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');
 
+my $result = $node_subscriber->safe_psql('postgres',
+	"SELECT a, b, c, d FROM test_numerical ORDER BY a;
+	SELECT a, b, c FROM test_arrays ORDER BY a;");
+
+is( $result, '1|1.2|1.3|10
+{1,2,3}|{1.1,1.2,1.3}|{one,two,three}', 'check syned data on subscriber');
+
 # Insert some content and make sure it's replicated across
 $node_publisher->safe_psql(
 	'postgres', qq(
 	INSERT INTO public.test_arrays (a, b, c) VALUES
-		('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'),
 		('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}');
 
 	INSERT INTO public.test_numerical (a, b, c, d) VALUES
-		(1, 1.2, 1.3, 10),
 		(2, 2.2, 2.3, 20),
 		(3, 3.2, 3.3, 30);
 	));
 
 $node_publisher->wait_for_catchup('tsub');
 
-my $result = $node_subscriber->safe_psql('postgres',
-	"SELECT a, b, c, d FROM test_numerical ORDER BY a");
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT a, b, c, d FROM test_numerical ORDER BY a;
+	SELECT a, b, c FROM test_arrays ORDER BY a;");
 
 is( $result, '1|1.2|1.3|10
 2|2.2|2.3|20
-3|3.2|3.3|30', 'check replicated data on subscriber');
+3|3.2|3.3|30
+{1,2,3}|{1.1,1.2,1.3}|{one,two,three}
+{3,1,2}|{1.3,1.1,1.2}|{three,one,two}', 'check replicated data on subscriber');
 
 # Test updates as well
 $node_publisher->safe_psql(
-- 
2.25.1

