From 9c035e6e1621216e4f5db50954e9a093ee70e019 Mon Sep 17 00:00:00 2001
From: David Rowley <dgrowley@gmail.com>
Date: Sun, 7 Jul 2024 22:19:18 +1200
Subject: [PATCH v4 2/3] Optimize WindowAgg's use of tuplestores

When WindowAgg finished one partition of a PARTITION BY, it previously
would call tuplestore_end() to purge all the stored tuples before again
calling tuplestore_begin_heap() and carefully setting up all of the
tuplestore read pointers exactly as required for the given frameOptions.
Since the frameOptions don't change between partitions, this part does
not make much sense.

It seems much better to create the tuplestore and the read pointers once
and simply call tuplestore_clear() at the end of each partition.
tuplestore_clear() moves all of the read pointers back to the start
position and removes all tuples.

A simple test query with 1 million partitions and 1 tuple per partition
has been shown to run around 44% faster than without this change.
---
 src/backend/executor/nodeWindowAgg.c | 162 ++++++++++++++++++---------
 src/include/nodes/execnodes.h        |   1 +
 2 files changed, 107 insertions(+), 56 deletions(-)

diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index 88a85f556b..51a6708a39 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -1074,57 +1074,24 @@ eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
 }
 
 /*
- * begin_partition
- * Start buffering rows of the next partition.
+ * prepare_tuplestore
+ *		Prepare the tuplestore and all of the required read pointers for the
+ *		WindowAggState's frameOptions.
+ *
+ * Note: We use pg_noinline to avoid bloating the calling function with code
+ * which is only called once.
  */
-static void
-begin_partition(WindowAggState *winstate)
+static pg_noinline void
+prepare_tuplestore(WindowAggState *winstate)
 {
 	WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
-	PlanState  *outerPlan = outerPlanState(winstate);
 	int			frameOptions = winstate->frameOptions;
 	int			numfuncs = winstate->numfuncs;
-	int			i;
-
-	winstate->partition_spooled = false;
-	winstate->framehead_valid = false;
-	winstate->frametail_valid = false;
-	winstate->grouptail_valid = false;
-	winstate->spooled_rows = 0;
-	winstate->currentpos = 0;
-	winstate->frameheadpos = 0;
-	winstate->frametailpos = 0;
-	winstate->currentgroup = 0;
-	winstate->frameheadgroup = 0;
-	winstate->frametailgroup = 0;
-	winstate->groupheadpos = 0;
-	winstate->grouptailpos = -1;	/* see update_grouptailpos */
-	ExecClearTuple(winstate->agg_row_slot);
-	if (winstate->framehead_slot)
-		ExecClearTuple(winstate->framehead_slot);
-	if (winstate->frametail_slot)
-		ExecClearTuple(winstate->frametail_slot);
-
-	/*
-	 * If this is the very first partition, we need to fetch the first input
-	 * row to store in first_part_slot.
-	 */
-	if (TupIsNull(winstate->first_part_slot))
-	{
-		TupleTableSlot *outerslot = ExecProcNode(outerPlan);
 
-		if (!TupIsNull(outerslot))
-			ExecCopySlot(winstate->first_part_slot, outerslot);
-		else
-		{
-			/* outer plan is empty, so we have nothing to do */
-			winstate->partition_spooled = true;
-			winstate->more_partitions = false;
-			return;
-		}
-	}
+	/* we shouldn't be called if this was done already */
+	Assert(winstate->buffer == NULL);
 
-	/* Create new tuplestore for this partition */
+	/* Create new tuplestore */
 	winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
 
 	/*
@@ -1158,16 +1125,10 @@ begin_partition(WindowAggState *winstate)
 
 		agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
 															readptr_flags);
-		agg_winobj->markpos = -1;
-		agg_winobj->seekpos = -1;
-
-		/* Also reset the row counters for aggregates */
-		winstate->aggregatedbase = 0;
-		winstate->aggregatedupto = 0;
 	}
 
 	/* create mark and read pointers for each real window function */
-	for (i = 0; i < numfuncs; i++)
+	for (int i = 0; i < numfuncs; i++)
 	{
 		WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
 
@@ -1179,8 +1140,6 @@ begin_partition(WindowAggState *winstate)
 															0);
 			winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
 															EXEC_FLAG_BACKWARD);
-			winobj->markpos = -1;
-			winobj->seekpos = -1;
 		}
 	}
 
@@ -1224,6 +1183,88 @@ begin_partition(WindowAggState *winstate)
 		winstate->grouptail_ptr =
 			tuplestore_alloc_read_pointer(winstate->buffer, 0);
 	}
+}
+
+/*
+ * begin_partition
+ * Start buffering rows of the next partition.
+ */
+static void
+begin_partition(WindowAggState *winstate)
+{
+	PlanState  *outerPlan = outerPlanState(winstate);
+	int			numfuncs = winstate->numfuncs;
+
+	winstate->partition_spooled = false;
+	winstate->framehead_valid = false;
+	winstate->frametail_valid = false;
+	winstate->grouptail_valid = false;
+	winstate->spooled_rows = 0;
+	winstate->currentpos = 0;
+	winstate->frameheadpos = 0;
+	winstate->frametailpos = 0;
+	winstate->currentgroup = 0;
+	winstate->frameheadgroup = 0;
+	winstate->frametailgroup = 0;
+	winstate->groupheadpos = 0;
+	winstate->grouptailpos = -1;	/* see update_grouptailpos */
+	ExecClearTuple(winstate->agg_row_slot);
+	if (winstate->framehead_slot)
+		ExecClearTuple(winstate->framehead_slot);
+	if (winstate->frametail_slot)
+		ExecClearTuple(winstate->frametail_slot);
+
+	/*
+	 * If this is the very first partition, we need to fetch the first input
+	 * row to store in first_part_slot.
+	 */
+	if (TupIsNull(winstate->first_part_slot))
+	{
+		TupleTableSlot *outerslot = ExecProcNode(outerPlan);
+
+		if (!TupIsNull(outerslot))
+			ExecCopySlot(winstate->first_part_slot, outerslot);
+		else
+		{
+			/* outer plan is empty, so we have nothing to do */
+			winstate->partition_spooled = true;
+			winstate->more_partitions = false;
+			return;
+		}
+	}
+
+	/* Create new tuplestore if not done already. */
+	if (unlikely(winstate->buffer == NULL))
+		prepare_tuplestore(winstate);
+
+	winstate->next_partition = false;
+
+	if (winstate->numaggs > 0)
+	{
+		WindowObject agg_winobj = winstate->agg_winobj;
+
+		/* reset mark and see positions for aggregate functions */
+		agg_winobj->markpos = -1;
+		agg_winobj->seekpos = -1;
+
+		/* Also reset the row counters for aggregates */
+		winstate->aggregatedbase = 0;
+		winstate->aggregatedupto = 0;
+	}
+
+	/* reset mark and seek positions for each real window function */
+	for (int i = 0; i < numfuncs; i++)
+	{
+		WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
+
+		if (!perfuncstate->plain_agg)
+		{
+			WindowObject winobj = perfuncstate->winobj;
+
+			winobj->markpos = -1;
+			winobj->seekpos = -1;
+		}
+	}
 
 	/*
 	 * Store the first tuple into the tuplestore (it's always available now;
@@ -1360,9 +1401,9 @@ release_partition(WindowAggState *winstate)
 	}
 
 	if (winstate->buffer)
-		tuplestore_end(winstate->buffer);
-	winstate->buffer = NULL;
+		tuplestore_clear(winstate->buffer);
 	winstate->partition_spooled = false;
+	winstate->next_partition = true;
 }
 
 /*
@@ -2143,7 +2184,7 @@ ExecWindowAgg(PlanState *pstate)
 	/* We need to loop as the runCondition or qual may filter out tuples */
 	for (;;)
 	{
-		if (winstate->buffer == NULL)
+		if (winstate->next_partition)
 		{
 			/* Initialize for first partition and set current row = 0 */
 			begin_partition(winstate);
@@ -2686,6 +2727,7 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
 	winstate->all_first = true;
 	winstate->partition_spooled = false;
 	winstate->more_partitions = false;
+	winstate->next_partition = true;
 
 	return winstate;
 }
@@ -2700,6 +2742,14 @@ ExecEndWindowAgg(WindowAggState *node)
 	PlanState  *outerPlan;
 	int			i;
 
+	if (node->buffer != NULL)
+	{
+		tuplestore_end(node->buffer);
+
+		/* nullify so that release_partition skips the tuplestore_clear() */
+		node->buffer = NULL;
+	}
+
 	release_partition(node);
 
 	for (i = 0; i < node->numaggs; i++)
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index af7d8fd1e7..add76b7eb5 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2644,6 +2644,7 @@ typedef struct WindowAggState
 	bool		all_first;		/* true if the scan is starting */
 	bool		partition_spooled;	/* true if all tuples in current partition
 									 * have been spooled into tuplestore */
+	bool		next_partition; /* true if begin_partition needs to be called */
 	bool		more_partitions;	/* true if there's more partitions after
 									 * this one */
 	bool		framehead_valid;	/* true if frameheadpos is known up to
-- 
2.34.1

