From 9e5db70291ab85ce04a77401683a549964dfe9dc Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@enterprisedb.com>
Date: Tue, 18 Sep 2018 09:00:23 +1200
Subject: [PATCH] Fix segment_bins corruption in dsa.c.

If a segment has been freed by dsa.c because it is entirely empty, other
backends must make sure to unmap it before following links to new
segments that might happen to have the same index number, or they could
inadvertently finish up looking at a defunct segment and then corrupt
the segment_bins lists.  The correct protocol requires checking
freed_segment_counter after acquiring the area lock and before resolving
any index number to a segment.  Add the missing checks and an assertion.

Back-patch to 10, where dsa.c first arrived.

Author: Thomas Munro
Reported-by: Tomas Vondra
Discussion: https://postgr.es/m/CAEepm%3D0thg%2Bja5zGVa7jBy-uqyHrTqTm8HGhEOtMmigGrAqTbw%40mail.gmail.com
---
 src/backend/utils/mmgr/dsa.c | 49 ++++++++++++++++++++++++++++++++----
 1 file changed, 44 insertions(+), 5 deletions(-)

diff --git a/src/backend/utils/mmgr/dsa.c b/src/backend/utils/mmgr/dsa.c
index 488678191b..ec7c475ca2 100644
--- a/src/backend/utils/mmgr/dsa.c
+++ b/src/backend/utils/mmgr/dsa.c
@@ -405,6 +405,7 @@ static dsa_area *create_internal(void *place, size_t size,
 static dsa_area *attach_internal(void *place, dsm_segment *segment,
 				dsa_handle handle);
 static void check_for_freed_segments(dsa_area *area);
+static void check_for_freed_segments_locked(dsa_area *area);
 
 /*
  * Create a new shared area in a new DSM segment.  Further DSM segments will
@@ -1065,6 +1066,7 @@ dsa_dump(dsa_area *area)
 	 */
 
 	LWLockAcquire(DSA_AREA_LOCK(area), LW_EXCLUSIVE);
+	check_for_freed_segments_locked(area);
 	fprintf(stderr, "dsa_area handle %x:\n", area->control->handle);
 	fprintf(stderr, "  max_total_segment_size: %zu\n",
 			area->control->max_total_segment_size);
@@ -1762,6 +1764,22 @@ get_segment_by_index(dsa_area *area, dsa_segment_index index)
 			   (DSA_SEGMENT_HEADER_MAGIC ^ area->control->handle ^ index));
 	}
 
+	/*
+	 * Callers of dsa_get_address() or dsa_free() don't hold the area lock,
+	 * but it's a bug in the calling code and undefined behavior if
+	 * the address is not live (ie if the segment might possibly have been
+	 * freed, they're trying to use a dangling pointer).
+	 *
+	 * For code that manipulates segment_bins lists, it would be a bug in
+	 * dsa.c if we ever reach a freed segment that way.  After it's marked
+	 * as freed, the only thing any backend should do with it is unmap it,
+	 * and it should always do that via check_for_freed_segments_locked()
+	 * before arriving here to resolve an index to a segment_map.
+	 *
+	 * Either way we can assert that we didn't arrive at a freed segment.
+	 */
+	Assert(!area->segment_maps[index].header->freed);
+
 	return &area->segment_maps[index];
 }
 
@@ -1778,8 +1796,6 @@ destroy_superblock(dsa_area *area, dsa_pointer span_pointer)
 	int			size_class = span->size_class;
 	dsa_segment_map *segment_map;
 
-	segment_map =
-		get_segment_by_index(area, DSA_EXTRACT_SEGMENT_NUMBER(span->start));
 
 	/* Remove it from its fullness class list. */
 	unlink_span(area, span);
@@ -1790,6 +1806,9 @@ destroy_superblock(dsa_area *area, dsa_pointer span_pointer)
 	 * could deadlock.
 	 */
 	LWLockAcquire(DSA_AREA_LOCK(area), LW_EXCLUSIVE);
+	check_for_freed_segments_locked(area);
+	segment_map =
+		get_segment_by_index(area, DSA_EXTRACT_SEGMENT_NUMBER(span->start));
 	FreePageManagerPut(segment_map->fpm,
 					   DSA_EXTRACT_OFFSET(span->start) / FPM_PAGE_SIZE,
 					   span->npages);
@@ -1944,6 +1963,7 @@ get_best_segment(dsa_area *area, Size npages)
 	Size		bin;
 
 	Assert(LWLockHeldByMe(DSA_AREA_LOCK(area)));
+	check_for_freed_segments_locked(area);
 
 	/*
 	 * Start searching from the first bin that *might* have enough contiguous
@@ -2220,10 +2240,30 @@ check_for_freed_segments(dsa_area *area)
 	freed_segment_counter = area->control->freed_segment_counter;
 	if (unlikely(area->freed_segment_counter != freed_segment_counter))
 	{
-		int			i;
-
 		/* Check all currently mapped segments to find what's been freed. */
 		LWLockAcquire(DSA_AREA_LOCK(area), LW_EXCLUSIVE);
+		check_for_freed_segments_locked(area);
+		LWLockRelease(DSA_AREA_LOCK(area));
+	}
+}
+
+/*
+ * Workhorse for check_for_freed_segments(), and also used directly in path
+ * where the area lock is already held.  This should be called after acquiring
+ * the lock but before looking up any segment by index number, to make sure we
+ * unmap any stale segments that might have previously had the same index as a
+ * current segment.
+ */
+static void
+check_for_freed_segments_locked(dsa_area *area)
+{
+	Size		freed_segment_counter;
+	int		i;
+
+	Assert(LWLockHeldByMe(DSA_AREA_LOCK(area)));
+	freed_segment_counter = area->control->freed_segment_counter;
+	if (unlikely(area->freed_segment_counter != freed_segment_counter))
+	{
 		for (i = 0; i <= area->high_segment_index; ++i)
 		{
 			if (area->segment_maps[i].header != NULL &&
@@ -2235,7 +2275,6 @@ check_for_freed_segments(dsa_area *area)
 				area->segment_maps[i].mapped_address = NULL;
 			}
 		}
-		LWLockRelease(DSA_AREA_LOCK(area));
 		area->freed_segment_counter = freed_segment_counter;
 	}
 }
-- 
2.17.0

