From 10943afd0d57b4dbd97f1ebe896fa7459541c024 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilip.kumar@enterprisedb.com>
Date: Tue, 3 Mar 2020 09:40:10 +0530
Subject: [PATCH v2] Fastpath for sending changes to output plugin in logical 
 decoding

In logical decoding before sending the changes to the output plugin, if
there is only a one transaction then no need to build the binary heap
becasue for one transaction they are already in the LSN order.
---
 src/backend/replication/logical/reorderbuffer.c | 61 +++++++++++++++++--------
 1 file changed, 41 insertions(+), 20 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 481277a..c6f1f89 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1037,11 +1037,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			nr_txns++;
 	}
 
-	/*
-	 * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
-	 * need to allocate/build a heap then.
-	 */
-
 	/* allocate iteration state */
 	state = (ReorderBufferIterTXNState *)
 		MemoryContextAllocZero(rb->context,
@@ -1057,10 +1052,11 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		state->entries[off].segno = 0;
 	}
 
-	/* allocate heap */
-	state->heap = binaryheap_allocate(state->nr_txns,
-									  ReorderBufferIterCompare,
-									  state);
+	/* allocate heap, if we have more than one transaction. */
+	if (nr_txns > 1)
+		state->heap = binaryheap_allocate(state->nr_txns,
+										  ReorderBufferIterCompare,
+										  state);
 
 	/* Now that the state fields are initialized, it is safe to return it. */
 	*iter_state = state;
@@ -1092,7 +1088,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		state->entries[off].change = cur_change;
 		state->entries[off].txn = txn;
 
-		binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
+		/* add to heap, only if we have more than one transaction. */
+		if (nr_txns > 1)
+			binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
 	}
 
 	/* add subtransactions if they contain changes */
@@ -1121,12 +1119,15 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			state->entries[off].change = cur_change;
 			state->entries[off].txn = cur_txn;
 
-			binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
+			/* add to heap, only if we have more than one transaction. */
+			if (nr_txns > 1)
+				binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
 		}
 	}
 
 	/* assemble a valid binary heap */
-	binaryheap_build(state->heap);
+	if (nr_txns > 1)
+		binaryheap_build(state->heap);
 }
 
 /*
@@ -1142,11 +1143,24 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 	ReorderBufferIterTXNEntry *entry;
 	int32		off;
 
-	/* nothing there anymore */
-	if (state->heap->bh_size == 0)
-		return NULL;
+	/*
+	 * If there is only one transaction then it will be at the offset 0.
+	 * Otherwise get the offset from the binary heap.
+	 */
+	if (state->nr_txns == 1)
+	{
+		off = 0;
+		if (state->entries[off].change == NULL)
+			return NULL;
+	}
+	else
+	{
+		/* nothing there anymore */
+		if (state->heap->bh_size == 0)
+			return NULL;
 
-	off = DatumGetInt32(binaryheap_first(state->heap));
+		off = DatumGetInt32(binaryheap_first(state->heap));
+	}
 	entry = &state->entries[off];
 
 	/* free memory we might have "leaked" in the previous *Next call */
@@ -1176,7 +1190,8 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 		state->entries[off].lsn = next_change->lsn;
 		state->entries[off].change = next_change;
 
-		binaryheap_replace_first(state->heap, Int32GetDatum(off));
+		if (state->nr_txns > 1)
+			binaryheap_replace_first(state->heap, Int32GetDatum(off));
 		return change;
 	}
 
@@ -1206,14 +1221,19 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 			/* txn stays the same */
 			state->entries[off].lsn = next_change->lsn;
 			state->entries[off].change = next_change;
-			binaryheap_replace_first(state->heap, Int32GetDatum(off));
+
+			if (state->nr_txns > 1)
+				binaryheap_replace_first(state->heap, Int32GetDatum(off));
 
 			return change;
 		}
 	}
 
 	/* ok, no changes there anymore, remove */
-	binaryheap_remove_first(state->heap);
+	if (state->nr_txns > 1)
+		binaryheap_remove_first(state->heap);
+	else
+		entry->change = NULL;
 
 	return change;
 }
@@ -1244,7 +1264,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
 		Assert(dlist_is_empty(&state->old_change));
 	}
 
-	binaryheap_free(state->heap);
+	if (state->nr_txns > 1)
+		binaryheap_free(state->heap);
 	pfree(state);
 }
 
-- 
1.8.3.1

