Hi,

I noticed a "XXX: It might be worth considering using an atomic fetch-and-add
instruction here, on architectures where that is supported." in lock.c

Here is my first try at using it. The patch adds a configure check for
gcc 4.7 __atomic_add_fetch as well as the older __sync_add_and_fetch built-ins.
If either is available they are used instead of the mutex.

Changes do the following:
- declare atomic_inc and atomic_dec inline functions (implemented either using
  GCC built-in functions or the old mutex code) in new atomics.h
- change lock.c to use atomic_* functions instead of explicit mutex
- moved all other assignments inside the mutex to occur before the atomic
  operation so that the barrier of the atomic operation can guarantee the
  stores are visible when the function ends
- removed one assert that could not easily be implemented with atomic_dec
  in RemoveLocalLock


Using method AbortStrongLockAcquire as an example.
When compiling with Fedora GCC 4.7.2 the following assembly code is generated

Original code before the patch: 136 bytes
Mutex code with the patch: 124 bytes
Code with gcc-built-ins: 56 bytes

I think moving the extra assignments outside the mutex has allowed the
compiler to optimize the code more even when the mutex is used.


Questions:
1) is it safe to move the assignments to locallock->holdsStrongLockCount
   and StrongLockInProgress outside the FastPathStrongRelationLocks?
2) With built-ins the FastPathStrongRelationLockData becomes uint32[1024],
   should we add back some padding to reduce the cacheline collisions?
   For a modern CPU using 64-byte cache lines there can be at most
   16 concurrent updates at a time, and the probability of a collision
   is quite high.
3) What kind of pgbench test would utilise the code path the most?

TODO:
1) add check in configure.in to ensure that the built-ins are not converted
   to external calls by gcc (on architectures that use the gcc generic version)
2) other architectures / compilers


------

Original code before the patch:

0000000000000e10 <AbortStrongLockAcquire>:
     e10:       48 89 5c 24 f0          mov    %rbx,-0x10(%rsp)
     e15:       48 89 6c 24 f8          mov    %rbp,-0x8(%rsp)
     e1a:       48 83 ec 18             sub    $0x18,%rsp
     e1e:       48 8b 1d 00 00 00 00    mov    0x0(%rip),%rbx        # e25 
<AbortStrongLockAcquire+0x15>
     e25:       48 85 db                test   %rbx,%rbx
     e28:       74 41                   je     e6b <AbortStrongLockAcquire+0x5b>
     e2a:       8b 6b 28                mov    0x28(%rbx),%ebp
     e2d:       b8 01 00 00 00          mov    $0x1,%eax
     e32:       48 8b 15 00 00 00 00    mov    0x0(%rip),%rdx        # e39 
<AbortStrongLockAcquire+0x29>
     e39:       81 e5 ff 03 00 00       and    $0x3ff,%ebp
     e3f:       f0 86 02                lock xchg %al,(%rdx)
     e42:       84 c0                   test   %al,%al
     e44:       75 3a                   jne    e80 <AbortStrongLockAcquire+0x70>
     e46:       48 8b 05 00 00 00 00    mov    0x0(%rip),%rax        # e4d 
<AbortStrongLockAcquire+0x3d>
     e4d:       48 c7 05 00 00 00 00    movq   $0x0,0x0(%rip)        # e58 
<AbortStrongLockAcquire+0x48>
     e54:       00 00 00 00
     e58:       83 6c a8 04 01          subl   $0x1,0x4(%rax,%rbp,4)
     e5d:       c6 43 40 00             movb   $0x0,0x40(%rbx)
     e61:       48 8b 05 00 00 00 00    mov    0x0(%rip),%rax        # e68 
<AbortStrongLockAcquire+0x58>
     e68:       c6 00 00                movb   $0x0,(%rax)
     e6b:       48 8b 5c 24 08          mov    0x8(%rsp),%rbx
     e70:       48 8b 6c 24 10          mov    0x10(%rsp),%rbp
     e75:       48 83 c4 18             add    $0x18,%rsp
     e79:       c3                      retq
     e7a:       66 0f 1f 44 00 00       nopw   0x0(%rax,%rax,1)
     e80:       48 8b 3d 00 00 00 00    mov    0x0(%rip),%rdi        # e87 
<AbortStrongLockAcquire+0x77>
     e87:       ba a8 05 00 00          mov    $0x5a8,%edx
     e8c:       be 00 00 00 00          mov    $0x0,%esi
     e91:       e8 00 00 00 00          callq  e96 <AbortStrongLockAcquire+0x86>
     e96:       eb ae                   jmp    e46 <AbortStrongLockAcquire+0x36>
     e98:       0f 1f 84 00 00 00 00    nopl   0x0(%rax,%rax,1)
     e9f:       00

Mutex code with the patch:
0000000000000e00 <AbortStrongLockAcquire>:
     e00:       48 8b 05 00 00 00 00    mov    0x0(%rip),%rax        # e07 
<AbortStrongLockAcquire+0x7>
     e07:       48 85 c0                test   %rax,%rax
     e0a:       74 55                   je     e61 <AbortStrongLockAcquire+0x61>
     e0c:       48 89 5c 24 f0          mov    %rbx,-0x10(%rsp)
     e11:       48 89 6c 24 f8          mov    %rbp,-0x8(%rsp)
     e16:       48 83 ec 18             sub    $0x18,%rsp
     e1a:       8b 68 28                mov    0x28(%rax),%ebp
     e1d:       c6 40 40 00             movb   $0x0,0x40(%rax)
     e21:       b8 01 00 00 00          mov    $0x1,%eax
     e26:       48 c7 05 00 00 00 00    movq   $0x0,0x0(%rip)        # e31 
<AbortStrongLockAcquire+0x31>
     e2d:       00 00 00 00
     e31:       48 8b 1d 00 00 00 00    mov    0x0(%rip),%rbx        # e38 
<AbortStrongLockAcquire+0x38>
     e38:       81 e5 ff 03 00 00       and    $0x3ff,%ebp
     e3e:       f0 86 03                lock xchg %al,(%rbx)
     e41:       84 c0                   test   %al,%al
     e43:       75 23                   jne    e68 <AbortStrongLockAcquire+0x68>
     e45:       8b 44 ab 04             mov    0x4(%rbx,%rbp,4),%eax
     e49:       83 e8 01                sub    $0x1,%eax
     e4c:       89 44 ab 04             mov    %eax,0x4(%rbx,%rbp,4)
     e50:       c6 03 00                movb   $0x0,(%rbx)
     e53:       48 8b 5c 24 08          mov    0x8(%rsp),%rbx
     e58:       48 8b 6c 24 10          mov    0x10(%rsp),%rbp
     e5d:       48 83 c4 18             add    $0x18,%rsp
     e61:       f3 c3                   repz retq
     e63:       0f 1f 44 00 00          nopl   0x0(%rax,%rax,1)
     e68:       ba 41 00 00 00          mov    $0x41,%edx
     e6d:       be 00 00 00 00          mov    $0x0,%esi
     e72:       48 89 df                mov    %rbx,%rdi
     e75:       e8 00 00 00 00          callq  e7a <AbortStrongLockAcquire+0x7a>
     e7a:       eb c9                   jmp    e45 <AbortStrongLockAcquire+0x45>
     e7c:       0f 1f 40 00             nopl   0x0(%rax)

Code with gcc-built-ins:
0000000000000da0 <AbortStrongLockAcquire>:
     da0:       48 8b 05 00 00 00 00    mov    0x0(%rip),%rax        # da7 
<AbortStrongLockAcquire+0x7>
     da7:       48 85 c0                test   %rax,%rax
     daa:       74 2a                   je     dd6 <AbortStrongLockAcquire+0x36>
     dac:       8b 50 28                mov    0x28(%rax),%edx
     daf:       c6 40 40 00             movb   $0x0,0x40(%rax)
     db3:       48 c7 05 00 00 00 00    movq   $0x0,0x0(%rip)        # dbe 
<AbortStrongLockAcquire+0x1e>
     dba:       00 00 00 00
     dbe:       48 89 d0                mov    %rdx,%rax
     dc1:       48 8b 15 00 00 00 00    mov    0x0(%rip),%rdx        # dc8 
<AbortStrongLockAcquire+0x28>
     dc8:       25 ff 03 00 00          and    $0x3ff,%eax
     dcd:       48 c1 e0 02             shl    $0x2,%rax
     dd1:       f0 83 2c 02 01          lock subl $0x1,(%rdx,%rax,1)
     dd6:       f3 c3                   repz retq
     dd8:       0f 1f 84 00 00 00 00    nopl   0x0(%rax,%rax,1)
     ddf:       00

>From 958b04eb08603167dee2fe6684f9430f5b578f28 Mon Sep 17 00:00:00 2001
From: Mikko Tiihonen <mikko.tiiho...@nitor.fi>
Date: Wed, 12 Dec 2012 20:02:49 +0200
Subject: [PATCH] Use gcc built-in atomic add/sub instructions, if available


diff --git a/configure.in b/configure.in
index 2dee4b3..dec2785 100644
--- a/configure.in
+++ b/configure.in
@@ -1451,6 +1451,28 @@ if test x"$pgac_cv_gcc_int_atomics" = x"yes"; then
   AC_DEFINE(HAVE_GCC_INT_ATOMICS, 1, [Define to 1 if you have __sync_lock_test_and_set(int *) and friends.])
 fi
 
+#TODO, check for __atomic_is_lock_free (sizeof(int), 0) too
+
+AC_CACHE_CHECK([for builtin atomic functions], pgac_cv_gcc_int_atomic_add,
+[AC_TRY_LINK([],
+  [int counter = 0;
+   __atomic_add_fetch(&counter, 1, __ATOMIC_SEQ_CST);],
+  [pgac_cv_gcc_int_atomic_add="yes"],
+  [pgac_cv_gcc_int_atomic_add="no"])])
+if test x"$pgac_cv_gcc_int_atomic_add" = x"yes"; then
+  AC_DEFINE(HAVE_GCC_INT_ATOMIC_ADD, 1, [Define to 1 if you have __atomic_add_fetch(int *, int, int) and friends.])
+fi
+
+AC_CACHE_CHECK([for builtin sync functions], pgac_cv_gcc_int_sync_add,
+[AC_TRY_LINK([],
+  [int counter = 0;
+   __sync_add_and_fetch(&counter, 1);],
+  [pgac_cv_gcc_int_sync_add="yes"],
+  [pgac_cv_gcc_int_sync_add="no"])])
+if test x"$pgac_cv_gcc_int_sync_add" = x"yes"; then
+  AC_DEFINE(HAVE_GCC_INT_SYNC_ADD, 1, [Define to 1 if you have  __sync_add_and_fetch(int *, int) and friends.])
+fi
+
 
 #
 # Pthreads
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index ec4da20..c8c0b91 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -40,7 +40,7 @@
 #include "pgstat.h"
 #include "storage/proc.h"
 #include "storage/sinvaladt.h"
-#include "storage/spin.h"
+#include "storage/atomics.h"
 #include "storage/standby.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
@@ -234,7 +234,12 @@ static PROCLOCK *FastPathGetRelationLockEntry(LOCALLOCK *locallock);
 
 typedef struct
 {
+#ifdef MUTEXLESS_ATOMIC_INC
+#define FAST_PATH_MUTEX(data) NULL
+#else
+#define FAST_PATH_MUTEX(data) &(data)->mutex
 	slock_t		mutex;
+#endif
 	uint32		count[FAST_PATH_STRONG_LOCK_HASH_PARTITIONS];
 } FastPathStrongRelationLockData;
 
@@ -427,8 +432,10 @@ InitLocks(void)
 	FastPathStrongRelationLocks =
 		ShmemInitStruct("Fast Path Strong Relation Lock Data",
 						sizeof(FastPathStrongRelationLockData), &found);
+#ifndef MUTEXLESS_ATOMIC_INC
 	if (!found)
 		SpinLockInit(&FastPathStrongRelationLocks->mutex);
+#endif
 
 	/*
 	 * Allocate non-shared hash table for LOCALLOCK structs.  This stores lock
@@ -1207,11 +1214,8 @@ RemoveLocalLock(LOCALLOCK *locallock)
 
 		fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);
 
-		SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
-		Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
-		FastPathStrongRelationLocks->count[fasthashcode]--;
 		locallock->holdsStrongLockCount = FALSE;
-		SpinLockRelease(&FastPathStrongRelationLocks->mutex);
+		atomic_dec(&FastPathStrongRelationLocks->count[fasthashcode], FAST_PATH_MUTEX(FastPathStrongRelationLocks));
 	}
 
 	if (!hash_search(LockMethodLocalHash,
@@ -1475,16 +1479,10 @@ BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode)
 	 * Adding to a memory location is not atomic, so we take a spinlock to
 	 * ensure we don't collide with someone else trying to bump the count at
 	 * the same time.
-	 *
-	 * XXX: It might be worth considering using an atomic fetch-and-add
-	 * instruction here, on architectures where that is supported.
 	 */
-
-	SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
-	FastPathStrongRelationLocks->count[fasthashcode]++;
 	locallock->holdsStrongLockCount = TRUE;
 	StrongLockInProgress = locallock;
-	SpinLockRelease(&FastPathStrongRelationLocks->mutex);
+	atomic_inc(&FastPathStrongRelationLocks->count[fasthashcode], FAST_PATH_MUTEX(FastPathStrongRelationLocks));
 }
 
 /*
@@ -1512,11 +1510,9 @@ AbortStrongLockAcquire(void)
 
 	fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);
 	Assert(locallock->holdsStrongLockCount == TRUE);
-	SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
-	FastPathStrongRelationLocks->count[fasthashcode]--;
 	locallock->holdsStrongLockCount = FALSE;
 	StrongLockInProgress = NULL;
-	SpinLockRelease(&FastPathStrongRelationLocks->mutex);
+	atomic_dec(&FastPathStrongRelationLocks->count[fasthashcode], FAST_PATH_MUTEX(FastPathStrongRelationLocks));
 }
 
 /*
@@ -2881,9 +2877,7 @@ LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
 	{
 		uint32		fasthashcode = FastPathStrongLockHashPartition(hashcode);
 
-		SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
-		FastPathStrongRelationLocks->count[fasthashcode]--;
-		SpinLockRelease(&FastPathStrongRelationLocks->mutex);
+		atomic_dec(&FastPathStrongRelationLocks->count[fasthashcode], FAST_PATH_MUTEX(FastPathStrongRelationLocks));
 	}
 }
 
@@ -3765,9 +3759,7 @@ lock_twophase_recover(TransactionId xid, uint16 info,
 	{
 		uint32		fasthashcode = FastPathStrongLockHashPartition(hashcode);
 
-		SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
-		FastPathStrongRelationLocks->count[fasthashcode]++;
-		SpinLockRelease(&FastPathStrongRelationLocks->mutex);
+		atomic_inc(&FastPathStrongRelationLocks->count[fasthashcode], FAST_PATH_MUTEX(FastPathStrongRelationLocks));
 	}
 
 	LWLockRelease(partitionLock);
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index edaf853..e6b2943 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -176,6 +176,12 @@
 /* Define to 1 if you have __sync_lock_test_and_set(int *) and friends. */
 #undef HAVE_GCC_INT_ATOMICS
 
+/* Define to 1 if you have __atomic_add_fetch(int *, int, int) and friends. */
+#undef HAVE_GCC_INT_ATOMIC_ADD
+
+/* Define to 1 if you have __sync_add_and_fetch(int *, int) and friends. */
+#undef HAVE_GCC_INT_SYNC_ADD
+
 /* Define to 1 if you have the `getaddrinfo' function. */
 #undef HAVE_GETADDRINFO
 
diff --git a/src/include/storage/atomics.h b/src/include/storage/atomics.h
new file mode 100644
index 0000000..b828942
--- /dev/null
+++ b/src/include/storage/atomics.h
@@ -0,0 +1,72 @@
+/*-------------------------------------------------------------------------
+ *
+ * atomics.h
+ *	   Hardware-independent implementation of atomics instructions
+ *	   on primitive values.
+ *
+ *
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * src/include/storage/atomics.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef ATOMICS_H
+#define ATOMICS_H
+
+#include "storage/spin.h"
+
+#ifdef HAVE_GCC_INT_ATOMIC_ADD
+
+#define MUTEXLESS_ATOMIC_INC 1
+
+static __inline__ void
+atomic_inc(volatile uint32 *counter, void* ignored)
+{
+		__atomic_add_fetch(counter, 1, __ATOMIC_SEQ_CST);
+}
+
+static __inline__ void
+atomic_dec(volatile uint32 *counter, void* ignored)
+{
+		__atomic_sub_fetch(counter, 1, __ATOMIC_SEQ_CST);
+}
+
+#elif defined(HAVE_GCC_INT_SYNC_ADD)
+
+#define MUTEXLESS_ATOMIC_INC 1
+
+static __inline__ void
+atomic_inc(volatile uint32 *counter, void *ignored)
+{
+		__sync_add_and_fetch(counter, 1);
+}
+
+static __inline__ void
+atomic_dec(volatile uint32 *counter, void *ignored)
+{
+		__sync_sub_and_fetch(counter, 1);
+}
+
+#else /* !(HAVE_GCC_INT_ATOMIC_ADD || HAVE_GCC_INT_SYNC_ADD) */
+
+static __inline__ void
+atomic_inc(volatile uint32 *counter, volatile slock_t *mutex)
+{
+		SpinLockAcquire(mutex);
+		(*counter)++;
+		SpinLockRelease(mutex);
+}
+
+static __inline__ void
+atomic_dec(volatile uint32 *counter, volatile slock_t *mutex)
+{
+		SpinLockAcquire(mutex);
+		(*counter)--;
+		SpinLockRelease(mutex);
+}
+
+#endif /* HAVE_GCC_INT_ATOMIC_ADD || HAVE_GCC_INT_SYNC_ADD */
+
+#endif   /* ATOMICS_H */
-- 
1.8.0.2

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to