On Fri, Jan 20, 2023 at 11:02 PM Thomas Munro <thomas.mu...@gmail.com> wrote:
> Yeah.  I think the analysis looks good, but I'll do some testing next
> week with the aim of getting it committed.  Looks like it now needs
> Meson changes, but I'll look after that as my penance.

Here's an updated version that I'm testing...

Changes to the main patch:

* Adjust a few comments
* pgindent
* Explained a bit more in the commit message

I'm wondering about this bit in rebin_segment():

+       if (segment_map->header == NULL)
+               return;

Why would we be rebinning an uninitialised/unused segment?  Does
something in your DSA-client code (I guess you have an extension?) hit
this case?  The tests certainly don't; I'm not sure how the case could
be reached.

Changes to the test:

* Update copyright year
* Size -> size_t
* pgindent
* Add Meson glue
* Re-alphabetise the makefile
* Make sure we get BGWH_STOPPED while waiting for bgworkers to exit
* Background worker main function return type is fixed (void)
* results[1] -> results[FLEXIBLE_ARRAY_MEMBER]
* getpid() -> MyProcPid

I wonder if this code would be easier to understand, but not
materially less efficient, if we re-binned eagerly when allocating
too, so the bin is always correct/optimal.  Checking fpm_largest()
again after allocating should be cheap, I guess (it just reads a
member variable that we already paid the cost of maintaining).  We
don't really seem to amortise much, we just transfer the rebinning
work to the next caller to consider the segment.  I haven't tried out
that theory though.
From 5b6a3626dd0f8bacd7e26e490a20340e7ae6769e Mon Sep 17 00:00:00 2001
From: Liu Dongming <ldming101@gmail.com>
Date: Fri, 18 Mar 2022 11:49:06 +0800
Subject: [PATCH v3 1/2] Re-bin segment when memory pages are freed.

It's OK to be lazy about re-binning memory segments when allocating,
because that can only leave segments in a bin that's too high.  We'll
search higher bins if necessary while allocating next time, and
also eventually re-bin, so no memory can become unreachable that way.

However, when freeing memory, the largest contiguous range of free pages
might go up, so we should re-bin eagerly to make sure we don't leave the
segment in a bin that is too low for get_best_segment() to find.

The re-binning code is moved into a function of its own, so it can be
called whenever free pages are returned to the segment's free page map.

Author: Dongming Liu <ldming101@gmail.com>
Reviewed-by: Robert Haas <robertmhaas@gmail.com>
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/CAL1p7e8LzB2LSeAXo2pXCW4%2BRya9s0sJ3G_ReKOU%3DAjSUWjHWQ%40mail.gmail.com
---
 src/backend/utils/mmgr/dsa.c | 67 ++++++++++++++++++++++++------------
 1 file changed, 45 insertions(+), 22 deletions(-)

diff --git a/src/backend/utils/mmgr/dsa.c b/src/backend/utils/mmgr/dsa.c
index f5a62061a3..0ef99dcba5 100644
--- a/src/backend/utils/mmgr/dsa.c
+++ b/src/backend/utils/mmgr/dsa.c
@@ -418,6 +418,7 @@ static dsa_area *attach_internal(void *place, dsm_segment *segment,
 								 dsa_handle handle);
 static void check_for_freed_segments(dsa_area *area);
 static void check_for_freed_segments_locked(dsa_area *area);
+static void rebin_segment(dsa_area *area, dsa_segment_map *segment_map);
 
 /*
  * Create a new shared area in a new DSM segment.  Further DSM segments will
@@ -869,7 +870,11 @@ dsa_free(dsa_area *area, dsa_pointer dp)
 		FreePageManagerPut(segment_map->fpm,
 						   DSA_EXTRACT_OFFSET(span->start) / FPM_PAGE_SIZE,
 						   span->npages);
+
+		/* Move segment to appropriate bin if necessary. */
+		rebin_segment(area, segment_map);
 		LWLockRelease(DSA_AREA_LOCK(area));
+
 		/* Unlink span. */
 		LWLockAcquire(DSA_SCLASS_LOCK(area, DSA_SCLASS_SPAN_LARGE),
 					  LW_EXCLUSIVE);
@@ -1858,6 +1863,10 @@ destroy_superblock(dsa_area *area, dsa_pointer span_pointer)
 			segment_map->mapped_address = NULL;
 		}
 	}
+
+	/* Move segment to appropriate bin if necessary. */
+	rebin_segment(area, segment_map);
+
 	LWLockRelease(DSA_AREA_LOCK(area));
 
 	/*
@@ -2021,28 +2030,7 @@ get_best_segment(dsa_area *area, size_t npages)
 			/* Re-bin it if it's no longer in the appropriate bin. */
 			if (contiguous_pages < threshold)
 			{
-				size_t		new_bin;
-
-				new_bin = contiguous_pages_to_segment_bin(contiguous_pages);
-
-				/* Remove it from its current bin. */
-				unlink_segment(area, segment_map);
-
-				/* Push it onto the front of its new bin. */
-				segment_map->header->prev = DSA_SEGMENT_INDEX_NONE;
-				segment_map->header->next =
-					area->control->segment_bins[new_bin];
-				segment_map->header->bin = new_bin;
-				area->control->segment_bins[new_bin] = segment_index;
-				if (segment_map->header->next != DSA_SEGMENT_INDEX_NONE)
-				{
-					dsa_segment_map *next;
-
-					next = get_segment_by_index(area,
-												segment_map->header->next);
-					Assert(next->header->bin == new_bin);
-					next->header->prev = segment_index;
-				}
+				rebin_segment(area, segment_map);
 
 				/*
 				 * But fall through to see if it's enough to satisfy this
@@ -2297,3 +2285,38 @@ check_for_freed_segments_locked(dsa_area *area)
 		area->freed_segment_counter = freed_segment_counter;
 	}
 }
+
+/*
+ * Move a segment to the bin matching its largest free run, if that changed.
+ */
+static void
+rebin_segment(dsa_area *area, dsa_segment_map *segment_map)
+{
+	size_t		target_bin;
+	dsa_segment_index self;
+	dsa_segment_index old_head;
+
+	/* Nothing to do without a header, or if the bin is already right. */
+	if (segment_map->header == NULL)
+		return;
+	target_bin = contiguous_pages_to_segment_bin(fpm_largest(segment_map->fpm));
+	if (segment_map->header->bin == target_bin)
+		return;
+
+	/* Take it off its current list, then make it the head of the new one. */
+	unlink_segment(area, segment_map);
+	self = get_segment_index(area, segment_map);
+	old_head = area->control->segment_bins[target_bin];
+	segment_map->header->prev = DSA_SEGMENT_INDEX_NONE;
+	segment_map->header->next = old_head;
+	segment_map->header->bin = target_bin;
+	area->control->segment_bins[target_bin] = self;
+	/* The previous head, if any, must now point back at this segment. */
+	if (old_head != DSA_SEGMENT_INDEX_NONE)
+	{
+		dsa_segment_map *successor = get_segment_by_index(area, old_head);
+
+		Assert(successor->header->bin == target_bin);
+		successor->header->prev = self;
+	}
+}
-- 
2.39.1

From 664968277c9a071a83f79edb1c768de0649889ed Mon Sep 17 00:00:00 2001
From: Liu Dongming <ldming101@gmail.com>
Date: Fri, 18 Mar 2022 13:26:11 +0800
Subject: [PATCH v3 2/2] Add a test module to exercise dsa.c.

Code originally developed along with dsa.c back in 2016, now extended
with a capped allocate-then-free-twice test by Dongming Liu, as part of
a bug investigation and fix.

Author: Dongming Liu <ldming101@gmail.com>
Author: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/CAL1p7e8LzB2LSeAXo2pXCW4%2BRya9s0sJ3G_ReKOU%3DAjSUWjHWQ%40mail.gmail.com
Discussion: https://postgr.es/m/CAEepm%3D3U7%2BRo7%3DECeQuAZoeFXs8iDVX56NXGCV7z3%3D%2BH%2BWd0Sw%40mail.gmail.com
---
 src/test/modules/Makefile                     |   1 +
 src/test/modules/meson.build                  |   1 +
 src/test/modules/test_dsa/.gitignore          |   4 +
 src/test/modules/test_dsa/Makefile            |  19 +
 .../modules/test_dsa/expected/test_dsa.out    |  55 +++
 src/test/modules/test_dsa/meson.build         |  36 ++
 src/test/modules/test_dsa/sql/test_dsa.sql    |  13 +
 src/test/modules/test_dsa/test_dsa--1.0.sql   |  21 +
 src/test/modules/test_dsa/test_dsa.c          | 407 ++++++++++++++++++
 src/test/modules/test_dsa/test_dsa.control    |   5 +
 src/tools/pgindent/typedefs.list              |   4 +
 11 files changed, 566 insertions(+)
 create mode 100644 src/test/modules/test_dsa/.gitignore
 create mode 100644 src/test/modules/test_dsa/Makefile
 create mode 100644 src/test/modules/test_dsa/expected/test_dsa.out
 create mode 100644 src/test/modules/test_dsa/meson.build
 create mode 100644 src/test/modules/test_dsa/sql/test_dsa.sql
 create mode 100644 src/test/modules/test_dsa/test_dsa--1.0.sql
 create mode 100644 src/test/modules/test_dsa/test_dsa.c
 create mode 100644 src/test/modules/test_dsa/test_dsa.control

diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index c629cbe383..9926c18705 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -18,6 +18,7 @@ SUBDIRS = \
 		  test_copy_callbacks \
 		  test_custom_rmgrs \
 		  test_ddl_deparse \
+		  test_dsa \
 		  test_extensions \
 		  test_ginpostinglist \
 		  test_integerset \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 1baa6b558d..17151bfdaa 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -14,6 +14,7 @@ subdir('test_bloomfilter')
 subdir('test_copy_callbacks')
 subdir('test_custom_rmgrs')
 subdir('test_ddl_deparse')
+subdir('test_dsa')
 subdir('test_extensions')
 subdir('test_ginpostinglist')
 subdir('test_integerset')
diff --git a/src/test/modules/test_dsa/.gitignore b/src/test/modules/test_dsa/.gitignore
new file mode 100644
index 0000000000..5dcb3ff972
--- /dev/null
+++ b/src/test/modules/test_dsa/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_dsa/Makefile b/src/test/modules/test_dsa/Makefile
new file mode 100644
index 0000000000..f45642a32a
--- /dev/null
+++ b/src/test/modules/test_dsa/Makefile
@@ -0,0 +1,19 @@
+# src/test/modules/test_dsa/Makefile
+
+MODULES = test_dsa
+
+EXTENSION = test_dsa
+DATA = test_dsa--1.0.sql
+PGFILEDESC = "test_dsa -- tests for DSA areas"
+REGRESS = test_dsa
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_dsa
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_dsa/expected/test_dsa.out b/src/test/modules/test_dsa/expected/test_dsa.out
new file mode 100644
index 0000000000..4d594130ca
--- /dev/null
+++ b/src/test/modules/test_dsa/expected/test_dsa.out
@@ -0,0 +1,55 @@
+CREATE EXTENSION test_dsa;
+SELECT test_dsa_random(3, 5, 16, 10240, 'random');
+ test_dsa_random 
+-----------------
+ 
+(1 row)
+
+SELECT test_dsa_random(3, 5, 16, 10240, 'forwards');
+ test_dsa_random 
+-----------------
+ 
+(1 row)
+
+SELECT test_dsa_random(3, 5, 16, 10240, 'backwards');
+ test_dsa_random 
+-----------------
+ 
+(1 row)
+
+SELECT count(*) from test_dsa_random_parallel(3, 5, 16, 10240, 'random', 5);
+ count 
+-------
+     5
+(1 row)
+
+SELECT count(*) from test_dsa_random_parallel(3, 5, 16, 10240, 'forwards', 5);
+ count 
+-------
+     5
+(1 row)
+
+SELECT count(*) from test_dsa_random_parallel(3, 5, 16, 10240, 'backwards', 5);
+ count 
+-------
+     5
+(1 row)
+
+SELECT test_dsa_oom(1024);
+ test_dsa_oom 
+--------------
+ 
+(1 row)
+
+SELECT test_dsa_oom(8192);
+ test_dsa_oom 
+--------------
+ 
+(1 row)
+
+SELECT test_dsa_oom(10240);
+ test_dsa_oom 
+--------------
+ 
+(1 row)
+
diff --git a/src/test/modules/test_dsa/meson.build b/src/test/modules/test_dsa/meson.build
new file mode 100644
index 0000000000..4578756ecf
--- /dev/null
+++ b/src/test/modules/test_dsa/meson.build
@@ -0,0 +1,36 @@
+# Copyright (c) 2022-2023, PostgreSQL Global Development Group
+
+# FIXME: prevent install during main install, but not during test :/
+
+test_dsa_sources = files(
+  'test_dsa.c',
+)
+
+if host_system == 'windows'
+  test_dsa_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'test_dsa',
+    '--FILEDESC', 'test_dsa - test code for dsa.c',])
+endif
+
+test_dsa = shared_module('test_dsa',
+  test_dsa_sources,
+  kwargs: pg_mod_args,
+)
+testprep_targets += test_dsa
+
+install_data(
+  'test_dsa.control',
+  'test_dsa--1.0.sql',
+  kwargs: contrib_data_args,
+)
+
+tests += {
+  'name': 'test_dsa',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'regress': {
+    'sql': [
+      'test_dsa',
+    ],
+  },
+}
diff --git a/src/test/modules/test_dsa/sql/test_dsa.sql b/src/test/modules/test_dsa/sql/test_dsa.sql
new file mode 100644
index 0000000000..3ce5abbeb5
--- /dev/null
+++ b/src/test/modules/test_dsa/sql/test_dsa.sql
@@ -0,0 +1,13 @@
+CREATE EXTENSION test_dsa;
+
+SELECT test_dsa_random(3, 5, 16, 10240, 'random');
+SELECT test_dsa_random(3, 5, 16, 10240, 'forwards');
+SELECT test_dsa_random(3, 5, 16, 10240, 'backwards');
+
+SELECT count(*) from test_dsa_random_parallel(3, 5, 16, 10240, 'random', 5);
+SELECT count(*) from test_dsa_random_parallel(3, 5, 16, 10240, 'forwards', 5);
+SELECT count(*) from test_dsa_random_parallel(3, 5, 16, 10240, 'backwards', 5);
+
+SELECT test_dsa_oom(1024);
+SELECT test_dsa_oom(8192);
+SELECT test_dsa_oom(10240);
\ No newline at end of file
diff --git a/src/test/modules/test_dsa/test_dsa--1.0.sql b/src/test/modules/test_dsa/test_dsa--1.0.sql
new file mode 100644
index 0000000000..ab575ff66e
--- /dev/null
+++ b/src/test/modules/test_dsa/test_dsa--1.0.sql
@@ -0,0 +1,21 @@
+/* src/test/modules/test_dsa/test_dsa--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_dsa" to load this file. \quit
+
+CREATE FUNCTION test_dsa_random(loops int, num_allocs int, min_alloc int, max_alloc int, mode text)
+RETURNS VOID
+AS 'MODULE_PATHNAME'
+LANGUAGE C;
+
+CREATE TYPE test_dsa_row AS (pid int, allocations bigint, elapsed interval);
+
+CREATE FUNCTION test_dsa_random_parallel(loops int, num_allocs int, min_alloc int, max_alloc int, mode text, workers int)
+RETURNS SETOF test_dsa_row
+AS 'MODULE_PATHNAME'
+LANGUAGE C;
+
+CREATE FUNCTION test_dsa_oom(alloc_size int)
+RETURNS VOID
+AS 'MODULE_PATHNAME'
+LANGUAGE C;
\ No newline at end of file
diff --git a/src/test/modules/test_dsa/test_dsa.c b/src/test/modules/test_dsa/test_dsa.c
new file mode 100644
index 0000000000..071243a4db
--- /dev/null
+++ b/src/test/modules/test_dsa/test_dsa.c
@@ -0,0 +1,407 @@
+/* -------------------------------------------------------------------------
+ *
+ * test_dsa.c
+ *		Simple exercises for dsa.c.
+ *
+ * Copyright (C) 2016-2023, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/test/modules/test_dsa/test_dsa.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "fmgr.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "postmaster/bgworker.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/proc.h"
+#include "utils/builtins.h"
+#include "utils/dsa.h"
+#include "utils/resowner.h"
+#include "utils/timestamp.h"
+
+#include <stdlib.h>
+#include <unistd.h>
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(test_dsa_random);
+PG_FUNCTION_INFO_V1(test_dsa_random_parallel);
+PG_FUNCTION_INFO_V1(test_dsa_oom);
+
+PGDLLEXPORT void test_dsa_random_worker_main(Datum arg);
+
+/* Order in which each loop frees its objects; see parse_test_mode(). */
+typedef enum
+{
+	/* Free in random order ("random"). */
+	MODE_RANDOM,
+	/* Free in the same order we allocated, FIFO ("forwards"). */
+	MODE_FORWARDS,
+	/* Free in reverse order of allocation, LIFO ("backwards"). */
+	MODE_BACKWARDS
+} test_mode;
+
+/* Per-worker results, filled in at the end of do_random_test(). */
+typedef struct
+{
+	pid_t		pid;			/* MyProcPid of the process that ran the test */
+	int64		count;			/* objects allocated, summed over all loops */
+	TimeOffset	elapsed_time;	/* GetCurrentTimestamp() delta for the run */
+} test_result;
+
+/* Parameters for a test run, passed to workers; results are written back. */
+typedef struct
+{
+	int			loops;			/* allocate-then-free rounds to run */
+	int			num_allocs;		/* allocations attempted in each round */
+	int			min_alloc;		/* smallest request size, in bytes */
+	int			max_alloc;		/* requests stay below this unless == min_alloc */
+	test_mode	mode;			/* order in which each round frees objects */
+	test_result results[FLEXIBLE_ARRAY_MEMBER]; /* indexed by output_index */
+} test_parameters;
+
+/* The startup message copied to each worker through bgw_extra. */
+typedef struct
+{
+	/* Handle the worker passes to dsa_attach() to reach the shmem area. */
+	dsa_handle	area_handle;
+	/* Where to find the test_parameters object inside that area. */
+	dsa_pointer parameters;
+	/* Which slot of parameters->results[] this worker should fill in. */
+	size_t		output_index;
+} test_hello;
+
+static test_mode
+parse_test_mode(text *mode)
+{
+	char	   *name = text_to_cstring(mode);
+
+	/* Accept exactly the three spellings; anything else is an error. */
+	if (strcmp(name, "random") == 0)
+		return MODE_RANDOM;
+	if (strcmp(name, "forwards") == 0)
+		return MODE_FORWARDS;
+	if (strcmp(name, "backwards") == 0)
+		return MODE_BACKWARDS;
+
+	ereport(ERROR,
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+			 errmsg("unknown mode")));
+	return MODE_RANDOM;			/* same fallback value the caller got before */
+}
+
+static void
+check_parameters(const test_parameters *parameters)
+{
+	if (!(parameters->min_alloc >= 1 && parameters->min_alloc <= parameters->max_alloc))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("min_alloc must be >= 1, and min_alloc must be <= max_alloc")));
+}
+
+static int
+my_tranche_id(void)
+{
+	static int	cached_id = 0;	/* zero means "not requested yet" */
+
+	/* Ask for a tranche ID on first use and hand back the same one later. */
+	if (cached_id == 0)
+		cached_id = LWLockNewTrancheId();
+	return cached_id;
+}
+
+static void
+do_random_test(dsa_area *area, size_t output_index, test_parameters *parameters)
+{
+	dsa_pointer *objects;
+	int			min_alloc;
+	int			extra_alloc;
+	int32		i;
+	int32		loop;
+	TimestampTz start_time = GetCurrentTimestamp();
+	int64		total_allocations = 0;
+
+	/*
+	 * Run parameters->loops rounds of allocate-then-free and store the totals
+	 * in results[output_index].  srand(42) repeats the same rand() sequence.
+	 */
+	srand(42);
+
+	min_alloc = parameters->min_alloc;
+	extra_alloc = parameters->max_alloc - parameters->min_alloc;
+
+	objects = palloc(sizeof(dsa_pointer) * parameters->num_allocs);
+	Assert(objects != NULL);
+	for (loop = 0; loop < parameters->loops; ++loop)
+	{
+		int			num_actually_allocated = 0;
+
+		for (i = 0; i < parameters->num_allocs; ++i)
+		{
+			size_t		size;
+			void	   *memory;
+
+			/* min_alloc plus a random extra in [0, extra_alloc), if any. */
+			size = min_alloc;
+			if (extra_alloc > 0)
+				size += rand() % extra_alloc;
+
+			/* On failure exactly i objects exist, so that is what we log. */
+			objects[i] = dsa_allocate_extended(area, size, DSA_ALLOC_NO_OOM);
+			if (!DsaPointerIsValid(objects[i]))
+			{
+				elog(LOG, "dsa: loop %d: out of memory after allocating %d objects", loop, i);
+				break;
+			}
+			++num_actually_allocated;
+			/* Pay the cost of accessing that memory */
+			memory = dsa_get_address(area, objects[i]);
+			memset(memory, 42, size);
+		}
+		if (parameters->mode == MODE_RANDOM)	/* shuffle before freeing */
+		{
+			for (i = 0; i < num_actually_allocated; ++i)
+			{
+				size_t		x = rand() % num_actually_allocated;
+				size_t		y = rand() % num_actually_allocated;
+				dsa_pointer temp = objects[x];
+
+				objects[x] = objects[y];
+				objects[y] = temp;
+			}
+		}
+		if (parameters->mode == MODE_BACKWARDS)
+		{
+			for (i = num_actually_allocated - 1; i >= 0; --i)
+				dsa_free(area, objects[i]);
+		}
+		else
+		{
+			for (i = 0; i < num_actually_allocated; ++i)
+				dsa_free(area, objects[i]);
+		}
+		total_allocations += num_actually_allocated;
+	}
+	pfree(objects);
+
+	parameters->results[output_index].elapsed_time = GetCurrentTimestamp() - start_time;
+	parameters->results[output_index].pid = MyProcPid;
+	parameters->results[output_index].count = total_allocations;
+}
+
+/* Non-parallel version: run the whole test in the calling backend. */
+Datum
+test_dsa_random(PG_FUNCTION_ARGS)
+{
+	test_parameters *params;
+	dsa_area   *scratch;
+
+	/* Room for one result slot, since only this process runs the test. */
+	params = palloc(offsetof(test_parameters, results) + sizeof(test_result));
+	params->loops = PG_GETARG_INT32(0);
+	params->num_allocs = PG_GETARG_INT32(1);
+	params->min_alloc = PG_GETARG_INT32(2);
+	params->max_alloc = PG_GETARG_INT32(3);
+	params->mode = parse_test_mode(PG_GETARG_TEXT_PP(4));
+	check_parameters(params);
+
+	/* Exercise a freshly created area, writing the outcome to slot 0. */
+	scratch = dsa_create(my_tranche_id());
+	do_random_test(scratch, 0, params);
+	dsa_dump(scratch);
+	dsa_detach(scratch);
+
+	pfree(params);
+	PG_RETURN_NULL();
+}
+
+void
+test_dsa_random_worker_main(Datum arg)
+{
+	test_hello	greeting;
+	dsa_area   *shared_area;
+	test_parameters *shared_params;
+
+	CurrentResourceOwner = ResourceOwnerCreate(NULL, "test_dsa toplevel");
+
+	/* Copy our instructions out of bgw_extra, then attach to that area. */
+	memcpy(&greeting, MyBgworkerEntry->bgw_extra, sizeof(greeting));
+	shared_area = dsa_attach(greeting.area_handle);
+	Assert(shared_area != NULL);
+	shared_params = dsa_get_address(shared_area, greeting.parameters);
+	Assert(shared_params != NULL);
+
+	do_random_test(shared_area, greeting.output_index, shared_params);
+
+	dsa_detach(shared_area);
+}
+
+/* Parallel version: run the test in bgworkers, one result row per worker. */
+Datum
+test_dsa_random_parallel(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc	tupdesc;
+	Tuplestorestate *tupstore;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+
+	test_hello	hello;
+	test_parameters *parameters;
+	dsa_area   *area;
+	int			workers;
+	int			i;
+	BackgroundWorkerHandle **handles;
+
+	/* tuplestore boilerplate stuff... */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not " \
+						"allowed in this context")));
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+	MemoryContextSwitchTo(oldcontext);
+
+	/* Prepare to work! */
+	workers = PG_GETARG_INT32(5);
+	handles = palloc(sizeof(BackgroundWorkerHandle *) * workers);
+
+	/* Set up the shared memory area. */
+	area = dsa_create(my_tranche_id());
+
+	/* The workers then will attach to it. */
+	hello.area_handle = dsa_get_handle(area);
+
+	/* Allocate the parameters object with one result slot per worker. */
+	hello.parameters = dsa_allocate(area,
+									offsetof(test_parameters, results) +
+									sizeof(test_result) * workers);
+	Assert(DsaPointerIsValid(hello.parameters));
+
+	/* Set up the parameters object. */
+	parameters = dsa_get_address(area, hello.parameters);
+	parameters->loops = PG_GETARG_INT32(0);
+	parameters->num_allocs = PG_GETARG_INT32(1);
+	parameters->min_alloc = PG_GETARG_INT32(2);
+	parameters->max_alloc = PG_GETARG_INT32(3);
+	parameters->mode = parse_test_mode(PG_GETARG_TEXT_PP(4));
+	check_parameters(parameters);
+
+	/* Start the workers. */
+	for (i = 0; i < workers; ++i)
+	{
+		BackgroundWorker bgw;
+
+		memset(&bgw, 0, sizeof(bgw));
+		snprintf(bgw.bgw_name, sizeof(bgw.bgw_name), "worker%d", i);
+		bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
+		bgw.bgw_start_time = BgWorkerStart_PostmasterStart;
+		bgw.bgw_restart_time = BGW_NEVER_RESTART;
+		snprintf(bgw.bgw_library_name, sizeof(bgw.bgw_library_name),
+				 "test_dsa");
+		snprintf(bgw.bgw_function_name, sizeof(bgw.bgw_function_name),
+				 "test_dsa_random_worker_main");
+		/* hello must fit in bgw_extra; each worker gets its own result slot. */
+		Assert(sizeof(hello) <= BGW_EXTRALEN);
+		hello.output_index = i;
+		memcpy(bgw.bgw_extra, &hello, sizeof(hello));
+		bgw.bgw_notify_pid = MyProcPid;
+
+		if (!RegisterDynamicBackgroundWorker(&bgw, &handles[i]))
+			elog(ERROR, "can't start worker");
+	}
+
+	/* Wait for the workers to complete. */
+	for (i = 0; i < workers; ++i)
+	{
+		BgwHandleStatus status;
+
+		status = WaitForBackgroundWorkerShutdown(handles[i]);
+		if (status == BGWH_POSTMASTER_DIED)
+			proc_exit(1);
+		Assert(status == BGWH_STOPPED);
+	}
+
+	/* Generate one result tuple per worker: (pid, allocations, elapsed). */
+	for (i = 0; i < workers; ++i)
+	{
+		Datum		values[3];
+		bool		nulls[] = {false, false, false};
+		Interval   *interval = palloc(sizeof(Interval));
+
+		interval->month = 0;
+		interval->day = 0;
+		interval->time = parameters->results[i].elapsed_time;
+
+		values[0] = Int32GetDatum(parameters->results[i].pid);
+		values[1] = Int64GetDatum(parameters->results[i].count);
+		values[2] = PointerGetDatum(interval);
+		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+	}
+	tuplestore_donestoring(tupstore);
+
+	pfree(handles);
+	dsa_detach(area);
+
+	return (Datum) 0;
+}
+
+/* Allocate memory until OOM, then free it all and try to allocate again. */
+Datum
+test_dsa_oom(PG_FUNCTION_ARGS)
+{
+	test_parameters *parameters;
+	dsa_area   *area;
+	int64		cnt1,
+				cnt2;
+
+	parameters = palloc(offsetof(test_parameters, results) + sizeof(test_result));
+	parameters->loops = 1;
+	parameters->min_alloc = PG_GETARG_INT32(0);
+	parameters->max_alloc = parameters->min_alloc;
+	check_parameters(parameters);
+
+	parameters->num_allocs = 1024 * 1024 / parameters->min_alloc;
+	parameters->mode = MODE_RANDOM;
+
+	/* Cap available memory at 1MB. */
+	area = dsa_create(my_tranche_id());
+	dsa_set_size_limit(area, 1024 * 1024);
+	dsa_dump(area);
+
+	do_random_test(area, 0, parameters);
+	dsa_dump(area);
+	cnt1 = parameters->results[0].count;
+
+	/* And again, now that everything from the first pass has been freed. */
+	do_random_test(area, 0, parameters);
+	dsa_dump(area);
+	cnt2 = parameters->results[0].count;
+
+	dsa_detach(area);
+	pfree(parameters);
+
+	/* Both passes must fit equally many objects; checked in every build. */
+	if (cnt1 != cnt2)
+		elog(ERROR, "allocated %lld objects at first, but %lld after freeing",
+			 (long long) cnt1, (long long) cnt2);
+
+	PG_RETURN_NULL();
+}
diff --git a/src/test/modules/test_dsa/test_dsa.control b/src/test/modules/test_dsa/test_dsa.control
new file mode 100644
index 0000000000..2655c3fccd
--- /dev/null
+++ b/src/test/modules/test_dsa/test_dsa.control
@@ -0,0 +1,5 @@
+# test_dsa extension
+comment = 'Tests for DSA'
+default_version = '1.0'
+module_pathname = '$libdir/test_dsa'
+relocatable = true
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 22ea42c16b..c3d567e0e2 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3727,8 +3727,12 @@ symbol
 tablespaceinfo
 teSection
 temp_tablespaces_extra
+test_hello
+test_mode
+test_parameters
 test_re_flags
 test_regex_ctx
+test_result
 test_shm_mq_header
 test_spec
 test_start_function
-- 
2.39.1

Reply via email to