On Tue, Jul 26, 2022 at 11:19:06AM -0700, Andres Freund wrote:
> On 2022-07-25 12:04:19 -0700, Nathan Bossart wrote:
>> From the discussion thus far, it seems there is interest in optimizing
>> [sub]xip lookups, so I'd like to spend some time moving it forward.  I
>> think the biggest open question is which approach to take.  Both the SIMD
>> and hash table approaches seem viable, but I think I prefer the SIMD
>> approach at the moment (see the last paragraph of quoted text for the
>> reasons).  What do folks think?
> 
> Agreed on all points.

Great!  Here is a new patch.  A couple notes:

 * I briefly looked into seeing whether auto-vectorization was viable and
   concluded it was not for these loops.

 * I borrowed USE_SSE2 from one of John Naylor's patches [0].  I'm not sure
   whether this is committable, so I would welcome thoughts on the proper
   form.  Given the comment says that SSE2 is supported by all x86-64
   hardware, I'm not seeing why we need the SSE 4.2 checks.  Is it not
   enough to check for __x86_64__ and _M_AMD64?

 * I haven't looked into adding an ARM implementation yet.

[0] 
https://postgr.es/m/CAFBsxsHko7yc8A-2PpjQ%3D2StomXF%2BT2jgKF%3DWaMFZWi8CvV7hA%40mail.gmail.com

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
>From 10a0369182be525dbe849d856b663aede10c4c16 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Thu, 28 Jul 2022 12:15:47 -0700
Subject: [PATCH v3 1/1] Use SSE2 intrinsics for XidInMVCCSnapshot().

This optimizes the linear searches through the [sub]xip arrays when
possible, which should improve performance significantly when the
arrays are large.
---
 src/backend/utils/time/snapmgr.c | 28 +++--------
 src/include/c.h                  | 13 ++++++
 src/include/utils/linearsearch.h | 80 ++++++++++++++++++++++++++++++++
 3 files changed, 100 insertions(+), 21 deletions(-)
 create mode 100644 src/include/utils/linearsearch.h

diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 5bc2a15160..834c8867d4 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -63,6 +63,7 @@
 #include "storage/sinvaladt.h"
 #include "storage/spin.h"
 #include "utils/builtins.h"
+#include "utils/linearsearch.h"
 #include "utils/memutils.h"
 #include "utils/old_snapshot.h"
 #include "utils/rel.h"
@@ -2284,8 +2285,6 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
 bool
 XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
 {
-	uint32		i;
-
 	/*
 	 * Make a quick range check to eliminate most XIDs without looking at the
 	 * xip arrays.  Note that this is OK even if we convert a subxact XID to
@@ -2317,13 +2316,8 @@ XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
 		if (!snapshot->suboverflowed)
 		{
 			/* we have full data, so search subxip */
-			int32		j;
-
-			for (j = 0; j < snapshot->subxcnt; j++)
-			{
-				if (TransactionIdEquals(xid, snapshot->subxip[j]))
-					return true;
-			}
+			if (pg_linearsearch_uint32(xid, snapshot->subxip, snapshot->subxcnt))
+				return true;
 
 			/* not there, fall through to search xip[] */
 		}
@@ -2344,16 +2338,11 @@ XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
 				return false;
 		}
 
-		for (i = 0; i < snapshot->xcnt; i++)
-		{
-			if (TransactionIdEquals(xid, snapshot->xip[i]))
-				return true;
-		}
+		if (pg_linearsearch_uint32(xid, snapshot->xip, snapshot->xcnt))
+			return true;
 	}
 	else
 	{
-		int32		j;
-
 		/*
 		 * In recovery we store all xids in the subxact array because it is by
 		 * far the bigger array, and we mostly don't know which xids are
@@ -2383,11 +2372,8 @@ XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
 		 * indeterminate xid. We don't know whether it's top level or subxact
 		 * but it doesn't matter. If it's present, the xid is visible.
 		 */
-		for (j = 0; j < snapshot->subxcnt; j++)
-		{
-			if (TransactionIdEquals(xid, snapshot->subxip[j]))
-				return true;
-		}
+		if (pg_linearsearch_uint32(xid, snapshot->subxip, snapshot->subxcnt))
+			return true;
 	}
 
 	return false;
diff --git a/src/include/c.h b/src/include/c.h
index d35405f191..8b7d844fc9 100644
--- a/src/include/c.h
+++ b/src/include/c.h
@@ -371,6 +371,19 @@ typedef void (*pg_funcptr_t) (void);
 #endif
 #endif
 
+/*
+ * Are SSE2 intrinsics available?
+ *
+ * Note: We piggy-back on the check for SSE 4.2 intrinsics but only need SSE2
+ * at runtime.  That's supported by all x86-64 hardware, so we don't need an
+ * indirect function call.
+ *
+ * XXX: Consider removing CRC from the names.
+ */
+#if (defined(__x86_64__) || defined(_M_AMD64)) && (defined(USE_SSE42_CRC32C) || defined(USE_SSE42_CRC32C_WITH_RUNTIME_CHECK))
+#define USE_SSE2
+#endif
+
 
 /* ----------------------------------------------------------------
  *				Section 2:	bool, true, false
diff --git a/src/include/utils/linearsearch.h b/src/include/utils/linearsearch.h
new file mode 100644
index 0000000000..c797fd18ca
--- /dev/null
+++ b/src/include/utils/linearsearch.h
@@ -0,0 +1,80 @@
+/*-------------------------------------------------------------------------
+ *
+ * linearsearch.h
+ *	  Optimized linear search routines.
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *	  src/include/utils/linearsearch.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LINEARSEARCH_H
+#define LINEARSEARCH_H
+
+#include "c.h"
+
+#ifdef USE_SSE2
+#include <emmintrin.h>
+
+#include "port/pg_bitutils.h"
+#endif
+
+/*
+ * pg_linearsearch_uint32
+ *
+ * Search 'base' (an array of 'nelem' uint32s) for 'key'.  Returns the
+ * address of the first element equal to 'key', or NULL if none matches.
+ */
+#ifdef USE_SSE2
+pg_attribute_no_sanitize_alignment()
+#endif
+static inline uint32 *
+pg_linearsearch_uint32(uint32 key, uint32 *base, uint32 nelem)
+{
+	uint32		i = 0;
+
+	/* If possible, use SSE2 intrinsics to speed up the search. */
+#ifdef USE_SSE2
+	__m128i		keys = _mm_set1_epi32(key);	/* broadcast key to all 4 lanes */
+	uint32		its = nelem & ~0xF;			/* round down to multiple of 16 */
+
+	for (; i < its; i += 16)
+	{
+		/* load the next 16 values with unaligned loads */
+		__m128i vals1 = _mm_loadu_si128((__m128i *) &base[i]);
+		__m128i vals2 = _mm_loadu_si128((__m128i *) &base[i + 4]);
+		__m128i vals3 = _mm_loadu_si128((__m128i *) &base[i + 8]);
+		__m128i vals4 = _mm_loadu_si128((__m128i *) &base[i + 12]);
+
+		/* compare each value against the key, lane by lane */
+		__m128i result1 = _mm_cmpeq_epi32(keys, vals1);
+		__m128i result2 = _mm_cmpeq_epi32(keys, vals2);
+		__m128i result3 = _mm_cmpeq_epi32(keys, vals3);
+		__m128i result4 = _mm_cmpeq_epi32(keys, vals4);
+
+		/* pack the 16 comparison results down into one 16-bit mask */
+		__m128i tmp1 = _mm_packs_epi32(result1, result2);
+		__m128i tmp2 = _mm_packs_epi32(result3, result4);
+		__m128i tmp3 = _mm_packs_epi16(tmp1, tmp2);
+		uint32 result = _mm_movemask_epi8(tmp3);
+
+		/* if found, return a pointer to the first matching element */
+		if (result != 0)
+			return &base[i + pg_rightmost_one_pos32(result)];
+	}
+#endif
+
+	/* Process the remaining elements the slow way. */
+	for (; i < nelem; i++)
+	{
+		if (key == base[i])
+			return &base[i];
+	}
+
+	return NULL;
+}
+
+#endif							/* LINEARSEARCH_H */
-- 
2.25.1

Reply via email to