From 0bba087f29019e0ca0cc6f8996db4f21533b0767 Mon Sep 17 00:00:00 2001
From: Melih Mutlu <m.melihmutlu@gmail.com>
Date: Mon, 8 Aug 2022 14:14:44 +0300
Subject: [PATCH] Allow logical replication to copy table in binary

If binary option is enabled in a subscription, copy tables in binary
format during table synchronization.

This commit modifies tests 014_binary.pl and 002_types.pl to
test initial table synchronization step in binary format
---
 doc/src/sgml/ref/create_subscription.sgml   |   8 +-
 src/backend/replication/logical/tablesync.c |  11 +-
 src/test/subscription/t/002_types.pl        | 263 ++++++++++++++++----
 src/test/subscription/t/014_binary.pl       |  28 ++-
 4 files changed, 248 insertions(+), 62 deletions(-)

diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 7390c715bc..9a576a2b95 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -183,14 +183,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
         <term><literal>binary</literal> (<type>boolean</type>)</term>
         <listitem>
          <para>
-          Specifies whether the subscription will request the publisher to
-          send the data in binary format (as opposed to text).
+          Specifies whether the subscription will copy the initial data to
+          synchronize relations in binary format and also request the publisher
+          to send the data in binary format (as opposed to text).
           The default is <literal>false</literal>.
           Even when this option is enabled, only data types having
           binary send and receive functions will be transferred in binary.
          </para>
 
          <para>
+          Since logical replication uses <command>COPY</command> command,
+          it inherits restrictions of <command>COPY</command> command for
+          binary format.(See <xref linkend="sql-copy"/>.) 
           When doing cross-version replication, it could be that the
           publisher has a binary send function for some data type, but the
           subscriber lacks a binary receive function for that type.  In
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6a01ffd273..a3b128f6fb 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -101,6 +101,7 @@
 #include "catalog/pg_type.h"
 #include "commands/copy.h"
 #include "miscadmin.h"
+#include "nodes/makefuncs.h"
 #include "parser/parse_relation.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
@@ -1032,6 +1033,7 @@ copy_table(Relation rel)
 	CopyFromState cstate;
 	List	   *attnamelist;
 	ParseState *pstate;
+	List 	   *options = NIL;
 
 	/* Get the publisher relation info. */
 	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
@@ -1110,6 +1112,13 @@ copy_table(Relation rel)
 
 		appendStringInfoString(&cmd, ") TO STDOUT");
 	}
+
+	if (MySubscription->binary)
+	{
+		appendStringInfoString(&cmd, "  WITH (FORMAT binary)");
+		options = lappend(options, makeDefElem("format", (Node *) makeString("binary"), -1));
+	}
+
 	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
 	pfree(cmd.data);
 	if (res->status != WALRCV_OK_COPY_OUT)
@@ -1126,7 +1135,7 @@ copy_table(Relation rel)
 										 NULL, false, false);
 
 	attnamelist = make_copy_attnamelist(relmapentry);
-	cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, NIL);
+	cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
 
 	/* Do the copy */
 	(void) CopyFrom(cstate);
diff --git a/src/test/subscription/t/002_types.pl b/src/test/subscription/t/002_types.pl
index d6c6f49327..4386033962 100644
--- a/src/test/subscription/t/002_types.pl
+++ b/src/test/subscription/t/002_types.pl
@@ -19,6 +19,11 @@ my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init(allows_streaming => 'logical');
 $node_subscriber->start;
 
+# Create binary subscriber node
+my $node_subscriber_binary = PostgreSQL::Test::Cluster->new('subscriber_binary');
+$node_subscriber_binary->init(allows_streaming => 'logical');
+$node_subscriber_binary->start;
+
 # Create some preexisting content on publisher
 my $ddl = qq(
 	CREATE EXTENSION hstore WITH SCHEMA public;
@@ -104,6 +109,100 @@ my $ddl = qq(
 # Setup structure on both nodes
 $node_publisher->safe_psql('postgres', $ddl);
 $node_subscriber->safe_psql('postgres', $ddl);
+$node_subscriber_binary->safe_psql('postgres', $ddl);
+
+# Insert initial test data
+$node_publisher->safe_psql(
+	'postgres', qq(
+	-- test_tbl_one_array_col
+	INSERT INTO tst_one_array (a, b) VALUES
+		(1, '{1, 2, 3}'),
+		(2, '{2, 3, 1}');
+
+	-- test_tbl_arrays
+	INSERT INTO tst_arrays (a, b, c, d) VALUES
+		('{1, 2, 3}', '{"a", "b", "c"}', '{1.1, 2.2, 3.3}', '{"1 day", "2 days", "3 days"}'),
+		('{2, 3, 1}', '{"b", "c", "a"}', '{2.2, 3.3, 1.1}', '{"2 minutes", "3 minutes", "1 minute"}');
+		
+	-- test_tbl_single_enum
+	INSERT INTO tst_one_enum (a, b) VALUES
+		(1, 'a'),
+		(2, 'b');
+
+	-- test_tbl_enums
+	INSERT INTO tst_enums (a, b) VALUES
+		('a', '{b, c}'),
+		('b', '{c, a}');
+
+	-- test_tbl_single_composites
+	INSERT INTO tst_one_comp (a, b) VALUES
+		(1, ROW(1.0, 'a', 1)),
+		(2, ROW(2.0, 'b', 2));
+
+	-- test_tbl_composites
+	INSERT INTO tst_comps (a, b) VALUES
+		(ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_basic_t]),
+		(ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_basic_t]);
+
+	-- test_tbl_composite_with_enums
+	INSERT INTO tst_comp_enum (a, b) VALUES
+		(1, ROW(1.0, 'a', 1)),
+		(2, ROW(2.0, 'b', 2));
+
+	-- test_tbl_composite_with_enums_array
+	INSERT INTO tst_comp_enum_array (a, b) VALUES
+		(ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t]),
+		(ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_enum_t]);
+
+	-- test_tbl_composite_with_single_enums_array_in_composite
+	INSERT INTO tst_comp_one_enum_array (a, b) VALUES
+		(1, ROW(1.0, '{a, b, c}', 1)),
+		(2, ROW(2.0, '{a, b, c}', 2));
+
+	-- test_tbl_composite_with_enums_array_in_composite
+	INSERT INTO tst_comp_enum_what (a, b) VALUES
+		(ROW(1.0, '{a, b, c}', 1), ARRAY[ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t]),
+		(ROW(2.0, '{b, c, a}', 2), ARRAY[ROW(2, '{b, c, a}', 1)::tst_comp_enum_array_t]);
+
+	-- test_tbl_mixed_composites
+	INSERT INTO tst_comp_mix_array (a, b) VALUES
+		(ROW(
+			ROW(1,'a',1),
+			ARRAY[ROW(1,'a',1)::tst_comp_basic_t, ROW(2,'b',2)::tst_comp_basic_t],
+			'a',
+			'{a,b,NULL,c}'),
+		ARRAY[
+			ROW(
+				ROW(1,'a',1),
+				ARRAY[
+					ROW(1,'a',1)::tst_comp_basic_t,
+					ROW(2,'b',2)::tst_comp_basic_t,
+					NULL
+					],
+				'a',
+				'{a,b,c}'
+				)::tst_comp_mix_t
+			]
+		);
+
+	-- test_tbl_range
+	INSERT INTO tst_range (a, b) VALUES
+		(1, '[1, 10]'),
+		(2, '[2, 20]');
+
+	-- test_tbl_range_array
+	INSERT INTO tst_range_array (a, b, c) VALUES
+		(1, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), '{"[1,2]", "[10,20]"}'),
+		(2, tstzrange('Sat Aug 02 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[2,3]", "[20,30]"}');
+
+	-- tst_hstore
+	INSERT INTO tst_hstore (a, b) VALUES
+		(1, '"a"=>"1"'),
+		(2, '"zzz"=>"foo"');
+
+	-- tst_dom_constr
+	INSERT INTO tst_dom_constr VALUES (10);
+));
 
 # Setup logical replication
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
@@ -113,89 +212,126 @@ $node_publisher->safe_psql('postgres',
 $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)"
 );
+$node_subscriber_binary->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_binary CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_binary_slot, binary = true)"
+);
 
 # Wait for initial sync to finish as well
 $node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+$node_subscriber_binary->wait_for_subscription_sync($node_publisher, 'tap_sub_binary');
+
+my $sync_check =  qq(
+	SET timezone = '+2';
+	SELECT a, b FROM tst_one_array ORDER BY a;
+	SELECT a, b, c, d FROM tst_arrays ORDER BY a;
+	SELECT a, b FROM tst_one_enum ORDER BY a;
+	SELECT a, b FROM tst_enums ORDER BY a;
+	SELECT a, b FROM tst_one_comp ORDER BY a;
+	SELECT a, b FROM tst_comps ORDER BY a;
+	SELECT a, b FROM tst_comp_enum ORDER BY a;
+	SELECT a, b FROM tst_comp_enum_array ORDER BY a;
+	SELECT a, b FROM tst_comp_one_enum_array ORDER BY a;
+	SELECT a, b FROM tst_comp_enum_what ORDER BY a;
+	SELECT a, b FROM tst_comp_mix_array ORDER BY a;
+	SELECT a, b FROM tst_range ORDER BY a;
+	SELECT a, b, c FROM tst_range_array ORDER BY a;
+	SELECT a, b FROM tst_hstore ORDER BY a;
+);
+
+# Check the synced data on subscribers
+my $result = $node_subscriber->safe_psql('postgres', $sync_check);
+my $result_binary = $node_subscriber_binary->safe_psql('postgres', $sync_check);
+
+my $sync_result = '1|{1,2,3}
+2|{2,3,1}
+{1,2,3}|{a,b,c}|{1.1,2.2,3.3}|{"1 day","2 days","3 days"}
+{2,3,1}|{b,c,a}|{2.2,3.3,1.1}|{00:02:00,00:03:00,00:01:00}
+1|a
+2|b
+a|{b,c}
+b|{c,a}
+1|(1,a,1)
+2|(2,b,2)
+(1,a,1)|{"(1,a,1)"}
+(2,b,2)|{"(2,b,2)"}
+1|(1,a,1)
+2|(2,b,2)
+(1,a,1)|{"(1,a,1)"}
+(2,b,2)|{"(2,b,2)"}
+1|(1,"{a,b,c}",1)
+2|(2,"{a,b,c}",2)
+(1,"{a,b,c}",1)|{"(1,\"{a,b,c}\",1)"}
+(2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"}
+("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
+1|[1,11)
+2|[2,21)
+1|["2014-08-04 00:00:00+02",infinity)|{"[1,3)","[10,21)"}
+2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"}
+1|"a"=>"1"
+2|"zzz"=>"foo"';
+
+is( $result, $sync_result, 'check initial sync on subscriber');
+is( $result_binary, $sync_result, 'check initial sync on subscriber in binary');
 
 # Insert initial test data
 $node_publisher->safe_psql(
 	'postgres', qq(
 	-- test_tbl_one_array_col
 	INSERT INTO tst_one_array (a, b) VALUES
-		(1, '{1, 2, 3}'),
-		(2, '{2, 3, 1}'),
 		(3, '{3, 2, 1}'),
 		(4, '{4, 3, 2}'),
 		(5, '{5, NULL, 3}');
 
 	-- test_tbl_arrays
 	INSERT INTO tst_arrays (a, b, c, d) VALUES
-		('{1, 2, 3}', '{"a", "b", "c"}', '{1.1, 2.2, 3.3}', '{"1 day", "2 days", "3 days"}'),
-		('{2, 3, 1}', '{"b", "c", "a"}', '{2.2, 3.3, 1.1}', '{"2 minutes", "3 minutes", "1 minute"}'),
 		('{3, 1, 2}', '{"c", "a", "b"}', '{3.3, 1.1, 2.2}', '{"3 years", "1 year", "2 years"}'),
 		('{4, 1, 2}', '{"d", "a", "b"}', '{4.4, 1.1, 2.2}', '{"4 years", "1 year", "2 years"}'),
 		('{5, NULL, NULL}', '{"e", NULL, "b"}', '{5.5, 1.1, NULL}', '{"5 years", NULL, NULL}');
 
 	-- test_tbl_single_enum
 	INSERT INTO tst_one_enum (a, b) VALUES
-		(1, 'a'),
-		(2, 'b'),
 		(3, 'c'),
 		(4, 'd'),
 		(5, NULL);
 
 	-- test_tbl_enums
 	INSERT INTO tst_enums (a, b) VALUES
-		('a', '{b, c}'),
-		('b', '{c, a}'),
 		('c', '{b, a}'),
 		('d', '{c, b}'),
 		('e', '{d, NULL}');
 
 	-- test_tbl_single_composites
 	INSERT INTO tst_one_comp (a, b) VALUES
-		(1, ROW(1.0, 'a', 1)),
-		(2, ROW(2.0, 'b', 2)),
 		(3, ROW(3.0, 'c', 3)),
 		(4, ROW(4.0, 'd', 4)),
 		(5, ROW(NULL, NULL, 5));
 
 	-- test_tbl_composites
 	INSERT INTO tst_comps (a, b) VALUES
-		(ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_basic_t]),
-		(ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_basic_t]),
 		(ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_basic_t]),
 		(ROW(4.0, 'd', 4), ARRAY[ROW(4, 'd', 3)::tst_comp_basic_t]),
 		(ROW(5.0, 'e', NULL), ARRAY[NULL, ROW(5, NULL, 5)::tst_comp_basic_t]);
 
 	-- test_tbl_composite_with_enums
 	INSERT INTO tst_comp_enum (a, b) VALUES
-		(1, ROW(1.0, 'a', 1)),
-		(2, ROW(2.0, 'b', 2)),
 		(3, ROW(3.0, 'c', 3)),
 		(4, ROW(4.0, 'd', 4)),
 		(5, ROW(NULL, 'e', NULL));
 
 	-- test_tbl_composite_with_enums_array
 	INSERT INTO tst_comp_enum_array (a, b) VALUES
-		(ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t]),
-		(ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_enum_t]),
 		(ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_enum_t]),
 		(ROW(4.0, 'd', 3), ARRAY[ROW(3, 'd', 3)::tst_comp_enum_t]),
 		(ROW(5.0, 'e', 3), ARRAY[ROW(3, 'e', 3)::tst_comp_enum_t, NULL]);
 
 	-- test_tbl_composite_with_single_enums_array_in_composite
 	INSERT INTO tst_comp_one_enum_array (a, b) VALUES
-		(1, ROW(1.0, '{a, b, c}', 1)),
-		(2, ROW(2.0, '{a, b, c}', 2)),
 		(3, ROW(3.0, '{a, b, c}', 3)),
 		(4, ROW(4.0, '{c, b, d}', 4)),
 		(5, ROW(5.0, '{NULL, e, NULL}', 5));
 
 	-- test_tbl_composite_with_enums_array_in_composite
 	INSERT INTO tst_comp_enum_what (a, b) VALUES
-		(ROW(1.0, '{a, b, c}', 1), ARRAY[ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t]),
-		(ROW(2.0, '{b, c, a}', 2), ARRAY[ROW(2, '{b, c, a}', 1)::tst_comp_enum_array_t]),
 		(ROW(3.0, '{c, a, b}', 1), ARRAY[ROW(3, '{c, a, b}', 1)::tst_comp_enum_array_t]),
 		(ROW(4.0, '{c, b, d}', 4), ARRAY[ROW(4, '{c, b, d}', 4)::tst_comp_enum_array_t]),
 		(ROW(5.0, '{c, NULL, b}', NULL), ARRAY[ROW(5, '{c, e, b}', 1)::tst_comp_enum_array_t]);
@@ -203,10 +339,10 @@ $node_publisher->safe_psql(
 	-- test_tbl_mixed_composites
 	INSERT INTO tst_comp_mix_array (a, b) VALUES
 		(ROW(
-			ROW(1,'a',1),
-			ARRAY[ROW(1,'a',1)::tst_comp_basic_t, ROW(2,'b',2)::tst_comp_basic_t],
-			'a',
-			'{a,b,NULL,c}'),
+			ROW(2,'b',2),
+			ARRAY[ROW(2,'b',2)::tst_comp_basic_t, ROW(3,'c',3)::tst_comp_basic_t],
+			'b',
+			'{b,c,NULL,d}'),
 		ARRAY[
 			ROW(
 				ROW(1,'a',1),
@@ -223,36 +359,29 @@ $node_publisher->safe_psql(
 
 	-- test_tbl_range
 	INSERT INTO tst_range (a, b) VALUES
-		(1, '[1, 10]'),
-		(2, '[2, 20]'),
 		(3, '[3, 30]'),
 		(4, '[4, 40]'),
 		(5, '[5, 50]');
 
 	-- test_tbl_range_array
 	INSERT INTO tst_range_array (a, b, c) VALUES
-		(1, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), '{"[1,2]", "[10,20]"}'),
-		(2, tstzrange('Sat Aug 02 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[2,3]", "[20,30]"}'),
 		(3, tstzrange('Fri Aug 01 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[3,4]"}'),
 		(4, tstzrange('Thu Jul 31 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[4,5]", NULL, "[40,50]"}'),
 		(5, NULL, NULL);
 
 	-- tst_hstore
 	INSERT INTO tst_hstore (a, b) VALUES
-		(1, '"a"=>"1"'),
-		(2, '"zzz"=>"foo"'),
 		(3, '"123"=>"321"'),
 		(4, '"yellow horse"=>"moaned"');
 
 	-- tst_dom_constr
-	INSERT INTO tst_dom_constr VALUES (10);
+	INSERT INTO tst_dom_constr VALUES (11);
 ));
 
 $node_publisher->wait_for_catchup('tap_sub');
+$node_publisher->wait_for_catchup('tap_sub_binary');
 
-# Check the data on subscriber
-my $result = $node_subscriber->safe_psql(
-	'postgres', qq(
+my $initial_check =  qq(
 	SET timezone = '+2';
 	SELECT a, b FROM tst_one_array ORDER BY a;
 	SELECT a, b, c, d FROM tst_arrays ORDER BY a;
@@ -268,9 +397,13 @@ my $result = $node_subscriber->safe_psql(
 	SELECT a, b FROM tst_range ORDER BY a;
 	SELECT a, b, c FROM tst_range_array ORDER BY a;
 	SELECT a, b FROM tst_hstore ORDER BY a;
-));
+);
+
+# Check the data on subscribers
+$result = $node_subscriber->safe_psql('postgres', $initial_check);
+$result_binary = $node_subscriber_binary->safe_psql('postgres', $initial_check);
 
-is( $result, '1|{1,2,3}
+my $initial_result = '1|{1,2,3}
 2|{2,3,1}
 3|{3,2,1}
 4|{4,3,2}
@@ -321,6 +454,7 @@ e|{d,NULL}
 (4,"{c,b,d}",4)|{"(4,\"{c,b,d}\",4)"}
 (5,"{c,NULL,b}",)|{"(5,\"{c,e,b}\",1)"}
 ("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
+("(2,b,2)","{""(2,b,2)"",""(3,c,3)""}",b,"{b,c,NULL,d}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
 1|[1,11)
 2|[2,21)
 3|[3,31)
@@ -334,8 +468,10 @@ e|{d,NULL}
 1|"a"=>"1"
 2|"zzz"=>"foo"
 3|"123"=>"321"
-4|"yellow horse"=>"moaned"',
-	'check replicated inserts on subscriber');
+4|"yellow horse"=>"moaned"';
+
+is( $result, $initial_result, 'check replicated inserts on subscriber');
+is( $result_binary, $initial_result, 'check replicated inserts on subscriber in binary');
 
 # Run batch of updates
 $node_publisher->safe_psql(
@@ -370,10 +506,9 @@ $node_publisher->safe_psql(
 ));
 
 $node_publisher->wait_for_catchup('tap_sub');
+$node_publisher->wait_for_catchup('tap_sub_binary');
 
-# Check the data on subscriber
-$result = $node_subscriber->safe_psql(
-	'postgres', qq(
+my $update_check =  qq(
 	SET timezone = '+2';
 	SELECT a, b FROM tst_one_array ORDER BY a;
 	SELECT a, b, c, d FROM tst_arrays ORDER BY a;
@@ -389,9 +524,13 @@ $result = $node_subscriber->safe_psql(
 	SELECT a, b FROM tst_range ORDER BY a;
 	SELECT a, b, c FROM tst_range_array ORDER BY a;
 	SELECT a, b FROM tst_hstore ORDER BY a;
-));
+); 
 
-is( $result, '1|{4,5,6}
+# Check the data on subscribers
+$result = $node_subscriber->safe_psql('postgres', $update_check);
+$result_binary = $node_subscriber_binary->safe_psql('postgres', $update_check);
+
+my $update_result = '1|{4,5,6}
 2|{2,3,1}
 3|{3,2,1}
 4|{4,5,6,1}
@@ -442,6 +581,7 @@ e|{e,d}
 (4,"{c,b,d}",4)|{"(5,\"{a,b,c}\",5)"}
 (5,"{c,NULL,b}",)|{"(5,\"{a,b,c}\",5)"}
 ("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")",NULL}
+("(2,b,2)","{""(2,b,2)"",""(3,c,3)""}",b,"{b,c,NULL,d}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
 1|[100,1001)
 2|[2,21)
 3|[3,31)
@@ -455,8 +595,10 @@ e|{e,d}
 1|"updated"=>"value"
 2|"updated"=>"value"
 3|"also"=>"updated"
-4|"yellow horse"=>"moaned"',
-	'check replicated updates on subscriber');
+4|"yellow horse"=>"moaned"';
+
+is( $result, $update_result, 'check replicated updates on subscriber');
+is( $result_binary, $update_result, 'check replicated updates on subscriber in binary');
 
 # Run batch of deletes
 $node_publisher->safe_psql(
@@ -490,10 +632,9 @@ $node_publisher->safe_psql(
 ));
 
 $node_publisher->wait_for_catchup('tap_sub');
+$node_publisher->wait_for_catchup('tap_sub_binary');
 
-# Check the data on subscriber
-$result = $node_subscriber->safe_psql(
-	'postgres', qq(
+my $delete_check =  qq(
 	SET timezone = '+2';
 	SELECT a, b FROM tst_one_array ORDER BY a;
 	SELECT a, b, c, d FROM tst_arrays ORDER BY a;
@@ -509,9 +650,13 @@ $result = $node_subscriber->safe_psql(
 	SELECT a, b FROM tst_range ORDER BY a;
 	SELECT a, b, c FROM tst_range_array ORDER BY a;
 	SELECT a, b FROM tst_hstore ORDER BY a;
-));
+);
 
-is( $result, '3|{3,2,1}
+# Check the data on subscribers
+$result = $node_subscriber->safe_psql('postgres', $delete_check);
+$result_binary = $node_subscriber_binary->safe_psql('postgres', $delete_check);
+
+my $delete_result = '3|{3,2,1}
 4|{4,5,6,1}
 5|{4,5,6,1}
 {3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{"3 years","1 year","2 years"}
@@ -540,26 +685,36 @@ e|{e,d}
 (2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"}
 (4,"{c,b,d}",4)|{"(5,\"{a,b,c}\",5)"}
 (5,"{c,NULL,b}",)|{"(5,\"{a,b,c}\",5)"}
+("(2,b,2)","{""(2,b,2)"",""(3,c,3)""}",b,"{b,c,NULL,d}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
 2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"}
 3|["2014-08-01 00:00:00+02","2014-08-04 00:00:00+02")|{"[3,5)"}
 2|"updated"=>"value"
 3|"also"=>"updated"
-4|"yellow horse"=>"moaned"',
-	'check replicated deletes on subscriber');
+4|"yellow horse"=>"moaned"';
+
+is( $result, $delete_result, 'check replicated deletes on subscriber');
+is( $result_binary, $delete_result, 'check replicated deletes on subscriber in binary');
 
 # Test a domain with a constraint backed by a SQL-language function,
 # which needs an active snapshot in order to operate.
 $node_publisher->safe_psql('postgres',
-	"INSERT INTO tst_dom_constr VALUES (11)");
+	"INSERT INTO tst_dom_constr VALUES (12)");
 
 $node_publisher->wait_for_catchup('tap_sub');
+$node_publisher->wait_for_catchup('tap_sub_binary');
 
 $result =
   $node_subscriber->safe_psql('postgres',
 	"SELECT sum(a) FROM tst_dom_constr");
-is($result, '21', 'sql-function constraint on domain');
+is($result, '33', 'sql-function constraint on domain');
+
+$result_binary =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT sum(a) FROM tst_dom_constr");
+is($result_binary, '33', 'sql-function constraint on domain');
 
 $node_subscriber->stop('fast');
+$node_subscriber_binary->stop('fast');
 $node_publisher->stop('fast');
 
 done_testing();
diff --git a/src/test/subscription/t/014_binary.pl b/src/test/subscription/t/014_binary.pl
index 8d8b35721f..395efee1c7 100644
--- a/src/test/subscription/t/014_binary.pl
+++ b/src/test/subscription/t/014_binary.pl
@@ -36,6 +36,16 @@ my $ddl = qq(
 $node_publisher->safe_psql('postgres', $ddl);
 $node_subscriber->safe_psql('postgres', $ddl);
 
+# Insert some content and make sure it's synced to subscriber
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO public.test_arrays (a, b, c) VALUES
+		('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}');
+
+	INSERT INTO public.test_numerical (a, b, c, d) VALUES
+		(1, 1.2, 1.3, 10);
+	));
+
 # Configure logical replication
 $node_publisher->safe_psql('postgres',
 	"CREATE PUBLICATION tpub FOR ALL TABLES");
@@ -48,27 +58,35 @@ $node_subscriber->safe_psql('postgres',
 # Ensure nodes are in sync with each other
 $node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');
 
+my $result = $node_subscriber->safe_psql('postgres',
+	"SELECT a, b, c, d FROM test_numerical ORDER BY a;
+	SELECT a, b, c FROM test_arrays ORDER BY a;");
+
+is( $result, '1|1.2|1.3|10
+{1,2,3}|{1.1,1.2,1.3}|{one,two,three}', 'check syned data on subscriber');
+
 # Insert some content and make sure it's replicated across
 $node_publisher->safe_psql(
 	'postgres', qq(
 	INSERT INTO public.test_arrays (a, b, c) VALUES
-		('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'),
 		('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}');
 
 	INSERT INTO public.test_numerical (a, b, c, d) VALUES
-		(1, 1.2, 1.3, 10),
 		(2, 2.2, 2.3, 20),
 		(3, 3.2, 3.3, 30);
 	));
 
 $node_publisher->wait_for_catchup('tsub');
 
-my $result = $node_subscriber->safe_psql('postgres',
-	"SELECT a, b, c, d FROM test_numerical ORDER BY a");
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT a, b, c, d FROM test_numerical ORDER BY a;
+	SELECT a, b, c FROM test_arrays ORDER BY a;");
 
 is( $result, '1|1.2|1.3|10
 2|2.2|2.3|20
-3|3.2|3.3|30', 'check replicated data on subscriber');
+3|3.2|3.3|30
+{1,2,3}|{1.1,1.2,1.3}|{one,two,three}
+{3,1,2}|{1.3,1.1,1.2}|{three,one,two}', 'check replicated data on subscriber');
 
 # Test updates as well
 $node_publisher->safe_psql(
-- 
2.25.1

