From 36e75baa88766f575ad5caa05cb4628023a8f164 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilip.kumar@enterprisedb.com>
Date: Mon, 25 Nov 2019 09:00:51 +0530
Subject: [PATCH] Fastpath for sending changes to output plugin in logical
 decoding

In logical decoding before sending the changes to the output plugin, if
there is only a one transaction then no need to build the binary heap
becasue for one transaction they are already in the LSN order.
---
 contrib/test_decoding/logical.conf              |  2 +-
 src/backend/replication/logical/reorderbuffer.c | 61 +++++++++++++++++--------
 2 files changed, 42 insertions(+), 21 deletions(-)

diff --git a/contrib/test_decoding/logical.conf b/contrib/test_decoding/logical.conf
index 07c4d3d..02595d9 100644
--- a/contrib/test_decoding/logical.conf
+++ b/contrib/test_decoding/logical.conf
@@ -1,3 +1,3 @@
 wal_level = logical
 max_replication_slots = 4
-logical_decoding_work_mem = 64kB
+logical_decoding_work_mem = 64MB
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 710a22f..ce92068 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1113,11 +1113,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 			nr_txns++;
 	}
 
-	/*
-	 * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
-	 * need to allocate/build a heap then.
-	 */
-
 	/* allocate iteration state */
 	state = (ReorderBufferIterTXNState *)
 		MemoryContextAllocZero(rb->context,
@@ -1133,10 +1128,11 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		state->entries[off].segno = 0;
 	}
 
-	/* allocate heap */
-	state->heap = binaryheap_allocate(state->nr_txns,
-									  ReorderBufferIterCompare,
-									  state);
+	/* allocate heap, if we have more than one transaction. */
+	if (nr_txns > 1)
+		state->heap = binaryheap_allocate(state->nr_txns,
+										  ReorderBufferIterCompare,
+										  state);
 
 	/*
 	 * Now insert items into the binary heap, in an unordered fashion.  (We
@@ -1165,7 +1161,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		state->entries[off].change = cur_change;
 		state->entries[off].txn = txn;
 
-		binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
+		/* add to heap, only if we have more than one transaction. */
+		if (nr_txns > 1)
+			binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
 	}
 
 	/* add subtransactions if they contain changes */
@@ -1194,12 +1192,15 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 			state->entries[off].change = cur_change;
 			state->entries[off].txn = cur_txn;
 
-			binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
+			/* add to heap, only if we have more than one transaction. */
+			if (nr_txns > 1)
+				binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
 		}
 	}
 
 	/* assemble a valid binary heap */
-	binaryheap_build(state->heap);
+	if (nr_txns > 1)
+		binaryheap_build(state->heap);
 
 	return state;
 }
@@ -1217,11 +1218,24 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 	ReorderBufferIterTXNEntry *entry;
 	int32		off;
 
-	/* nothing there anymore */
-	if (state->heap->bh_size == 0)
-		return NULL;
+	/*
+	 * If there is only one transaction then it will be at the offset 0.
+	 * Otherwise get the offset from the binary heap.
+	 */
+	if (state->nr_txns == 1)
+	{
+		off = 0;
+		if (state->entries[off].change == NULL)
+			return NULL;
+	}
+	else
+	{
+		/* nothing there anymore */
+		if (state->heap->bh_size == 0)
+			return NULL;
 
-	off = DatumGetInt32(binaryheap_first(state->heap));
+		off = DatumGetInt32(binaryheap_first(state->heap));
+	}
 	entry = &state->entries[off];
 
 	/* free memory we might have "leaked" in the previous *Next call */
@@ -1251,7 +1265,8 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 		state->entries[off].lsn = next_change->lsn;
 		state->entries[off].change = next_change;
 
-		binaryheap_replace_first(state->heap, Int32GetDatum(off));
+		if (state->nr_txns > 1)
+			binaryheap_replace_first(state->heap, Int32GetDatum(off));
 		return change;
 	}
 
@@ -1281,14 +1296,19 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 			/* txn stays the same */
 			state->entries[off].lsn = next_change->lsn;
 			state->entries[off].change = next_change;
-			binaryheap_replace_first(state->heap, Int32GetDatum(off));
+
+			if (state->nr_txns > 1)
+				binaryheap_replace_first(state->heap, Int32GetDatum(off));
 
 			return change;
 		}
 	}
 
 	/* ok, no changes there anymore, remove */
-	binaryheap_remove_first(state->heap);
+	if (state->nr_txns > 1)
+		binaryheap_remove_first(state->heap);
+	else
+		entry->change = NULL;
 
 	return change;
 }
@@ -1319,7 +1339,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
 		Assert(dlist_is_empty(&state->old_change));
 	}
 
-	binaryheap_free(state->heap);
+	if (state->nr_txns > 1)
+		binaryheap_free(state->heap);
 	pfree(state);
 }
 
-- 
1.8.3.1

