From 3ac8ba2025f5db2b63ab94b0b51789e14ae9ce46 Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Thu, 10 Jun 2021 18:29:31 +0530
Subject: [PATCH v6] Fix decoding of speculative aborts.

During decoding for speculative inserts, we were relying for cleaning
toast hash on confirmation records or next change records. But that
could lead to multiple problems (a) memory leak if there is neither a
confirmation record nor any other record after toast insertion for a
speculative insert in the transaction, (b) error and assertion failures
if the next operation is not an insert/update on the same table.

The fix is to start queuing spec abort change and clean up toast hash
and change record during its processing. Currently, we are queuing the
spec aborts for both toast and main table even though we perform cleanup
while processing the main table's spec abort record. Later, if we have a
way to distinguish between the spec abort record of toast and the main
table, we can avoid queuing the change for spec aborts of toast tables.

Reported-by: Ashutosh Bapat
Author: Dilip Kumar
Reviewed-by: Amit Kapila
Backpatch-through: 9.6, where it was introduced
Discussion: https://postgr.es/m/CAExHW5sPKF-Oovx_qZe4p5oM6Dvof7_P+XgsNAViug15Fm99jA@mail.gmail.com
---
 contrib/test_decoding/Makefile                     |   2 +-
 .../test_decoding/expected/speculative_abort.out   |  85 ++++++++++++++++
 contrib/test_decoding/specs/speculative_abort.spec | 111 +++++++++++++++++++++
 src/backend/replication/logical/decode.c           |  14 ++-
 src/backend/replication/logical/reorderbuffer.c    |  49 ++++++---
 src/include/replication/reorderbuffer.h            |   9 +-
 6 files changed, 245 insertions(+), 25 deletions(-)
 create mode 100644 contrib/test_decoding/expected/speculative_abort.out
 create mode 100644 contrib/test_decoding/specs/speculative_abort.spec

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 9a31e0b..1cab935 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 	spill slot truncate stream stats twophase twophase_stream
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
-	twophase_snapshot
+	twophase_snapshot speculative_abort
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/speculative_abort.out b/contrib/test_decoding/expected/speculative_abort.out
new file mode 100644
index 0000000..7492506
--- /dev/null
+++ b/contrib/test_decoding/expected/speculative_abort.out
@@ -0,0 +1,85 @@
+Parsed test spec with 3 sessions
+
+starting permutation: controller_locks controller_show_count s1_begin s1_insert_toast s2_insert_toast controller_show_count controller_unlock_1_1 controller_unlock_2_1 controller_unlock_1_3 controller_unlock_2_3 controller_show_count controller_unlock_2_2 controller_show_count controller_unlock_1_2 s1_insert_other s1_commit controller_get_changes
+data           
+
+step controller_locks: SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock);
+pg_advisory_locksess           lock           
+
+               1              1              
+               1              2              
+               1              3              
+               2              1              
+               2              2              
+               2              3              
+step controller_show_count: SELECT COUNT(*) FROM tbl1;
+count          
+
+0              
+step s1_begin: BEGIN;
+s1: NOTICE:  blurt_and_lock() called for 1 in session 1
+s1: NOTICE:  acquiring advisory lock on 3
+step s1_insert_toast: INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; <waiting ...>
+s2: NOTICE:  blurt_and_lock() called for 1 in session 2
+s2: NOTICE:  acquiring advisory lock on 3
+step s2_insert_toast: INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; <waiting ...>
+step controller_show_count: SELECT COUNT(*) FROM tbl1;
+count          
+
+0              
+step controller_unlock_1_1: SELECT pg_advisory_unlock(1, 1);
+pg_advisory_unlock
+
+t              
+step controller_unlock_2_1: SELECT pg_advisory_unlock(2, 1);
+pg_advisory_unlock
+
+t              
+step controller_unlock_1_3: SELECT pg_advisory_unlock(1, 3);
+pg_advisory_unlock
+
+t              
+s1: NOTICE:  blurt_and_lock() called for 1 in session 1
+s1: NOTICE:  acquiring advisory lock on 2
+step controller_unlock_2_3: SELECT pg_advisory_unlock(2, 3);
+pg_advisory_unlock
+
+t              
+s2: NOTICE:  blurt_and_lock() called for 1 in session 2
+s2: NOTICE:  acquiring advisory lock on 2
+step controller_show_count: SELECT COUNT(*) FROM tbl1;
+count          
+
+0              
+step controller_unlock_2_2: SELECT pg_advisory_unlock(2, 2);
+pg_advisory_unlock
+
+t              
+step s2_insert_toast: <... completed>
+step controller_show_count: SELECT COUNT(*) FROM tbl1;
+count          
+
+1              
+step controller_unlock_1_2: SELECT pg_advisory_unlock(1, 2);
+pg_advisory_unlock
+
+t              
+s1: NOTICE:  blurt_and_lock() called for 1 in session 1
+s1: NOTICE:  acquiring advisory lock on 2
+s1: NOTICE:  blurt_and_lock() called for 1 in session 1
+s1: NOTICE:  acquiring advisory lock on 2
+step s1_insert_toast: <... completed>
+step s1_insert_other: INSERT INTO tbl2 VALUES(1);
+step s1_commit: COMMIT;
+step controller_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data           
+
+BEGIN          
+table public.tbl1: INSERT: a[integer]:1 b[text]:'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
+COMMIT         
+BEGIN          
+table public.tbl2: INSERT: a[integer]:1
+COMMIT         
+?column?       
+
+stop           
diff --git a/contrib/test_decoding/specs/speculative_abort.spec b/contrib/test_decoding/specs/speculative_abort.spec
new file mode 100644
index 0000000..6b3cbfb
--- /dev/null
+++ b/contrib/test_decoding/specs/speculative_abort.spec
@@ -0,0 +1,111 @@
+# INSERT ... ON CONFLICT test verifying that speculative abort for toast
+# insertions are handled during logical decoding.
+#
+# Does this by using advisory locks controlling the progress of
+# insertions. By waiting when building the index keys, it's possible
+# to schedule concurrent INSERT ON CONFLICTs so that there will always
+# be a speculative conflict.
+
+setup
+{
+	SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+	DROP TABLE IF EXISTS tbl1;
+	CREATE TABLE tbl1 (a INT, b TEXT);
+	ALTER TABLE tbl1 ALTER COLUMN b SET STORAGE EXTERNAL;
+	CREATE TABLE tbl2 (a INT);
+
+    CREATE OR REPLACE FUNCTION blurt_and_lock(int) RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$
+    BEGIN
+        RAISE NOTICE 'blurt_and_lock() called for % in session %', $1, current_setting('spec.session')::int;
+
+	-- depending on lock state, wait for lock 2 or 3
+        IF pg_try_advisory_xact_lock(current_setting('spec.session')::int, 1) THEN
+            RAISE NOTICE 'acquiring advisory lock on 2';
+            PERFORM pg_advisory_xact_lock(current_setting('spec.session')::int, 2);
+        ELSE
+            RAISE NOTICE 'acquiring advisory lock on 3';
+            PERFORM pg_advisory_xact_lock(current_setting('spec.session')::int, 3);
+        END IF;
+    RETURN $1;
+    END;$$;
+
+	CREATE UNIQUE INDEX idx on tbl1(blurt_and_lock(a));
+
+	-- consume DDL
+	SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+}
+
+teardown
+{
+    DROP TABLE tbl1;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "controller"
+setup
+{
+    SET default_transaction_isolation = 'read committed';
+    SET application_name = 'isolation/insert-specconflict-controller';
+}
+step "controller_locks" {SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock);}
+step "controller_unlock_1_1" { SELECT pg_advisory_unlock(1, 1); }
+step "controller_unlock_2_1" { SELECT pg_advisory_unlock(2, 1); }
+step "controller_unlock_1_2" { SELECT pg_advisory_unlock(1, 2); }
+step "controller_unlock_2_2" { SELECT pg_advisory_unlock(2, 2); }
+step "controller_unlock_1_3" { SELECT pg_advisory_unlock(1, 3); }
+step "controller_unlock_2_3" { SELECT pg_advisory_unlock(2, 3); }
+step "controller_show_count" { SELECT COUNT(*) FROM tbl1; }
+step "controller_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
+
+session "s1"
+setup
+{
+	SET synchronous_commit=on;
+    SET default_transaction_isolation = 'read committed';
+    SET spec.session = 1;
+    SET application_name = 'isolation/insert-specconflict-s1';
+}
+
+step "s1_begin"  { BEGIN; }
+step "s1_insert_toast" { INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; }
+step "s1_insert_other" { INSERT INTO tbl2 VALUES(1); }
+step "s1_commit"  { COMMIT; }
+
+session "s2"
+setup
+{
+	SET synchronous_commit=on;
+    SET default_transaction_isolation = 'read committed';
+    SET spec.session = 2;
+    SET application_name = 'isolation/insert-specconflict-s2';
+}
+step "s2_insert_toast" { INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; }
+
+
+# Test logical decoding of speculative aborts for toast insertion followed by
+# insertion into a different table which doesn't have a toast.
+permutation
+   # acquire a number of locks, to control execution flow - the
+   # blurt_and_lock function acquires advisory locks that allow us to
+   # continue after a) the optimistic conflict probe b) after the
+   # insertion of the speculative tuple.
+   "controller_locks"
+   "controller_show_count"
+   "s1_begin"
+   "s1_insert_toast" "s2_insert_toast"
+   "controller_show_count"
+   # Switch both sessions to wait on the other lock next time (the speculative insertion)
+   "controller_unlock_1_1" "controller_unlock_2_1"
+   # Allow both sessions to continue
+   "controller_unlock_1_3" "controller_unlock_2_3"
+   "controller_show_count"
+   # Allow the second session to finish insertion
+   "controller_unlock_2_2"
+   # This should now show a successful insertion
+   "controller_show_count"
+   # Allow the first session to speculative abort
+   "controller_unlock_1_2"
+   # Insert into other table from s1 and commit
+   "s1_insert_other" "s1_commit"
+   # Get the changes
+   "controller_get_changes"
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 7067016..453efc5 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -1040,19 +1040,17 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (target_node.dbNode != ctx->slot->data.database)
 		return;
 
-	/*
-	 * Super deletions are irrelevant for logical decoding, it's driven by the
-	 * confirmation records.
-	 */
-	if (xlrec->flags & XLH_DELETE_IS_SUPER)
-		return;
-
 	/* output plugin doesn't look for this origin, no need to queue */
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
-	change->action = REORDER_BUFFER_CHANGE_DELETE;
+
+	if (xlrec->flags & XLH_DELETE_IS_SUPER)
+		change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT;
+	else
+		change->action = REORDER_BUFFER_CHANGE_DELETE;
+
 	change->origin_id = XLogRecGetOrigin(r);
 
 	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 2d9e127..d905f32 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -443,6 +443,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		txn->invalidations = NULL;
 	}
 
+	/* Reset the toast hash */
+	ReorderBufferToastReset(rb, txn);
+
 	pfree(txn);
 }
 
@@ -520,6 +523,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
 			}
 			break;
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			break;
@@ -2211,8 +2215,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			change_done:
 
 					/*
-					 * Either speculative insertion was confirmed, or it was
-					 * unsuccessful and the record isn't needed anymore.
+					 * If speculative insertion was confirmed, the record isn't
+					 * needed anymore.
 					 */
 					if (specinsert != NULL)
 					{
@@ -2254,6 +2258,32 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					specinsert = change;
 					break;
 
+				case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
+
+					/*
+					 * Abort for speculative insertion arrived. So cleanup the
+					 * specinsert tuple and toast hash.
+					 *
+					 * Note that we get the spec abort change for each toast
+					 * entry but we need to perform the cleanup only the first
+					 * time we get it for the main table.
+					 */
+					if (specinsert != NULL)
+					{
+						/*
+						 * We must clean the toast hash before processing a
+						 * completely new tuple to avoid confusion about the
+						 * previous tuple's toast chunks.
+						 */
+						Assert(change->data.tp.clear_toast_afterwards);
+						ReorderBufferToastReset(rb, txn);
+
+						/* We don't need this record anymore. */
+						ReorderBufferReturnChange(rb, specinsert, true);
+						specinsert = NULL;
+					}
+					break;
+
 				case REORDER_BUFFER_CHANGE_TRUNCATE:
 					{
 						int			i;
@@ -2360,16 +2390,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			}
 		}
 
-		/*
-		 * There's a speculative insertion remaining, just clean in up, it
-		 * can't have been successful, otherwise we'd gotten a confirmation
-		 * record.
-		 */
-		if (specinsert)
-		{
-			ReorderBufferReturnChange(rb, specinsert, true);
-			specinsert = NULL;
-		}
+		/* speculative insertion record must be freed by now */
+		Assert(!specinsert);
 
 		/* clean up the iterator */
 		ReorderBufferIterTXNFinish(rb, iterstate);
@@ -3754,6 +3776,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			/* ReorderBufferChange contains everything important */
@@ -4017,6 +4040,7 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			/* ReorderBufferChange contains everything important */
@@ -4315,6 +4339,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			break;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0c6e9d1..ba257d8 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -46,10 +46,10 @@ typedef struct ReorderBufferTupleBuf
  * changes. Users of the decoding facilities will never see changes with
  * *_INTERNAL_* actions.
  *
- * The INTERNAL_SPEC_INSERT and INTERNAL_SPEC_CONFIRM changes concern
- * "speculative insertions", and their confirmation respectively.  They're
- * used by INSERT .. ON CONFLICT .. UPDATE.  Users of logical decoding don't
- * have to care about these.
+ * The INTERNAL_SPEC_INSERT and INTERNAL_SPEC_CONFIRM, and INTERNAL_SPEC_ABORT
+ * changes concern "speculative insertions", their confirmation, and abort
+ * respectively.  They're used by INSERT .. ON CONFLICT .. UPDATE.  Users of
+ * logical decoding don't have to care about these.
  */
 enum ReorderBufferChangeType
 {
@@ -63,6 +63,7 @@ enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
+	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT,
 	REORDER_BUFFER_CHANGE_TRUNCATE
 };
 
-- 
1.8.3.1

