On 09.07.18 15:49, Heikki Linnakangas wrote:
> The way 
> forward is to test if we can get the same performance benefit from 
> switching to CMPXCHG16B, and keep the WAL format unchanged.

I have implemented this.  I didn't see any performance benefit using the
benchmark that Alexander Kuzmenkov outlined earlier in the thread.  (But
I also didn't see a great benefit for the originally proposed patch, so
maybe I'm not doing this right or this particular hardware doesn't
benefit from it as much.)

I'm attaching the patches and scripts here if someone else wants to do
more testing.

The first patch adds a zoo of 128-bit atomics support.  It's consistent
with (a.k.a. copy-and-pasted from) the existing 32- and 64-bit set, but
it's not the complete set, only as much as was necessary for this exercise.

The second patch then makes use of that in the WAL code under discussion.

pgbench invocations were:

pgbench -i -I t bench
pgbench -n -c $N -j $N -M prepared -f pgbench-wal-cas.sql -T 60 bench

for N from 1 to 32.

Note:  With gcc (at least versions 7 and 8) you need to use some
non-default -march setting to get 128-bit atomics to work.  (Otherwise
the configure test fails and the fallback implementation is used.)  I
have found the minimum to be -march=nocona.  But different -march
settings actually affect the benchmark performance, so be sure to test
the baseline with the same -march setting.  Recommended configure
invocation: ./configure ... CC='gcc -march=whatever'

clang appears to work out of the box.

Also, curiously my gcc installations provide 128-bit
__sync_val_compare_and_swap() but not 128-bit
__atomic_compare_exchange_n() in spite of what the documentation indicates.
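
In case it's useful, the configure probes in the first patch boil down to the
following standalone check (illustrative only, not part of the patches; the
file name is made up).  Building it with and without -march=nocona shows which
of the two builtins actually links:

/* probe128.c - check for 128-bit CAS builtins (illustrative only) */
#include <stdio.h>

int
main(void)
{
    __int128    val = 0;
    __int128    expect = 0;

    /* the __sync probe; needs a suitable -march setting to link (see above) */
    __sync_val_compare_and_swap(&val, expect, (__int128) 37);

    /* the __atomic probe; the one that fails to link on my gcc installations */
    __atomic_compare_exchange_n(&val, &expect, (__int128) 37, 0,
                                __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);

    printf("128-bit CAS builtins linked\n");
    return 0;
}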

So independent of whether this approach actually provides any benefit,
the 128-bit atomics support seems a bit wobbly.

(I'm also wondering why we are using __sync_val_compare_and_swap()
rather than __sync_bool_compare_and_swap(), since all we're doing with
the return value is emulate the bool version anyway.)
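
To make that concrete: a bool-based variant of the existing 64-bit fallback in
generic-gcc.h might look roughly like this (a sketch only, not something I'm
proposing here); the one difference is that *expected has to be refreshed with
a separate read on failure:

static inline bool
pg_atomic_compare_exchange_u64_impl(volatile pg_atomic_uint64 *ptr,
                                    uint64 *expected, uint64 newval)
{
    /* sketch: bool builtin instead of the val builtin used today */
    if (__sync_bool_compare_and_swap(&ptr->value, *expected, newval))
        return true;

    /* callers expect *expected to hold the current value after a failure */
    *expected = ptr->value;
    return false;
}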

-- 
Peter Eisentraut              http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From 3359d5d5dd828bc0317ca28474ebe5dd931d44e8 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pete...@gmx.net>
Date: Wed, 4 Jul 2018 22:13:58 +0200
Subject: [PATCH 1/2] Add some int128 atomics support

---
 config/c-compiler.m4                   | 31 +++++++++
 configure                              | 65 ++++++++++++++++++
 configure.in                           |  2 +
 src/backend/port/atomics.c             | 51 +++++++++++++++
 src/include/pg_config.h.in             |  8 +++
 src/include/port/atomics.h             | 49 ++++++++++++++
 src/include/port/atomics/fallback.h    | 30 +++++++++
 src/include/port/atomics/generic-gcc.h | 46 +++++++++++++
 src/include/port/atomics/generic.h     | 91 ++++++++++++++++++++++++++
 9 files changed, 373 insertions(+)

diff --git a/config/c-compiler.m4 b/config/c-compiler.m4
index 9731a517de..a4aecd3e61 100644
--- a/config/c-compiler.m4
+++ b/config/c-compiler.m4
@@ -606,6 +606,21 @@ if test x"$pgac_cv_gcc_sync_int64_cas" = x"yes"; then
   AC_DEFINE(HAVE_GCC__SYNC_INT64_CAS, 1, [Define to 1 if you have __sync_val_compare_and_swap(int64 *, int64, int64).])
 fi])# PGAC_HAVE_GCC__SYNC_INT64_CAS
 
+# PGAC_HAVE_GCC__SYNC_INT128_CAS
+# -------------------------
+# Check if the C compiler understands __sync_compare_and_swap() for 128bit
+# types, and define HAVE_GCC__SYNC_INT128_CAS if so.
+AC_DEFUN([PGAC_HAVE_GCC__SYNC_INT128_CAS],
+[AC_CACHE_CHECK(for builtin __sync int128 atomic operations, pgac_cv_gcc_sync_int128_cas,
+[AC_LINK_IFELSE([AC_LANG_PROGRAM([],
+  [PG_INT128_TYPE lock = 0;
+   __sync_val_compare_and_swap(&lock, 0, (PG_INT128_TYPE) 37);])],
+  [pgac_cv_gcc_sync_int128_cas="yes"],
+  [pgac_cv_gcc_sync_int128_cas="no"])])
+if test x"$pgac_cv_gcc_sync_int128_cas" = x"yes"; then
+  AC_DEFINE(HAVE_GCC__SYNC_INT128_CAS, 1, [Define to 1 if you have __sync_val_compare_and_swap(int128 *, int128, int128).])
+fi])# PGAC_HAVE_GCC__SYNC_INT128_CAS
+
 # PGAC_HAVE_GCC__ATOMIC_INT32_CAS
 # -------------------------
 # Check if the C compiler understands __atomic_compare_exchange_n() for 32bit
@@ -638,6 +653,22 @@ if test x"$pgac_cv_gcc_atomic_int64_cas" = x"yes"; then
   AC_DEFINE(HAVE_GCC__ATOMIC_INT64_CAS, 1, [Define to 1 if you have __atomic_compare_exchange_n(int64 *, int64 *, int64).])
 fi])# PGAC_HAVE_GCC__ATOMIC_INT64_CAS
 
+# PGAC_HAVE_GCC__ATOMIC_INT128_CAS
+# -------------------------
+# Check if the C compiler understands __atomic_compare_exchange_n() for 128bit
+# types, and define HAVE_GCC__ATOMIC_INT128_CAS if so.
+AC_DEFUN([PGAC_HAVE_GCC__ATOMIC_INT128_CAS],
+[AC_CACHE_CHECK(for builtin __atomic int128 atomic operations, pgac_cv_gcc_atomic_int128_cas,
+[AC_LINK_IFELSE([AC_LANG_PROGRAM([],
+  [PG_INT128_TYPE val = 0;
+   PG_INT128_TYPE expect = 0;
+   __atomic_compare_exchange_n(&val, &expect, 37, 0, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);])],
+  [pgac_cv_gcc_atomic_int128_cas="yes"],
+  [pgac_cv_gcc_atomic_int128_cas="no"])])
+if test x"$pgac_cv_gcc_atomic_int128_cas" = x"yes"; then
+  AC_DEFINE(HAVE_GCC__ATOMIC_INT128_CAS, 1, [Define to 1 if you have __atomic_compare_exchange_n(int128 *, int128 *, int128).])
+fi])# PGAC_HAVE_GCC__ATOMIC_INT128_CAS
+
 # PGAC_SSE42_CRC32_INTRINSICS
 # -----------------------
 # Check if the compiler supports the x86 CRC instructions added in SSE 4.2,
diff --git a/configure b/configure
index 1bc465b942..e7bd1edc6a 100755
--- a/configure
+++ b/configure
@@ -17046,6 +17046,38 @@ if test x"$pgac_cv_gcc_sync_int64_cas" = x"yes"; then
 
 $as_echo "#define HAVE_GCC__SYNC_INT64_CAS 1" >>confdefs.h
 
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for builtin __sync int128 atomic operations" >&5
+$as_echo_n "checking for builtin __sync int128 atomic operations... " >&6; }
+if ${pgac_cv_gcc_sync_int128_cas+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+int
+main ()
+{
+PG_INT128_TYPE lock = 0;
+   __sync_val_compare_and_swap(&lock, 0, (PG_INT128_TYPE) 37);
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  pgac_cv_gcc_sync_int128_cas="yes"
+else
+  pgac_cv_gcc_sync_int128_cas="no"
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $pgac_cv_gcc_sync_int128_cas" >&5
+$as_echo "$pgac_cv_gcc_sync_int128_cas" >&6; }
+if test x"$pgac_cv_gcc_sync_int128_cas" = x"yes"; then
+
+$as_echo "#define HAVE_GCC__SYNC_INT128_CAS 1" >>confdefs.h
+
 fi
 { $as_echo "$as_me:${as_lineno-$LINENO}: checking for builtin __atomic int32 atomic operations" >&5
 $as_echo_n "checking for builtin __atomic int32 atomic operations... " >&6; }
@@ -17112,6 +17144,39 @@ if test x"$pgac_cv_gcc_atomic_int64_cas" = x"yes"; then
 
 $as_echo "#define HAVE_GCC__ATOMIC_INT64_CAS 1" >>confdefs.h
 
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for builtin __atomic int128 atomic operations" >&5
+$as_echo_n "checking for builtin __atomic int128 atomic operations... " >&6; }
+if ${pgac_cv_gcc_atomic_int128_cas+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+int
+main ()
+{
+PG_INT128_TYPE val = 0;
+   PG_INT128_TYPE expect = 0;
+   __atomic_compare_exchange_n(&val, &expect, 37, 0, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  pgac_cv_gcc_atomic_int128_cas="yes"
+else
+  pgac_cv_gcc_atomic_int128_cas="no"
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $pgac_cv_gcc_atomic_int128_cas" >&5
+$as_echo "$pgac_cv_gcc_atomic_int128_cas" >&6; }
+if test x"$pgac_cv_gcc_atomic_int128_cas" = x"yes"; then
+
+$as_echo "#define HAVE_GCC__ATOMIC_INT128_CAS 1" >>confdefs.h
+
 fi
 
 
diff --git a/configure.in b/configure.in
index a6b3b88cfa..24a841daa8 100644
--- a/configure.in
+++ b/configure.in
@@ -1946,8 +1946,10 @@ PGAC_HAVE_GCC__SYNC_CHAR_TAS
 PGAC_HAVE_GCC__SYNC_INT32_TAS
 PGAC_HAVE_GCC__SYNC_INT32_CAS
 PGAC_HAVE_GCC__SYNC_INT64_CAS
+PGAC_HAVE_GCC__SYNC_INT128_CAS
 PGAC_HAVE_GCC__ATOMIC_INT32_CAS
 PGAC_HAVE_GCC__ATOMIC_INT64_CAS
+PGAC_HAVE_GCC__ATOMIC_INT128_CAS
 
 
 # Check for x86 cpuid instruction
diff --git a/src/backend/port/atomics.c b/src/backend/port/atomics.c
index caa84bf2b6..ee73d8ea0a 100644
--- a/src/backend/port/atomics.c
+++ b/src/backend/port/atomics.c
@@ -237,3 +237,54 @@ pg_atomic_fetch_add_u64_impl(volatile pg_atomic_uint64 *ptr, int64 add_)
 }
 
 #endif                                                 /* PG_HAVE_ATOMIC_U64_SIMULATION */
+
+
+#ifdef PG_HAVE_ATOMIC_U128_SIMULATION
+
+void
+pg_atomic_init_u128_impl(volatile pg_atomic_uint128 *ptr, uint128 val_)
+{
+       StaticAssertStmt(sizeof(ptr->sema) >= sizeof(slock_t),
+                                        "size mismatch of atomic_uint128 vs slock_t");
+
+       /*
+        * If we're using semaphore based atomic flags, be careful about nested
+        * usage of atomics while a spinlock is held.
+        */
+#ifndef HAVE_SPINLOCKS
+       s_init_lock_sema((slock_t *) &ptr->sema, true);
+#else
+       SpinLockInit((slock_t *) &ptr->sema);
+#endif
+       ptr->value = val_;
+}
+
+bool
+pg_atomic_compare_exchange_u128_impl(volatile pg_atomic_uint128 *ptr,
+                                                                       uint128 *expected, uint128 newval)
+{
+       bool            ret;
+
+       /*
+        * Do atomic op under a spinlock. It might look like we could just skip
+        * the cmpxchg if the lock isn't available, but that'd just emulate a
+        * 'weak' compare and swap. I.e. one that allows spurious failures. Since
+        * several algorithms rely on a strong variant and that is efficiently
+        * implementable on most major architectures let's emulate it here as
+        * well.
+        */
+       SpinLockAcquire((slock_t *) &ptr->sema);
+
+       /* perform compare/exchange logic */
+       ret = ptr->value == *expected;
+       *expected = ptr->value;
+       if (ret)
+               ptr->value = newval;
+
+       /* and release lock */
+       SpinLockRelease((slock_t *) &ptr->sema);
+
+       return ret;
+}
+
+#endif                                                 /* PG_HAVE_ATOMIC_U128_SIMULATION */
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index dcb25bb723..9a745cdf6c 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -225,6 +225,10 @@
 /* Define to 1 if your compiler understands __FUNCTION__. */
 #undef HAVE_FUNCNAME__FUNCTION
 
+/* Define to 1 if you have __atomic_compare_exchange_n(int128 *, int128 *,
+   int128). */
+#undef HAVE_GCC__ATOMIC_INT128_CAS
+
 /* Define to 1 if you have __atomic_compare_exchange_n(int *, int *, int). */
 #undef HAVE_GCC__ATOMIC_INT32_CAS
 
@@ -235,6 +239,10 @@
 /* Define to 1 if you have __sync_lock_test_and_set(char *) and friends. */
 #undef HAVE_GCC__SYNC_CHAR_TAS
 
+/* Define to 1 if you have __sync_val_compare_and_swap(int128 *, int128,
+   int128). */
+#undef HAVE_GCC__SYNC_INT128_CAS
+
 /* Define to 1 if you have __sync_val_compare_and_swap(int *, int, int). */
 #undef HAVE_GCC__SYNC_INT32_CAS
 
diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h
index 0470391675..6cb2fddb70 100644
--- a/src/include/port/atomics.h
+++ b/src/include/port/atomics.h
@@ -522,6 +522,55 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
        return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
 }
 
+/* ----
+ * The 128 bit operations have the same semantics as their 32bit counterparts
+ * if they are available. Check the corresponding 32bit function for
+ * documentation.
+ * ----
+ */
+static inline void
+pg_atomic_init_u128(volatile pg_atomic_uint128 *ptr, uint128 val)
+{
+       /*
+        * Can't necessarily enforce alignment - and don't need it - when using
+        * the spinlock based fallback implementation. Therefore only assert 
when
+        * not using it.
+        */
+#ifndef PG_HAVE_ATOMIC_U128_SIMULATION
+       AssertPointerAlignment(ptr, 16);
+#endif
+       pg_atomic_init_u128_impl(ptr, val);
+}
+
+static inline uint128
+pg_atomic_read_u128(volatile pg_atomic_uint128 *ptr)
+{
+#ifndef PG_HAVE_ATOMIC_U128_SIMULATION
+       AssertPointerAlignment(ptr, 16);
+#endif
+       return pg_atomic_read_u128_impl(ptr);
+}
+
+static inline void
+pg_atomic_write_u128(volatile pg_atomic_uint128 *ptr, uint128 val)
+{
+#ifndef PG_HAVE_ATOMIC_U128_SIMULATION
+       AssertPointerAlignment(ptr, 16);
+#endif
+       pg_atomic_write_u128_impl(ptr, val);
+}
+
+static inline bool
+pg_atomic_compare_exchange_u128(volatile pg_atomic_uint128 *ptr,
+                                                               uint128 *expected, uint128 newval)
+{
+#ifndef PG_HAVE_ATOMIC_U128_SIMULATION
+       AssertPointerAlignment(ptr, 16);
+       AssertPointerAlignment(expected, 16);
+#endif
+       return pg_atomic_compare_exchange_u128_impl(ptr, expected, newval);
+}
+
 #undef INSIDE_ATOMICS_H
 
 #endif                                                 /* ATOMICS_H */
diff --git a/src/include/port/atomics/fallback.h b/src/include/port/atomics/fallback.h
index 88a967ad5b..9d0158e6da 100644
--- a/src/include/port/atomics/fallback.h
+++ b/src/include/port/atomics/fallback.h
@@ -121,6 +121,24 @@ typedef struct pg_atomic_uint64
 
 #endif /* PG_HAVE_ATOMIC_U64_SUPPORT */
 
+#if !defined(PG_HAVE_ATOMIC_U128_SUPPORT)
+
+#define PG_HAVE_ATOMIC_U128_SIMULATION
+
+#define PG_HAVE_ATOMIC_U128_SUPPORT
+typedef struct pg_atomic_uint128
+{
+       /* Check pg_atomic_flag's definition above for an explanation */
+#if defined(__hppa) || defined(__hppa__)       /* HP PA-RISC, GCC and HP compilers */
+       int                     sema[4];
+#else
+       int                     sema;
+#endif
+       volatile uint128 value;
+} pg_atomic_uint128;
+
+#endif /* PG_HAVE_ATOMIC_U128_SUPPORT */
+
 #ifdef PG_HAVE_ATOMIC_FLAG_SIMULATION
 
 #define PG_HAVE_ATOMIC_INIT_FLAG
@@ -168,3 +186,15 @@ extern bool pg_atomic_compare_exchange_u64_impl(volatile pg_atomic_uint64 *ptr,
 extern uint64 pg_atomic_fetch_add_u64_impl(volatile pg_atomic_uint64 *ptr, int64 add_);
 
 #endif /* PG_HAVE_ATOMIC_U64_SIMULATION */
+
+
+#ifdef PG_HAVE_ATOMIC_U128_SIMULATION
+
+#define PG_HAVE_ATOMIC_INIT_U128
+extern void pg_atomic_init_u128_impl(volatile pg_atomic_uint128 *ptr, uint128 val_);
+
+#define PG_HAVE_ATOMIC_COMPARE_EXCHANGE_U128
+extern bool pg_atomic_compare_exchange_u128_impl(volatile pg_atomic_uint128 *ptr,
+                                                 uint128 *expected, uint128 newval);
+
+#endif /* PG_HAVE_ATOMIC_U128_SIMULATION */
diff --git a/src/include/port/atomics/generic-gcc.h b/src/include/port/atomics/generic-gcc.h
index 3a1dc88377..b0818dd644 100644
--- a/src/include/port/atomics/generic-gcc.h
+++ b/src/include/port/atomics/generic-gcc.h
@@ -102,6 +102,20 @@ typedef struct pg_atomic_uint64
 
 #endif /* defined(HAVE_GCC__ATOMIC_INT64_CAS) || defined(HAVE_GCC__SYNC_INT64_CAS) */
 
+/* generic gcc based atomic uint128 implementation */
+#if !defined(PG_HAVE_ATOMIC_U128_SUPPORT) \
+       && !defined(PG_DISABLE_128_BIT_ATOMICS) \
+       && (defined(HAVE_GCC__ATOMIC_INT128_CAS) || defined(HAVE_GCC__SYNC_INT128_CAS))
+
+#define PG_HAVE_ATOMIC_U128_SUPPORT
+
+typedef struct pg_atomic_uint128
+{
+       volatile uint128 value pg_attribute_aligned(8);
+} pg_atomic_uint128;
+
+#endif /* defined(HAVE_GCC__ATOMIC_INT128_CAS) || defined(HAVE_GCC__SYNC_INT128_CAS) */
+
 #ifdef PG_HAVE_ATOMIC_FLAG_SUPPORT
 
 #if defined(HAVE_GCC__SYNC_CHAR_TAS) || defined(HAVE_GCC__SYNC_INT32_TAS)
@@ -283,4 +297,36 @@ pg_atomic_fetch_or_u64_impl(volatile pg_atomic_uint64 *ptr, uint64 or_)
 
 #endif /* !defined(PG_DISABLE_64_BIT_ATOMICS) */
 
+
+#if !defined(PG_DISABLE_128_BIT_ATOMICS)
+
+#if !defined(PG_HAVE_ATOMIC_COMPARE_EXCHANGE_U128) && defined(HAVE_GCC__ATOMIC_INT128_CAS)
+#define PG_HAVE_ATOMIC_COMPARE_EXCHANGE_U128
+static inline bool
+pg_atomic_compare_exchange_u128_impl(volatile pg_atomic_uint128 *ptr,
+                                                                       uint128 *expected, uint128 newval)
+{
+       return __atomic_compare_exchange_n(&ptr->value, expected, newval, false,
+                                                                          __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
+}
+#endif
+
+#if !defined(PG_HAVE_ATOMIC_COMPARE_EXCHANGE_U128) && defined(HAVE_GCC__SYNC_INT128_CAS)
+#define PG_HAVE_ATOMIC_COMPARE_EXCHANGE_U128
+static inline bool
+pg_atomic_compare_exchange_u128_impl(volatile pg_atomic_uint128 *ptr,
+                                                                       uint128 *expected, uint128 newval)
+{
+       bool    ret;
+       uint128 current;
+       current = __sync_val_compare_and_swap(&ptr->value, *expected, newval);
+       ret = current == *expected;
+       *expected = current;
+       return ret;
+}
+#endif
+
+#endif /* !defined(PG_DISABLE_128_BIT_ATOMICS) */
+
+
 #endif /* defined(HAVE_ATOMICS) */
diff --git a/src/include/port/atomics/generic.h b/src/include/port/atomics/generic.h
index ea11698a35..0d3ea2e3aa 100644
--- a/src/include/port/atomics/generic.h
+++ b/src/include/port/atomics/generic.h
@@ -399,3 +399,94 @@ pg_atomic_sub_fetch_u64_impl(volatile pg_atomic_uint64 *ptr, int64 sub_)
        return pg_atomic_fetch_sub_u64_impl(ptr, sub_) - sub_;
 }
 #endif
+
+#if !defined(PG_HAVE_ATOMIC_EXCHANGE_U128) && defined(PG_HAVE_ATOMIC_COMPARE_EXCHANGE_U128)
+#define PG_HAVE_ATOMIC_EXCHANGE_U128
+static inline uint128
+pg_atomic_exchange_u128_impl(volatile pg_atomic_uint128 *ptr, uint128 xchg_)
+{
+       uint128 old;
+       old = ptr->value;                       /* ok if read is not atomic */
+       while (!pg_atomic_compare_exchange_u128_impl(ptr, &old, xchg_))
+               /* skip */;
+       return old;
+}
+#endif
+
+#ifndef PG_HAVE_ATOMIC_WRITE_U128
+#define PG_HAVE_ATOMIC_WRITE_U128
+
+#if defined(PG_HAVE_16BYTE_SINGLE_COPY_ATOMICITY) && \
+       !defined(PG_HAVE_ATOMIC_U128_SIMULATION)
+
+static inline void
+pg_atomic_write_u128_impl(volatile pg_atomic_uint128 *ptr, uint128 val)
+{
+       /*
+        * On this platform aligned 128bit writes are guaranteed to be atomic,
+        * except if using the fallback implementation, where can't guarantee the
+        * required alignment.
+        */
+       AssertPointerAlignment(ptr, 16);
+       ptr->value = val;
+}
+
+#else
+
+static inline void
+pg_atomic_write_u128_impl(volatile pg_atomic_uint128 *ptr, uint128 val)
+{
+       /*
+        * 128 bit writes aren't safe on all platforms. In the generic
+        * implementation implement them as an atomic exchange.
+        */
+       pg_atomic_exchange_u128_impl(ptr, val);
+}
+
+#endif /* PG_HAVE_16BYTE_SINGLE_COPY_ATOMICITY && !PG_HAVE_ATOMIC_U128_SIMULATION */
+#endif /* PG_HAVE_ATOMIC_WRITE_U128 */
+
+#ifndef PG_HAVE_ATOMIC_READ_U128
+#define PG_HAVE_ATOMIC_READ_U128
+
+#if defined(PG_HAVE_16BYTE_SINGLE_COPY_ATOMICITY) && \
+       !defined(PG_HAVE_ATOMIC_U128_SIMULATION)
+
+static inline uint128
+pg_atomic_read_u128_impl(volatile pg_atomic_uint128 *ptr)
+{
+       /*
+        * On this platform aligned 128-bit reads are guaranteed to be atomic.
+        */
+       AssertPointerAlignment(ptr, 16);
+       return ptr->value;
+}
+
+#else
+
+static inline uint128
+pg_atomic_read_u128_impl(volatile pg_atomic_uint128 *ptr)
+{
+       uint128 old = 0;
+
+       /*
+        * 128-bit reads aren't atomic on all platforms. In the generic
+        * implementation implement them as a compare/exchange with 0. That'll
+        * fail or succeed, but always return the old value. Possibly might store
+        * a 0, but only if the previous value also was a 0 - i.e. harmless.
+        */
+       pg_atomic_compare_exchange_u128_impl(ptr, &old, 0);
+
+       return old;
+}
+#endif /* PG_HAVE_16BYTE_SINGLE_COPY_ATOMICITY && !PG_HAVE_ATOMIC_U128_SIMULATION */
+#endif /* PG_HAVE_ATOMIC_READ_U128 */
+
+#ifndef PG_HAVE_ATOMIC_INIT_U128
+#define PG_HAVE_ATOMIC_INIT_U128
+static inline void
+pg_atomic_init_u128_impl(volatile pg_atomic_uint128 *ptr, uint128 val_)
+{
+       pg_atomic_write_u128_impl(ptr, val_);
+}
+#endif

base-commit: 1486f7f981c0052988891677d4e734b14317816c
-- 
2.18.0

From 02aa75ac38199bda442ddbaea7ce4d1380ece4f9 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pete...@gmx.net>
Date: Thu, 5 Jul 2018 00:40:39 +0200
Subject: [PATCH 2/2] Reduce WAL spinlock contention

Replace insertpos_lck with 128-bit compare-and-swap operation.
---
 src/backend/access/transam/xlog.c | 137 ++++++++++++++++--------------
 1 file changed, 75 insertions(+), 62 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 022dac6cd7..07bc90647f 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -511,13 +511,24 @@ typedef enum ExclusiveBackupState
  */
 static SessionBackupState sessionBackupState = SESSION_BACKUP_NONE;
 
+/*
+ * Union for accessing a pair of 64-bit integers as a 128-bit integer.
+ */
+typedef union
+{
+       struct
+       {
+               uint64  current;
+               uint64  previous;
+       } parts;
+       uint128 whole;
+} BytePositions;
+
 /*
  * Shared state data for WAL insertion.
  */
 typedef struct XLogCtlInsert
 {
-       slock_t         insertpos_lck;  /* protects CurrBytePos and PrevBytePos */
-
        /*
         * CurrBytePos is the end of reserved WAL. The next record will be
         * inserted at that position. PrevBytePos is the start position of the
@@ -525,8 +536,7 @@ typedef struct XLogCtlInsert
         * prev-link of the next record. These are stored as "usable byte
         * positions" rather than XLogRecPtrs (see XLogBytePosToRecPtr()).
         */
-       uint64          CurrBytePos;
-       uint64          PrevBytePos;
+       pg_atomic_uint128       BytePos;
 
        /*
         * Make sure the above heavily-contended spinlock and byte positions are
@@ -992,7 +1002,7 @@ XLogInsertRecord(XLogRecData *rdata,
         *
         * 1. Reserve the right amount of space from the WAL. The current head of
         *        reserved space is kept in Insert->CurrBytePos, and is protected by
-        *        insertpos_lck.
+        *        atomic operations.
         *
         * 2. Copy the record to the reserved WAL space. This involves finding the
         *        correct WAL buffer containing the reserved space, and copying the
@@ -1224,8 +1234,7 @@ XLogInsertRecord(XLogRecData *rdata,
  *
  * This is the performance critical part of XLogInsert that must be serialized
  * across backends. The rest can happen mostly in parallel. Try to keep this
- * section as short as possible, insertpos_lck can be heavily contended on a
- * busy system.
+ * section as short as possible.
  *
  * NB: The space calculation here must match the code in CopyXLogRecordToWAL,
  * where we actually copy the record to the reserved space.
@@ -1238,6 +1247,8 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
        uint64          startbytepos;
        uint64          endbytepos;
        uint64          prevbytepos;
+       BytePositions oldpositions;
+       BytePositions newpositions;
 
        size = MAXALIGN(size);
 
@@ -1245,24 +1256,22 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
        Assert(size > SizeOfXLogRecord);
 
        /*
-        * The duration the spinlock needs to be held is minimized by minimizing
-        * the calculations that have to be done while holding the lock. The
-        * current tip of reserved WAL is kept in CurrBytePos, as a byte position
+        * The current tip of reserved WAL is kept in CurrBytePos, as a byte position
         * that only counts "usable" bytes in WAL, that is, it excludes all WAL
         * page headers. The mapping between "usable" byte positions and physical
         * positions (XLogRecPtrs) can be done outside the locked region, and
         * because the usable byte position doesn't include any headers, reserving
         * X bytes from WAL is almost as simple as "CurrBytePos += X".
         */
-       SpinLockAcquire(&Insert->insertpos_lck);
-
-       startbytepos = Insert->CurrBytePos;
-       endbytepos = startbytepos + size;
-       prevbytepos = Insert->PrevBytePos;
-       Insert->CurrBytePos = endbytepos;
-       Insert->PrevBytePos = startbytepos;
-
-       SpinLockRelease(&Insert->insertpos_lck);
+       do {
+               oldpositions.whole = Insert->BytePos.value;
+               startbytepos = oldpositions.parts.current;
+               endbytepos = startbytepos + size;
+               prevbytepos = oldpositions.parts.previous;
+               newpositions.parts.current = endbytepos;
+               newpositions.parts.previous = startbytepos;
+       }
+       while (!pg_atomic_compare_exchange_u128(&Insert->BytePos, &oldpositions.whole, newpositions.whole));
 
        *StartPos = XLogBytePosToRecPtr(startbytepos);
        *EndPos = XLogBytePosToEndRecPtr(endbytepos);
@@ -1293,45 +1302,42 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
        uint64          startbytepos;
        uint64          endbytepos;
        uint64          prevbytepos;
+       BytePositions oldpositions;
+       BytePositions newpositions;
        uint32          size = MAXALIGN(SizeOfXLogRecord);
        XLogRecPtr      ptr;
        uint32          segleft;
 
-       /*
-        * These calculations are a bit heavy-weight to be done while holding a
-        * spinlock, but since we're holding all the WAL insertion locks, there
-        * are no other inserters competing for it. GetXLogInsertRecPtr() does
-        * compete for it, but that's not called very frequently.
-        */
-       SpinLockAcquire(&Insert->insertpos_lck);
+       do
+       {
+               oldpositions.whole = Insert->BytePos.value;
+               startbytepos = oldpositions.parts.current;
 
-       startbytepos = Insert->CurrBytePos;
+               ptr = XLogBytePosToEndRecPtr(startbytepos);
+               if (XLogSegmentOffset(ptr, wal_segment_size) == 0)
+               {
+                       *EndPos = *StartPos = ptr;
+                       return false;
+               }
 
-       ptr = XLogBytePosToEndRecPtr(startbytepos);
-       if (XLogSegmentOffset(ptr, wal_segment_size) == 0)
-       {
-               SpinLockRelease(&Insert->insertpos_lck);
-               *EndPos = *StartPos = ptr;
-               return false;
-       }
+               endbytepos = startbytepos + size;
+               prevbytepos = oldpositions.parts.previous;
 
-       endbytepos = startbytepos + size;
-       prevbytepos = Insert->PrevBytePos;
+               *StartPos = XLogBytePosToRecPtr(startbytepos);
+               *EndPos = XLogBytePosToEndRecPtr(endbytepos);
 
-       *StartPos = XLogBytePosToRecPtr(startbytepos);
-       *EndPos = XLogBytePosToEndRecPtr(endbytepos);
+               segleft = wal_segment_size - XLogSegmentOffset(*EndPos, wal_segment_size);
+               if (segleft != wal_segment_size)
+               {
+                       /* consume the rest of the segment */
+                       *EndPos += segleft;
+                       endbytepos = XLogRecPtrToBytePos(*EndPos);
+               }
 
-       segleft = wal_segment_size - XLogSegmentOffset(*EndPos, wal_segment_size);
-       if (segleft != wal_segment_size)
-       {
-               /* consume the rest of the segment */
-               *EndPos += segleft;
-               endbytepos = XLogRecPtrToBytePos(*EndPos);
+               newpositions.parts.current = endbytepos;
+               newpositions.parts.previous = startbytepos;
        }
-       Insert->CurrBytePos = endbytepos;
-       Insert->PrevBytePos = startbytepos;
-
-       SpinLockRelease(&Insert->insertpos_lck);
+       while (!pg_atomic_compare_exchange_u128(&Insert->BytePos, &oldpositions.whole, newpositions.whole));
 
        *PrevPtr = XLogBytePosToRecPtr(prevbytepos);
 
@@ -1735,7 +1741,7 @@ WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt)
 static XLogRecPtr
 WaitXLogInsertionsToFinish(XLogRecPtr upto)
 {
-       uint64          bytepos;
+       BytePositions bytepositions;
        XLogRecPtr      reservedUpto;
        XLogRecPtr      finishedUpto;
        XLogCtlInsert *Insert = &XLogCtl->Insert;
@@ -1745,10 +1751,8 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
                elog(PANIC, "cannot wait without a PGPROC structure");
 
        /* Read the current insert position */
-       SpinLockAcquire(&Insert->insertpos_lck);
-       bytepos = Insert->CurrBytePos;
-       SpinLockRelease(&Insert->insertpos_lck);
-       reservedUpto = XLogBytePosToEndRecPtr(bytepos);
+       bytepositions.whole = pg_atomic_read_u128(&Insert->BytePos);
+       reservedUpto = XLogBytePosToEndRecPtr(bytepositions.parts.current);
 
        /*
         * No-one should request to flush a piece of WAL that hasn't even been
@@ -5030,9 +5034,9 @@ XLOGShmemInit(void)
        XLogCtl->SharedHotStandbyActive = false;
        XLogCtl->WalWriterSleeping = false;
 
-       SpinLockInit(&XLogCtl->Insert.insertpos_lck);
        SpinLockInit(&XLogCtl->info_lck);
        SpinLockInit(&XLogCtl->ulsn_lck);
+       pg_atomic_init_u128(&XLogCtl->Insert.BytePos, 0);
        InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
 }
 
@@ -7587,8 +7591,14 @@ StartupXLOG(void)
         * previous incarnation.
         */
        Insert = &XLogCtl->Insert;
-       Insert->PrevBytePos = XLogRecPtrToBytePos(LastRec);
-       Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog);
+
+       {
+               BytePositions positions;
+
+               positions.parts.previous = XLogRecPtrToBytePos(LastRec);
+               positions.parts.current = XLogRecPtrToBytePos(EndOfLog);
+               pg_atomic_write_u128(&Insert->BytePos, positions.whole);
+       }
 
        /*
         * Tricky point here: readBuf contains the *last* block that the LastRec
@@ -8734,7 +8744,12 @@ CreateCheckPoint(int flags)
         * determine the checkpoint REDO pointer.
         */
        WALInsertLockAcquireExclusive();
-       curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos);
+       {
+               BytePositions positions;
+
+               positions.whole = Insert->BytePos.value;
+               curInsert = XLogBytePosToRecPtr(positions.parts.current);
+       }
 
        /*
         * If this isn't a shutdown or forced checkpoint, and if there has been no
@@ -11270,13 +11285,11 @@ XLogRecPtr
 GetXLogInsertRecPtr(void)
 {
        XLogCtlInsert *Insert = &XLogCtl->Insert;
-       uint64          current_bytepos;
+       BytePositions positions;
 
-       SpinLockAcquire(&Insert->insertpos_lck);
-       current_bytepos = Insert->CurrBytePos;
-       SpinLockRelease(&Insert->insertpos_lck);
+       positions.whole = pg_atomic_read_u128(&Insert->BytePos);
 
-       return XLogBytePosToRecPtr(current_bytepos);
+       return XLogBytePosToRecPtr(positions.parts.current);
 }
 
 /*
-- 
2.18.0

\set aid random(1, 100000 * :scale)
\set bid random(1, 1 * :scale)
\set tid random(1, 10 * :scale)
\set delta random(-5000, 5000)

BEGIN;
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
END;
