From 1a9797f82982aa6db9fe2001cd8ad4ff8ebd7c0c Mon Sep 17 00:00:00 2001
From: Matthias van de Meent <boekewurm+postgres@gmail.com>
Date: Wed, 28 Aug 2024 15:28:37 +0200
Subject: [PATCH v20250307 2/4] Allow tuplesort implementations to buffer
 writes

Previously, all writes to the sort tapes had to be completed during the
call to writetup().  That is sufficient when the user of tuplesort
isn't interested in merging sorted tuples, but btree (and, in the
future, GIN) sorts tuples only to merge them later, during insertion
into the index.  If we merged such tuples before writing them to disk,
we could save significant disk space and IO.

To support this, WRITETUP() may now buffer its writes while we are
filling a tape with tuples, and we call FLUSHWRITES() at the end of
each run to mark the end of the tape, so that the tuplesort
implementation can flush any remaining buffered data to disk.
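
For illustration only (not part of this patch's diff), a buffering
writetup()/flushwrites() pair could look roughly like the sketch below.
PendingTuple and the helpers can_merge(), merge_into(), buffer_tuple(),
write_pending(), pending_size() and stored_size() are hypothetical
placeholders; a real implementation (e.g. for GIN) would keep this
state reachable through base->arg:

    typedef struct PendingTuple
    {
        bool        valid;      /* is a tuple currently buffered? */
        /* ... key and payload of the buffered tuple ... */
    } PendingTuple;

    /* hypothetical helpers, bodies elided */
    static bool can_merge(PendingTuple *pending, SortTuple *stup);
    static void merge_into(PendingTuple *pending, SortTuple *stup);
    static void buffer_tuple(PendingTuple *pending, SortTuple *stup);
    static void write_pending(LogicalTape *tape, PendingTuple *pending);

    static void
    writetup_buffered(Tuplesortstate *state, LogicalTape *tape,
                      SortTuple *stup)
    {
        PendingTuple *pending =
            (PendingTuple *) TuplesortstateGetPublic(state)->arg;

        if (pending->valid && can_merge(pending, stup))
        {
            /* absorb stup into the buffered tuple; nothing hits the tape */
            merge_into(pending, stup);
            return;
        }

        /* key changed: emit the buffered tuple, then buffer this one */
        if (pending->valid)
            write_pending(tape, pending);
        buffer_tuple(pending, stup);
    }

    static void
    flushwrites_buffered(Tuplesortstate *state, LogicalTape *tape)
    {
        PendingTuple *pending =
            (PendingTuple *) TuplesortstateGetPublic(state)->arg;

        /* end of run: the last buffered tuple must reach the tape now */
        if (pending->valid)
            write_pending(tape, pending);
        pending->valid = false;
    }

A variant that buffers would point base->writetup and base->flushwrites
at such functions from its tuplesort_begin_*() routine; variants that
don't buffer simply leave flushwrites NULL, as this commit does for all
existing sorts.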

By design, this does _not_ enable deduplication while the dataset is
still in memory.  Writing data to disk is inherently expensive, so
spending some additional cycles on buffering is likely to pay off if it
reduces the amount of data written.  In memory, however, those extra
cycles may add more overhead than the merging saves.

Note that any implementation of tuple merging that uses the buffering
strategy enabled by this commit must also make sure that a merged
tuple is never larger than the combined size of the tuples it was
merged from.
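
As a hypothetical sketch of that requirement (continuing the
placeholder names above), merge_into() could check the invariant
directly; the likely reason for the rule is that a merge pass reuses
tape space released by its input runs, so the output may not need more
space than its inputs did:

    static void
    merge_into(PendingTuple *pending, SortTuple *stup)
    {
        Size    before = pending_size(pending);     /* hypothetical */
        Size    added = stored_size(stup);          /* hypothetical */

        /* ... combine stup's data into the buffered tuple ... */

        /*
         * The merged result must not outgrow the combined size of its
         * inputs, or later merge passes could need more tape space
         * than their input runs release.
         */
        Assert(pending_size(pending) <= before + added);
    }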
---
 src/include/utils/tuplesort.h              | 9 +++++++++
 src/backend/utils/sort/tuplesort.c         | 5 +++++
 src/backend/utils/sort/tuplesortvariants.c | 7 +++++++
 3 files changed, 21 insertions(+)

diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index be1bd8a1862..a89299296bb 100644
--- a/src/include/utils/tuplesort.h
+++ b/src/include/utils/tuplesort.h
@@ -195,6 +195,15 @@ typedef struct
 	void		(*writetup) (Tuplesortstate *state, LogicalTape *tape,
 							 SortTuple *stup);
 
+	/*
+	 * Flush any buffered writetup() writes.
+	 *
+	 * This is useful when writetup() buffers writes for more efficient
+	 * use of the tape's resources, e.g. when deduplicating or merging
+	 * sort tuples.
+	 */
+	void		(*flushwrites) (Tuplesortstate *state, LogicalTape *tape);
+
 	/*
 	 * Function to read a stored tuple from tape back into memory. 'len' is
 	 * the already-read length of the stored tuple.  The tuple is allocated
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 2ef32d53a43..7f346325678 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -395,6 +395,7 @@ struct Sharedsort
 #define REMOVEABBREV(state,stup,count)	((*(state)->base.removeabbrev) (state, stup, count))
 #define COMPARETUP(state,a,b)	((*(state)->base.comparetup) (a, b, state))
 #define WRITETUP(state,tape,stup)	((*(state)->base.writetup) (state, tape, stup))
+#define FLUSHWRITES(state,tape)	((state)->base.flushwrites ? (*(state)->base.flushwrites) (state, tape) : (void) 0)
 #define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
 #define FREESTATE(state)	((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
 #define LACKMEM(state)		((state)->availMem < 0 && !(state)->slabAllocatorUsed)
@@ -2244,6 +2245,8 @@ mergeonerun(Tuplesortstate *state)
 		}
 	}
 
+	FLUSHWRITES(state, state->destTape);
+
 	/*
 	 * When the heap empties, we're done.  Write an end-of-run marker on the
 	 * output tape.
@@ -2369,6 +2372,8 @@ dumptuples(Tuplesortstate *state, bool alltuples)
 		WRITETUP(state, state->destTape, stup);
 	}
 
+	FLUSHWRITES(state, state->destTape);
+
 	state->memtupcount = 0;
 
 	/*
diff --git a/src/backend/utils/sort/tuplesortvariants.c b/src/backend/utils/sort/tuplesortvariants.c
index 0b83b8b25b3..79bd29aa90e 100644
--- a/src/backend/utils/sort/tuplesortvariants.c
+++ b/src/backend/utils/sort/tuplesortvariants.c
@@ -209,6 +209,7 @@ tuplesort_begin_heap(TupleDesc tupDesc,
 	base->comparetup = comparetup_heap;
 	base->comparetup_tiebreak = comparetup_heap_tiebreak;
 	base->writetup = writetup_heap;
+	base->flushwrites = NULL;
 	base->readtup = readtup_heap;
 	base->haveDatum1 = true;
 	base->arg = tupDesc;		/* assume we need not copy tupDesc */
@@ -285,6 +286,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
 	base->comparetup = comparetup_cluster;
 	base->comparetup_tiebreak = comparetup_cluster_tiebreak;
 	base->writetup = writetup_cluster;
+	base->flushwrites = NULL;
 	base->readtup = readtup_cluster;
 	base->freestate = freestate_cluster;
 	base->arg = arg;
@@ -393,6 +395,7 @@ tuplesort_begin_index_btree(Relation heapRel,
 	base->comparetup = comparetup_index_btree;
 	base->comparetup_tiebreak = comparetup_index_btree_tiebreak;
 	base->writetup = writetup_index;
+	base->flushwrites = NULL;
 	base->readtup = readtup_index;
 	base->haveDatum1 = true;
 	base->arg = arg;
@@ -472,6 +475,7 @@ tuplesort_begin_index_hash(Relation heapRel,
 	base->comparetup = comparetup_index_hash;
 	base->comparetup_tiebreak = comparetup_index_hash_tiebreak;
 	base->writetup = writetup_index;
+	base->flushwrites = NULL;
 	base->readtup = readtup_index;
 	base->haveDatum1 = true;
 	base->arg = arg;
@@ -516,6 +520,7 @@ tuplesort_begin_index_gist(Relation heapRel,
 	base->comparetup = comparetup_index_btree;
 	base->comparetup_tiebreak = comparetup_index_btree_tiebreak;
 	base->writetup = writetup_index;
+	base->flushwrites = NULL;
 	base->readtup = readtup_index;
 	base->haveDatum1 = true;
 	base->arg = arg;
@@ -571,6 +576,7 @@ tuplesort_begin_index_brin(int workMem,
 	base->removeabbrev = removeabbrev_index_brin;
 	base->comparetup = comparetup_index_brin;
 	base->writetup = writetup_index_brin;
+	base->flushwrites = NULL;
 	base->readtup = readtup_index_brin;
 	base->haveDatum1 = true;
 	base->arg = NULL;
@@ -683,6 +689,7 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
 	base->comparetup = comparetup_datum;
 	base->comparetup_tiebreak = comparetup_datum_tiebreak;
 	base->writetup = writetup_datum;
+	base->flushwrites = NULL;
 	base->readtup = readtup_datum;
 	base->haveDatum1 = true;
 	base->arg = arg;
-- 
2.45.2

