Hi All,
We saw an OOM in a system where the WAL sender consumed gigabytes of memory
which was never released. Upon investigation, we found that there were many
ReorderBufferToastHash memory contexts linked to the ReorderBuffer context,
together consuming gigabytes of memory. The workload was running
INSERT ... ON CONFLICT ... among other things. There is a similar report
at [1].

I could reproduce a memory leak in the WAL sender using the following steps.
Session 1
postgres=# create table t_toast (a int primary key, b text);
postgres=# CREATE PUBLICATION dbz_minimal_publication FOR TABLE public.t_toast;

Separate terminal
$ pg_recvlogical -d postgres --slot pgoutput_minimal_test_slot --create-slot -P pgoutput
$ pg_recvlogical -d postgres --slot pgoutput_minimal_test_slot --start -o proto_version=1 -o publication_names='dbz_minimal_publication' -f /dev/null

Session 1
postgres=# select * from pg_replication_slots ;
         slot_name          |  plugin  | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
----------------------------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
 pgoutput_minimal_test_slot | pgoutput | logical   |  12402 | postgres | f         | f      |            |      |          570 | 0/15FFFD0   | 0/1600020

postgres=# begin;
postgres=# insert into t_toast values (500, repeat('something' ||
txid_current()::text, 100000)) ON CONFLICT (a) DO NOTHING;
INSERT 0 1

Session 2 (xid = 571)
postgres=# begin;
postgres=# insert into t_toast values (500, repeat('something' ||
txid_current()::text, 100000)) ON CONFLICT (a) DO NOTHING;

Session 3 (xid = 572)
postgres=# begin;
postgres=# insert into t_toast values (500, repeat('something' ||
txid_current()::text, 100000)) ON CONFLICT (a) DO NOTHING;

Session 1 (this session doesn't end up modifying the table, but its
rollback is essential for the speculative inserts to happen.)
postgres=# rollback;

Sessions 2 and 3, in the order in which you get control back (in my case
session 2, with xid = 571)
INSERT 0 1
postgres=# commit;
COMMIT

The other session (in my case session 3, with xid = 572)
INSERT 0 0
postgres=# commit;
COMMIT

With the attached patch, we see the following in the server logs:
2021-03-25 09:57:20.469 IST [12424] LOG:  starting logical decoding for slot "pgoutput_minimal_test_slot"
2021-03-25 09:57:20.469 IST [12424] DETAIL:  Streaming transactions committing after 0/1600020, reading WAL from 0/15FFFD0.
2021-03-25 09:57:20.469 IST [12424] LOG:  logical decoding found consistent point at 0/15FFFD0
2021-03-25 09:57:20.469 IST [12424] DETAIL:  There are no running transactions.
2021-03-25 09:59:45.494 IST [12424] LOG:  initializing hash table for transaction 571
2021-03-25 09:59:45.494 IST [12424] LOG:  speculative insert encountered in transaction 571
2021-03-25 09:59:45.494 IST [12424] LOG:  speculative insert confirmed in transaction 571
2021-03-25 09:59:45.504 IST [12424] LOG:  destroying toast hash for transaction 571
2021-03-25 09:59:50.806 IST [12424] LOG:  initializing hash table for transaction 572
2021-03-25 09:59:50.806 IST [12424] LOG:  speculative insert encountered in transaction 572
2021-03-25 09:59:50.806 IST [12424] LOG:  toast hash for transaction 572 is not cleared

Observe that the toast hash was cleaned up for transaction 571, which
successfully inserted the row, but not for transaction 572, which
performed DO NOTHING instead of an INSERT.

Here's the sequence of events which leads to the memory leak in the WAL sender:
1. Transaction 571 performs a speculative INSERT, which is decoded as a
toast insert followed by a speculative insert of the row.
2. Decoding the toast tuple causes the toast hash to be created.
3. The speculative insert is ignored while decoding.
4. The speculative insert is confirmed and decoded as a normal INSERT,
which also destroys the toast hash.
5. Transaction 572 performs a speculative insert, which is decoded as a
toast insert followed by a speculative insert of the row.
6. Decoding the toast tuple causes the toast hash to be created.
7. The speculative insert is ignored while decoding.
... The speculative INSERT is never confirmed, so the toast hash is
never destroyed, leaking memory.

In the memory context dump we see as many ReorderBufferToastHash
contexts as the number of times the above sequence was repeated:
TopMemoryContext: 1279640 total in 7 blocks; 23304 free (17 chunks); 1256336 used
...
    Replication command context: 32768 total in 3 blocks; 10952 free (9 chunks); 21816 used
...
        ReorderBuffer: 8192 total in 1 blocks; 7656 free (7 chunks); 536 used
          ReorderBufferToastHash: 8192 total in 1 blocks; 2056 free (0 chunks); 6136 used
          ReorderBufferToastHash: 8192 total in 1 blocks; 2056 free (0 chunks); 6136 used
          ReorderBufferToastHash: 8192 total in 1 blocks; 2056 free (0 chunks); 6136 used


The relevant code is all in ReorderBufferCommit(), in the cases
REORDER_BUFFER_CHANGE_INSERT,
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT and
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM.

About the solution: the speculative insert needs to be ignored since it
can be rolled back later. If the speculative insert is never confirmed,
there is no way to know that the speculative insert change required a
toast hash table, and hence no way to destroy that table before the next
change starts. ReorderBufferCommit() does notice a speculative insert
that was never confirmed, in the following code:
        change_done:

                /*
                 * Either speculative insertion was confirmed, or it was
                 * unsuccessful and the record isn't needed anymore.
                 */
                if (specinsert != NULL)
                {
                    ReorderBufferReturnChange(rb, specinsert);
                    specinsert = NULL;
                }

                if (relation != NULL)
                {
                    RelationClose(relation);
                    relation = NULL;
                }
                break;

but by then we might have reused the toast hash, so it cannot be
destroyed at that point. That isn't a problem by itself, since a reused
toast hash will be destroyed eventually.

It's only when the change following the speculative insert is something
other than an INSERT/UPDATE/DELETE that we have to worry about a
speculative insert that was never confirmed. So maybe for those cases we
could check whether specinsert != NULL and destroy the toast hash if it
exists.

[1] 
https://www.postgresql-archive.org/Diagnose-memory-leak-in-logical-replication-td6161318.html

-- 
Best Wishes,
Ashutosh Bapat
diff --git a/src/backend/access/rmgrdesc/heapdesc.c b/src/backend/access/rmgrdesc/heapdesc.c
index 318a281d7f..2fd01b53f5 100644
--- a/src/backend/access/rmgrdesc/heapdesc.c
+++ b/src/backend/access/rmgrdesc/heapdesc.c
@@ -42,6 +42,8 @@ heap_desc(StringInfo buf, XLogReaderState *record)
 	{
 		xl_heap_insert *xlrec = (xl_heap_insert *) rec;
 
+		if (xlrec->flags & XLH_INSERT_IS_SPECULATIVE)
+			appendStringInfo(buf, "speculative, ");
 		appendStringInfo(buf, "off %u", xlrec->offnum);
 	}
 	else if (info == XLOG_HEAP_DELETE)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 1a4b87c419..cbc9ffbebb 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -351,6 +351,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		txn->invalidations = NULL;
 	}
 
+	/* Toast hash should be deallocated by now */
+	if (txn->toast_hash)
+		elog(LOG, "toast hash for transaction %u is not cleared", txn->xid);
 	pfree(txn);
 }
 
@@ -1517,6 +1520,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 			{
 				case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 
+					elog(LOG, "speculative insert confirmed in transaction %u", xid);
 					/*
 					 * Confirmation for speculative insertion arrived. Simply
 					 * use as a normal record. It'll be cleaned up at the end
@@ -1638,6 +1642,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 
 				case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
 
+					elog(LOG, "speculative insert encountered in transaction %u", xid);
 					/*
 					 * Speculative insertions are dealt with by delaying the
 					 * processing of the insert until the confirmation record
@@ -2924,6 +2929,7 @@ ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	hash_ctl.hcxt = rb->context;
 	txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
 								  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	elog(LOG, "initializing hash table for transaction %u", txn->xid);
 }
 
 /*
@@ -3206,6 +3212,7 @@ ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		}
 	}
 
+	elog(LOG, "destroying toast hash for transaction %u", txn->xid);
 	hash_destroy(txn->toast_hash);
 	txn->toast_hash = NULL;
 }
diff --git a/src/test/subscription/t/011_toast_subxacts.pl b/src/test/subscription/t/011_toast_subxacts.pl
new file mode 100644
index 0000000000..776f5b4794
--- /dev/null
+++ b/src/test/subscription/t/011_toast_subxacts.pl
@@ -0,0 +1,253 @@
+# Logical replication tests for toasted values in subtransactions
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 18;
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_mixed (a int primary key, b text, c numeric)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_mixed (a, b, c) VALUES (1, 'foo', 1.1)");
+
+# Setup structure on subscriber
+# different column count and order than on publisher
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_mixed (d text default 'local', c numeric, b text, a int primary key)"
+);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub");
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub ADD TABLE tab_mixed"
+);
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_mixed VALUES (2, 'bar', 2.2)");
+
+$node_publisher->wait_for_catchup($appname);
+
+my $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_mixed");
+is( $result, qq(local|1.1|foo|1
+local|2.2|bar|2), 'check replicated changes with different column order');
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab_mixed SET b = 'baz' WHERE a = 1");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab_mixed ORDER BY a");
+is( $result, qq(local|1.1|baz|1
+local|2.2|bar|2),
+	'update works with different column order and subscriber local values');
+
+# check behavior with toasted values
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab_mixed SET b = repeat('xyzzy', 100000) WHERE a = 2");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT a, length(b), c, d FROM tab_mixed ORDER BY a");
+is( $result, qq(1|3|1.1|local
+2|500000|2.2|local),
+	'update transmits large column value');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT a, length(b), c, d FROM tab_mixed ORDER BY a");
+is( $result, qq(1|3|1.1|local
+2|500000|2.2|local),
+	'update transmits large column value');
+
+# It seems we do not free the hash tables used for storing toast values in
+# subtransactions, leaking memory when they abort. So try adding a lot of
+# toast values.
+$node_publisher->safe_psql('postgres',
+	"BEGIN;" .
+	"INSERT INTO tab_mixed VALUES (100, repeat('before_savept1' || txid_current()::text, 100000), 2.2);" .
+	"SAVEPOINT subxact1;" .
+	"INSERT INTO tab_mixed VALUES (101, repeat('after_savept1'|| txid_current()::text, 100000), 2.2);" .
+	"RELEASE SAVEPOINT subxact1;" .
+	"COMMIT");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_mixed");
+is( $result, qq(4),
+	'toast subtxn test release savepoint, commit, replicated');
+
+$node_publisher->safe_psql('postgres',
+	"BEGIN;" .
+	"INSERT INTO tab_mixed VALUES (200, repeat('before_savept1' || txid_current()::text, 100000), 2.2);" .
+	"SAVEPOINT subxact1;" .
+	"INSERT INTO tab_mixed VALUES (201, repeat('after_savept1'|| txid_current()::text, 100000), 2.2);" .
+	"RELEASE SAVEPOINT subxact1;" .
+	"ROLLBACK");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_mixed");
+is( $result, qq(4),
+	'toast subtxn test release savepoint, rollback');
+
+$node_publisher->safe_psql('postgres',
+	"BEGIN;" .
+	"INSERT INTO tab_mixed VALUES (300, repeat('before_savept1' || txid_current()::text, 100000), 2.2);" .
+	"SAVEPOINT subxact1;" .
+	"INSERT INTO tab_mixed VALUES (301, repeat('after_savept1'|| txid_current()::text, 100000), 2.2);" .
+	"ROLLBACK TO SAVEPOINT subxact1;" .
+	"COMMIT");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_mixed");
+is( $result, qq(5),
+	'toast subtxn test rollback savepoint, commit, replicated');
+
+$node_publisher->safe_psql('postgres',
+	"BEGIN;" .
+	"INSERT INTO tab_mixed VALUES (400, repeat('before_savept1' || txid_current()::text, 100000), 2.2);" .
+	"SAVEPOINT subxact1;" .
+	"INSERT INTO tab_mixed VALUES (401, repeat('after_savept1'|| txid_current()::text, 100000), 2.2);" .
+	"ROLLBACK TO SAVEPOINT subxact1;" .
+	"ROLLBACK");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_mixed");
+is( $result, qq(5),
+	'toast subtxn test rollback savepoint, rollback');
+
+# The customer's workload uses INSERT ... ON CONFLICT DO UPDATE, so try that as well.
+$node_publisher->safe_psql('postgres',
+	"BEGIN;" .
+	"INSERT INTO tab_mixed VALUES (100, repeat('ins_upd_before_savept1' || txid_current()::text, 100000), 2.2) ON CONFLICT(a) DO UPDATE SET b = excluded.b;" .
+	"SAVEPOINT subxact1;" .
+	"INSERT INTO tab_mixed VALUES (101, repeat('ins_upd_after_savept1'|| txid_current()::text, 100000), 2.2) ON CONFLICT(a) DO UPDATE SET b = excluded.b;" .
+	"RELEASE SAVEPOINT subxact1;" .
+	"COMMIT");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_mixed");
+is( $result, qq(4),
+	'toast subtxn test ON CONFLICT release savepoint, commit, replicated');
+
+$node_publisher->safe_psql('postgres',
+	"BEGIN;" .
+	"INSERT INTO tab_mixed VALUES (200, repeat('ins_upd_before_savept1' || txid_current()::text, 100000), 2.2) ON CONFLICT(a) DO UPDATE SET b = excluded.b;" .
+	"SAVEPOINT subxact1;" .
+	"INSERT INTO tab_mixed VALUES (201, repeat('ins_upd_after_savept1'|| txid_current()::text, 100000), 2.2) ON CONFLICT(a) DO UPDATE SET b = excluded.b;" .
+	"RELEASE SAVEPOINT subxact1;" .
+	"ROLLBACK");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_mixed");
+is( $result, qq(4),
+	'toast subtxn test ON CONFLICT release savepoint, rollback');
+
+$node_publisher->safe_psql('postgres',
+	"BEGIN;" .
+	"INSERT INTO tab_mixed VALUES (300, repeat('ins_upd_before_savept1' || txid_current()::text, 100000), 2.2) ON CONFLICT(a) DO UPDATE SET b = excluded.b;" .
+	"SAVEPOINT subxact1;" .
+	"INSERT INTO tab_mixed VALUES (301, repeat('ins_upd_after_savept1'|| txid_current()::text, 100000), 2.2) ON CONFLICT(a) DO UPDATE SET b = excluded.b;" .
+	"ROLLBACK TO SAVEPOINT subxact1;" .
+	"COMMIT");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_mixed");
+is( $result, qq(5),
+	'toast subtxn test ON CONFLICT rollback savepoint, commit, replicated');
+
+$node_publisher->safe_psql('postgres',
+	"BEGIN;" .
+	"INSERT INTO tab_mixed VALUES (400, repeat('ins_upd_before_savept1' || txid_current()::text, 100000), 2.2) ON CONFLICT(a) DO UPDATE SET b = excluded.b;" .
+	"SAVEPOINT subxact1;" .
+	"INSERT INTO tab_mixed VALUES (401, repeat('ins_upd_after_savept1'|| txid_current()::text, 100000), 2.2) ON CONFLICT(a) DO UPDATE SET b = excluded.b;" .
+	"ROLLBACK TO SAVEPOINT subxact1;" .
+	"ROLLBACK");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_mixed");
+is( $result, qq(5),
+	'toast subtxn test ON CONFLICT rollback savepoint, rollback');
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM tab_mixed WHERE a >= 100");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab_mixed SET c = 3.3 WHERE a = 2");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT a, length(b), c, d FROM tab_mixed ORDER BY a");
+is( $result, qq(1|3|1.1|local
+2|500000|3.3|local),
+	'update with non-transmitted large column value');
+
+# check all the cleanup
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0),
+	'check subscription relation status was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
