Hi All,

We saw an OOM in a system where the WAL sender consumed gigabytes of memory which were never released. Upon investigation, we found many ReorderBufferToastHash memory contexts linked to the ReorderBuffer context, together consuming gigabytes of memory. The workload included INSERT ... ON CONFLICT .. among other things. There is a similar report at [1].
I could reproduce the memory leak in the WAL sender using the following steps.

Session 1
postgres=# create table t_toast (a int primary key, b text);
postgres=# CREATE PUBLICATION dbz_minimal_publication FOR TABLE public.t_toast;

Terminal 4
$ pg_recvlogical -d postgres --slot pgoutput_minimal_test_slot --create-slot -P pgoutput
$ pg_recvlogical -d postgres --slot pgoutput_minimal_test_slot --start -o proto_version=1 -o publication_names='dbz_minimal_publication' -f /dev/null

Session 1
postgres=# select * from pg_replication_slots ;
         slot_name          |  plugin  | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
----------------------------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
 pgoutput_minimal_test_slot | pgoutput | logical   |  12402 | postgres | f         | f      |            |      |          570 | 0/15FFFD0   | 0/1600020

postgres=# begin;
postgres=# insert into t_toast values (500, repeat('something' || txid_current()::text, 100000)) ON CONFLICT (a) DO NOTHING;
INSERT 0 1

Session 2 (xid = 571)
postgres=# begin;
postgres=# insert into t_toast values (500, repeat('something' || txid_current()::text, 100000)) ON CONFLICT (a) DO NOTHING;

Session 3 (xid = 572)
postgres=# begin;
postgres=# insert into t_toast values (500, repeat('something' || txid_current()::text, 100000)) ON CONFLICT (a) DO NOTHING;

Session 1 (this session doesn't modify the table, but it is essential for the speculative inserts to happen)
postgres=# rollback;

Sessions 2 and 3, in the order in which you get control back (in my case session 2 with xid = 571)
INSERT 0 1
postgres=# commit;
COMMIT

the other session (in my case session 3 with xid = 572)
INSERT 0 0
postgres=# commit;
COMMIT

With the attached patch, we see the following in the server logs:

2021-03-25 09:57:20.469 IST [12424] LOG:  starting logical decoding for slot "pgoutput_minimal_test_slot"
2021-03-25 09:57:20.469 IST [12424] DETAIL:  Streaming transactions committing after 0/1600020, reading WAL from 0/15FFFD0.
2021-03-25 09:57:20.469 IST [12424] LOG:  logical decoding found consistent point at 0/15FFFD0
2021-03-25 09:57:20.469 IST [12424] DETAIL:  There are no running transactions.
2021-03-25 09:59:45.494 IST [12424] LOG:  initializing hash table for transaction 571
2021-03-25 09:59:45.494 IST [12424] LOG:  speculative insert encountered in transaction 571
2021-03-25 09:59:45.494 IST [12424] LOG:  speculative insert confirmed in transaction 571
2021-03-25 09:59:45.504 IST [12424] LOG:  destroying toast hash for transaction 571
2021-03-25 09:59:50.806 IST [12424] LOG:  initializing hash table for transaction 572
2021-03-25 09:59:50.806 IST [12424] LOG:  speculative insert encountered in transaction 572
2021-03-25 09:59:50.806 IST [12424] LOG:  toast hash for transaction 572 is not cleared

Observe that the toast hash was cleaned up for transaction 571, which successfully inserted the row, but not for transaction 572, which performed DO NOTHING instead of an INSERT.

Here's the sequence of events which leads to the memory leak in the WAL sender:

1. Transaction 571 performs a speculative INSERT, which is decoded as a toast insert followed by a speculative insert of the row.
2. Decoding the toast tuple causes the toast hash to be created.
3. The speculative insert is ignored while decoding.
4. The speculative insert is confirmed and decoded as a normal INSERT, also destroying the toast hash.
5. Transaction 572 performs a speculative INSERT, which is decoded as a toast insert followed by a speculative insert of the row.
6. Decoding the toast tuple causes the toast hash to be created.
7. The speculative insert is ignored while decoding.
... The speculative INSERT is never confirmed, and thus the toast hash is never destroyed, leaking memory.

In a memory context dump we see as many ReorderBufferToastHash contexts as the number of times the above sequence was repeated:

TopMemoryContext: 1279640 total in 7 blocks; 23304 free (17 chunks); 1256336 used
...
  Replication command context: 32768 total in 3 blocks; 10952 free (9 chunks); 21816 used
...
    ReorderBuffer: 8192 total in 1 blocks; 7656 free (7 chunks); 536 used
      ReorderBufferToastHash: 8192 total in 1 blocks; 2056 free (0 chunks); 6136 used
      ReorderBufferToastHash: 8192 total in 1 blocks; 2056 free (0 chunks); 6136 used
      ReorderBufferToastHash: 8192 total in 1 blocks; 2056 free (0 chunks); 6136 used

The relevant code is all in ReorderBufferCommit(), in the cases REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT and REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM.

About the solution: the speculative insert needs to be ignored while decoding, since it can be rolled back later. But if the speculative insert is never confirmed, there is no way to know that the speculative insert change required a toast hash table and to destroy it before the next change starts. ReorderBufferCommit() does seem to notice a speculative insert that was never confirmed, in the following code:

1624                 change_done:
1625 
1626                     /*
1627                      * Either speculative insertion was confirmed, or it was
1628                      * unsuccessful and the record isn't needed anymore.
1629                      */
1630                     if (specinsert != NULL)
1631                     {
1632                         ReorderBufferReturnChange(rb, specinsert);
1633                         specinsert = NULL;
1634                     }
1635 
1636                     if (relation != NULL)
1637                     {
1638                         RelationClose(relation);
1639                         relation = NULL;
1640                     }
1641                     break;

but by then we might have reused the toast_hash, and thus it can not be destroyed.
But that isn't the problem, since a reused toast_hash will be destroyed eventually. It's only when the change following the speculative insert is something other than an INSERT/UPDATE/DELETE that we have to worry about a speculative insert that was never confirmed. So maybe for those cases we should check whether specinsert != NULL and destroy the toast_hash if it exists.

[1] https://www.postgresql-archive.org/Diagnose-memory-leak-in-logical-replication-td6161318.html

-- 
Best Wishes,
Ashutosh Bapat
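To make the idea concrete, here is an untested sketch (not part of the attached patch; the exact placement is an assumption) of how the non-INSERT/UPDATE/DELETE cases in ReorderBufferCommit() could discard a dangling speculative insert, mirroring the existing cleanup at change_done:

```c
/* Untested sketch: at the start of processing a change that cannot be a
 * speculative confirmation (i.e. anything other than INSERT/UPDATE/DELETE),
 * throw away a speculative insert that was never confirmed, including the
 * toast hash that was built for it.
 */
if (specinsert != NULL)
{
    /* The toast hash, if any, belongs to the abandoned speculative
     * insert; reset it so it doesn't leak past this change. */
    if (txn->toast_hash != NULL)
        ReorderBufferToastReset(rb, txn);
    ReorderBufferReturnChange(rb, specinsert);
    specinsert = NULL;
}
```

ReorderBufferToastReset() and ReorderBufferReturnChange() are the existing routines already used on the confirmed-insert path; the only new decision here is invoking them when a non-DML change follows an unconfirmed speculative insert.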
diff --git a/src/backend/access/rmgrdesc/heapdesc.c b/src/backend/access/rmgrdesc/heapdesc.c
index 318a281d7f..2fd01b53f5 100644
--- a/src/backend/access/rmgrdesc/heapdesc.c
+++ b/src/backend/access/rmgrdesc/heapdesc.c
@@ -42,6 +42,8 @@ heap_desc(StringInfo buf, XLogReaderState *record)
 	{
 		xl_heap_insert *xlrec = (xl_heap_insert *) rec;
 
+		if (xlrec->flags & XLH_INSERT_IS_SPECULATIVE)
+			appendStringInfo(buf, "speculative, ");
 		appendStringInfo(buf, "off %u", xlrec->offnum);
 	}
 	else if (info == XLOG_HEAP_DELETE)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 1a4b87c419..cbc9ffbebb 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -351,6 +351,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		txn->invalidations = NULL;
 	}
 
+	/* Toast hash should be deallocated by now */
+	if (txn->toast_hash)
+		elog(LOG, "toast hash for transaction %d is not cleared", txn->xid);
 	pfree(txn);
 }
 
@@ -1517,6 +1520,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 				{
 					case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 
+						elog(LOG, "speculative insert confirmed in transaction %d", xid);
 						/*
 						 * Confirmation for speculative insertion arrived. Simply
 						 * use as a normal record. It'll be cleaned up at the end
@@ -1638,6 +1642,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 					case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
 
+						elog(LOG, "speculative insert encountered in transaction %d", xid);
 						/*
 						 * Speculative insertions are dealt with by delaying the
 						 * processing of the insert until the confirmation record
@@ -2924,6 +2929,7 @@ ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	hash_ctl.hcxt = rb->context;
 	txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
 								  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	elog(LOG, "initializing hash table for transaction %d", txn->xid);
 }
 
 /*
@@ -3206,6 +3212,7 @@ ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		}
 	}
 
+	elog(LOG, "destroying toast hash for transaction %d", txn->xid);
 	hash_destroy(txn->toast_hash);
 	txn->toast_hash = NULL;
 }
diff --git a/src/test/subscription/t/011_toast_subxacts.pl b/src/test/subscription/t/011_toast_subxacts.pl
new file mode 100644
index 0000000000..776f5b4794
--- /dev/null
+++ b/src/test/subscription/t/011_toast_subxacts.pl
@@ -0,0 +1,253 @@
+# Basic logical replication test
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 14;
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_mixed (a int primary key, b text, c numeric)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_mixed (a, b, c) VALUES (1, 'foo', 1.1)");
+
+# Setup structure on subscriber
+# different column count and order than on publisher
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_mixed (d text default 'local', c numeric, b text, a int primary key)"
+);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub");
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub ADD TABLE tab_mixed"
+);
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+	"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_mixed VALUES (2, 'bar', 2.2)");
+
+$node_publisher->wait_for_catchup($appname);
+
+my $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_mixed");
+is( $result, qq(local|1.1|foo|1
+local|2.2|bar|2), 'check replicated changes with different column order');
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab_mixed SET b = 'baz' WHERE a = 1");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab_mixed ORDER BY a");
+is( $result, qq(local|1.1|baz|1
+local|2.2|bar|2),
+	'update works with different column order and subscriber local values');
+
+# check behavior with toasted values
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab_mixed SET b = repeat('xyzzy', 100000) WHERE a = 2");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT a, length(b), c, d FROM tab_mixed ORDER BY a");
+is( $result, qq(1|3|1.1|local
+2|500000|2.2|local),
+	'update transmits large column value');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT a, length(b), c, d FROM tab_mixed ORDER BY a");
+is( $result, qq(1|3|1.1|local
+2|500000|2.2|local),
+	'update transmits large column value');
+
+# It seems we do not free the hash tables used for storing toast values in
+# subtransactions leaking memory when they abort. So try adding a lot of toast
+# values
+$node_publisher->safe_psql('postgres',
+	"BEGIN;" .
+	"INSERT INTO tab_mixed VALUES (100, repeat('before_savept1' || txid_current()::text, 100000), 2.2);" .
+	"SAVEPOINT subxact1;" .
+	"INSERT INTO tab_mixed VALUES (101, repeat('after_savept1'|| txid_current()::text, 100000), 2.2);" .
+	"RELEASE SAVEPOINT subxact1;" .
+	"COMMIT");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_mixed");
+is( $result, qq(4),
+	'toast subtxn test release savepoint, commit, replicated');
+
+$node_publisher->safe_psql('postgres',
+	"BEGIN;" .
+	"INSERT INTO tab_mixed VALUES (200, repeat('before_savept1' || txid_current()::text, 100000), 2.2);" .
+	"SAVEPOINT subxact1;" .
+	"INSERT INTO tab_mixed VALUES (201, repeat('after_savept1'|| txid_current()::text, 100000), 2.2);" .
+	"RELEASE SAVEPOINT subxact1;" .
+	"ROLLBACK");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_mixed");
+is( $result, qq(4),
+	'toast subtxn test release savepoint, rollback');
+
+$node_publisher->safe_psql('postgres',
+	"BEGIN;" .
+	"INSERT INTO tab_mixed VALUES (300, repeat('before_savept1' || txid_current()::text, 100000), 2.2);" .
+	"SAVEPOINT subxact1;" .
+	"INSERT INTO tab_mixed VALUES (301, repeat('after_savept1'|| txid_current()::text, 100000), 2.2);" .
+	"ROLLBACK TO SAVEPOINT subxact1;" .
+	"COMMIT");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_mixed");
+is( $result, qq(5),
+	'toast subtxn test rollback savepoint, commit, replicated');
+
+$node_publisher->safe_psql('postgres',
+	"BEGIN;" .
+	"INSERT INTO tab_mixed VALUES (400, repeat('before_savept1' || txid_current()::text, 100000), 2.2);" .
+	"SAVEPOINT subxact1;" .
+	"INSERT INTO tab_mixed VALUES (401, repeat('after_savept1'|| txid_current()::text, 100000), 2.2);" .
+	"ROLLBACK TO SAVEPOINT subxact1;" .
+	"ROLLBACK");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_mixed");
+is( $result, qq(5),
+	'toast subtxn test rollback savepoint, commit, replicated');
+
+# The customer's load has INSERT ON CONFLICT DO UPDATE clause, so try that.
+$node_publisher->safe_psql('postgres',
+	"BEGIN;" .
+	"INSERT INTO tab_mixed VALUES (100, repeat('ins_upd_before_savept1' || txid_current()::text, 100000), 2.2) ON CONFLICT(a) DO UPDATE SET b = excluded.b;" .
+	"SAVEPOINT subxact1;" .
+	"INSERT INTO tab_mixed VALUES (101, repeat('ins_upd_after_savept1'|| txid_current()::text, 100000), 2.2) ON CONFLICT(a) DO UPDATE SET b = excluded.b;" .
+	"RELEASE SAVEPOINT subxact1;" .
+	"COMMIT");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_mixed");
+is( $result, qq(4),
+	'toast subtxn test release savepoint, commit, replicated');
+
+$node_publisher->safe_psql('postgres',
+	"BEGIN;" .
+	"INSERT INTO tab_mixed VALUES (200, repeat('ins_upd_before_savept1' || txid_current()::text, 100000), 2.2) ON CONFLICT(a) DO UPDATE SET b = excluded.b;" .
+	"SAVEPOINT subxact1;" .
+	"INSERT INTO tab_mixed VALUES (201, repeat('ins_upd_after_savept1'|| txid_current()::text, 100000), 2.2) ON CONFLICT(a) DO UPDATE SET b = excluded.b;" .
+	"RELEASE SAVEPOINT subxact1;" .
+	"ROLLBACK");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_mixed");
+is( $result, qq(4),
+	'toast subtxn test release savepoint, rollback');
+
+$node_publisher->safe_psql('postgres',
+	"BEGIN;" .
+	"INSERT INTO tab_mixed VALUES (300, repeat('ins_upd_before_savept1' || txid_current()::text, 100000), 2.2) ON CONFLICT(a) DO UPDATE SET b = excluded.b;" .
+	"SAVEPOINT subxact1;" .
+	"INSERT INTO tab_mixed VALUES (301, repeat('ins_upd_after_savept1'|| txid_current()::text, 100000), 2.2) ON CONFLICT(a) DO UPDATE SET b = excluded.b;" .
+	"ROLLBACK TO SAVEPOINT subxact1;" .
+	"COMMIT");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_mixed");
+is( $result, qq(5),
+	'toast subtxn test rollback savepoint, commit, replicated');
+
+$node_publisher->safe_psql('postgres',
+	"BEGIN;" .
+	"INSERT INTO tab_mixed VALUES (400, repeat('ins_upd_before_savept1' || txid_current()::text, 100000), 2.2) ON CONFLICT(a) DO UPDATE SET b = excluded.b;" .
+	"SAVEPOINT subxact1;" .
+	"INSERT INTO tab_mixed VALUES (401, repeat('ins_upd_after_savept1'|| txid_current()::text, 100000), 2.2) ON CONFLICT(a) DO UPDATE SET b = excluded.b;" .
+	"ROLLBACK TO SAVEPOINT subxact1;" .
+	"ROLLBACK");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_mixed");
+is( $result, qq(5),
+	'toast subtxn test rollback savepoint, commit, replicated');
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM tab_mixed WHERE a >= 100");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab_mixed SET c = 3.3 WHERE a = 2");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT a, length(b), c, d FROM tab_mixed ORDER BY a");
+is( $result, qq(1|3|1.1|local
+2|500000|3.3|local),
+	'update with non-transmitted large column value');
+
+# check all the cleanup
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0),
+	'check subscription relation status was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');