On 2015-09-14 17:41:42 +0200, Andres Freund wrote:
> I pointed out how you can actually make this safely lock-free giving you
> the interesting code.
And here's an actual implementation of that approach. It's definitely
work-in-progress and could easily be optimized further; I don't have any big
machines to play around with right now, though.

Andres
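To make the patch below easier to follow: the flags, usage count, and refcount
that previously lived in separate BufferDesc fields (protected by
buf_hdr_lock) are packed into a single pg_atomic_uint32 "state" word, and
pinning becomes a compare-and-swap loop on that word instead of taking the
header spinlock. A minimal standalone sketch of that scheme (using C11 atomics
rather than the pg_atomic_* wrappers; all constants and names here are
illustrative, not part of the patch) looks roughly like this:

/* Sketch only: mirrors the packed-state idea from the patch, but uses C11
 * atomics instead of pg_atomic_*; all names are illustrative. */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define REFCOUNT_MASK     ((1U << 18) - 1)   /* bits 0..17: refcount */
#define USAGECOUNT_ONE    (1U << 18)         /* bits 18..21: usage count */
#define USAGECOUNT_MASK   (0xFU << 18)
#define FLAG_LOCKED       (1U << 22)         /* header-lock bit */
#define FLAG_VALID        (1U << 24)         /* one of the flag bits */
#define MAX_USAGE_COUNT   5

typedef struct
{
    _Atomic uint32_t state;     /* flags | usagecount | refcount */
} SketchBufDesc;

/* Lock-free pin: bump refcount (and usage count, up to the cap) with CAS. */
static bool
sketch_pin_buffer(SketchBufDesc *buf)
{
    uint32_t old = atomic_load(&buf->state);

    for (;;)
    {
        uint32_t next;

        /* don't race with a holder of the header-lock bit */
        while (old & FLAG_LOCKED)
            old = atomic_load(&buf->state);

        next = old + 1;         /* refcount++ */
        if (((next & USAGECOUNT_MASK) >> 18) < MAX_USAGE_COUNT)
            next += USAGECOUNT_ONE;

        /* on failure, 'old' is refreshed with the current value */
        if (atomic_compare_exchange_weak(&buf->state, &old, next))
            return (next & FLAG_VALID) != 0;
    }
}

An unpin is then just an atomic fetch-and-sub of 1 on the same word; only the
slow paths (buffer replacement, I/O completion, the pin-count waiter) still
take the header lock, which is itself just the BM_LOCKED bit in that word.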
diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c index 3ae2848..3e70792 100644 --- a/src/backend/storage/buffer/buf_init.c +++ b/src/backend/storage/buffer/buf_init.c @@ -95,12 +95,9 @@ InitBufferPool(void) BufferDesc *buf = GetBufferDescriptor(i); CLEAR_BUFFERTAG(buf->tag); - buf->flags = 0; - buf->usage_count = 0; - buf->refcount = 0; - buf->wait_backend_pid = 0; - SpinLockInit(&buf->buf_hdr_lock); + pg_atomic_init_u32(&buf->state, 0); + buf->wait_backend_pid = 0; buf->buf_id = i; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 8c0358e..345322a 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -51,6 +51,8 @@ #include "utils/resowner_private.h" #include "utils/timestamp.h" +#define likely(x) __builtin_expect((x),1) +#define unlikely(x) __builtin_expect((x),0) /* Note: these two macros only work on shared buffers, not local ones! */ #define BufHdrGetBlock(bufHdr) ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ)) @@ -774,9 +776,13 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, */ if (isLocalBuf) { + uint32 state; + + state = pg_atomic_read_u32(&bufHdr->state); /* Only need to adjust flags */ - Assert(bufHdr->flags & BM_VALID); - bufHdr->flags &= ~BM_VALID; + Assert(state & BM_VALID); + state &= ~BM_VALID; + pg_atomic_write_u32(&bufHdr->state, state); } else { @@ -788,8 +794,8 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, do { LockBufHdr(bufHdr); - Assert(bufHdr->flags & BM_VALID); - bufHdr->flags &= ~BM_VALID; + Assert(pg_atomic_read_u32(&bufHdr->state) & BM_VALID); + pg_atomic_fetch_and_u32(&bufHdr->state, ~BM_VALID); UnlockBufHdr(bufHdr); } while (!StartBufferIO(bufHdr, true)); } @@ -807,7 +813,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * it's not been recycled) but come right back here to try smgrextend * again. */ - Assert(!(bufHdr->flags & BM_VALID)); /* spinlock not needed */ + Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */ bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); @@ -885,7 +891,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, if (isLocalBuf) { /* Only need to adjust flags */ - bufHdr->flags |= BM_VALID; + pg_atomic_fetch_or_u32(&bufHdr->state, BM_VALID); } else { @@ -939,7 +945,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, BufferTag oldTag; /* previous identity of selected buffer */ uint32 oldHash; /* hash value for oldTag */ LWLock *oldPartitionLock; /* buffer partition lock for it */ - BufFlags oldFlags; + uint32 oldFlags; int buf_id; volatile BufferDesc *buf; bool valid; @@ -1013,10 +1019,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, */ buf = StrategyGetBuffer(strategy); - Assert(buf->refcount == 0); + Assert((pg_atomic_read_u32(&buf->state) & BUF_REFCOUNT_MASK) == 0); /* Must copy buffer flags while we still hold the spinlock */ - oldFlags = buf->flags; + oldFlags = pg_atomic_read_u32(&buf->state) & BUF_FLAG_MASK; /* Pin the buffer and then release the buffer spinlock */ PinBuffer_Locked(buf); @@ -1210,8 +1216,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * recycle this buffer; we must undo everything we've done and start * over with a new victim buffer. 
*/ - oldFlags = buf->flags; - if (buf->refcount == 1 && !(oldFlags & BM_DIRTY)) + oldFlags = pg_atomic_read_u32(&buf->state) & BUF_FLAG_MASK; + if ((pg_atomic_read_u32(&buf->state) & BUF_REFCOUNT_MASK) == 1 && + !(oldFlags & BM_DIRTY)) break; UnlockBufHdr(buf); @@ -1232,12 +1239,19 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * 1 so that the buffer can survive one clock-sweep pass.) */ buf->tag = newTag; - buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT); + pg_atomic_fetch_and_u32(&buf->state, + ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | + BM_CHECKPOINT_NEEDED | BM_IO_ERROR | + BM_PERMANENT | + BUF_USAGECOUNT_MASK)); if (relpersistence == RELPERSISTENCE_PERMANENT) - buf->flags |= BM_TAG_VALID | BM_PERMANENT; + pg_atomic_fetch_or_u32(&buf->state, + BM_TAG_VALID | BM_PERMANENT | + BUF_USAGECOUNT_ONE); else - buf->flags |= BM_TAG_VALID; - buf->usage_count = 1; + pg_atomic_fetch_or_u32(&buf->state, + BM_TAG_VALID | + BUF_USAGECOUNT_ONE); UnlockBufHdr(buf); @@ -1286,7 +1300,7 @@ InvalidateBuffer(volatile BufferDesc *buf) BufferTag oldTag; uint32 oldHash; /* hash value for oldTag */ LWLock *oldPartitionLock; /* buffer partition lock for it */ - BufFlags oldFlags; + uint32 oldFlags; /* Save the original buffer tag before dropping the spinlock */ oldTag = buf->tag; @@ -1329,7 +1343,7 @@ retry: * yet done StartBufferIO, WaitIO will fall through and we'll effectively * be busy-looping here.) */ - if (buf->refcount != 0) + if ((pg_atomic_read_u32(&buf->state) & BUF_REFCOUNT_MASK) != 0) { UnlockBufHdr(buf); LWLockRelease(oldPartitionLock); @@ -1344,10 +1358,9 @@ retry: * Clear out the buffer's tag and flags. We must do this to ensure that * linear scans of the buffer array don't think the buffer is valid. */ - oldFlags = buf->flags; + oldFlags = pg_atomic_read_u32(&buf->state) & BUF_FLAG_MASK; CLEAR_BUFFERTAG(buf->tag); - buf->flags = 0; - buf->usage_count = 0; + pg_atomic_fetch_and_u32(&buf->state, BM_LOCKED | ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK)); UnlockBufHdr(buf); @@ -1399,12 +1412,12 @@ MarkBufferDirty(Buffer buffer) LockBufHdr(bufHdr); - Assert(bufHdr->refcount > 0); + Assert((pg_atomic_read_u32(&bufHdr->state) & BUF_REFCOUNT_MASK) > 0); /* * If the buffer was not dirty already, do vacuum accounting. 
*/ - if (!(bufHdr->flags & BM_DIRTY)) + if (!(pg_atomic_read_u32(&bufHdr->state) & BM_DIRTY)) { VacuumPageDirty++; pgBufferUsage.shared_blks_dirtied++; @@ -1412,7 +1425,8 @@ MarkBufferDirty(Buffer buffer) VacuumCostBalance += VacuumCostPageDirty; } - bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED); + pg_atomic_fetch_or_u32(&bufHdr->state, + BM_DIRTY | BM_JUST_DIRTIED); UnlockBufHdr(bufHdr); } @@ -1495,23 +1509,39 @@ PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy) if (ref == NULL) { + uint32 state; + uint32 oldstate; + ReservePrivateRefCountEntry(); ref = NewPrivateRefCountEntry(b); - LockBufHdr(buf); - buf->refcount++; - if (strategy == NULL) - { - if (buf->usage_count < BM_MAX_USAGE_COUNT) - buf->usage_count++; - } - else + state = pg_atomic_read_u32(&buf->state); + oldstate = state; + + while (true) { - if (buf->usage_count == 0) - buf->usage_count = 1; + /* spin-wait till lock is free */ + while (unlikely(state & BM_LOCKED)) + { + pg_spin_delay(); + state = pg_atomic_read_u32(&buf->state); + } + + /* increase refcount */ + state += 1; + + /* increase usagecount unless already max */ + if (((state & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT) != BM_MAX_USAGE_COUNT) + state += BUF_USAGECOUNT_ONE; + + result = (state & BM_VALID) != 0; + + if (likely(pg_atomic_compare_exchange_u32(&buf->state, &oldstate, state))) + break; + + /* get ready for next loop, oldstate has been updated by cas */ + state = oldstate; } - result = (buf->flags & BM_VALID) != 0; - UnlockBufHdr(buf); } else { @@ -1558,7 +1588,7 @@ PinBuffer_Locked(volatile BufferDesc *buf) */ Assert(GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf), false) == NULL); - buf->refcount++; + pg_atomic_fetch_add_u32(&buf->state, 1); UnlockBufHdr(buf); b = BufferDescriptorGetBuffer(buf); @@ -1594,30 +1624,41 @@ UnpinBuffer(volatile BufferDesc *buf, bool fixOwner) ref->refcount--; if (ref->refcount == 0) { + uint32 state; + /* I'd better not still hold any locks on the buffer */ Assert(!LWLockHeldByMe(buf->content_lock)); Assert(!LWLockHeldByMe(buf->io_in_progress_lock)); - LockBufHdr(buf); - - /* Decrement the shared reference count */ - Assert(buf->refcount > 0); - buf->refcount--; + /* + * Decrement the shared reference count. + * + * Arguably it'd be more robust if we checked for BM_LOCKED here, but + * currently all manipulation of ->state for shared buffers is through + * atomics. 
+ */ + state = pg_atomic_fetch_sub_u32(&buf->state, 1); + Assert((state & BUF_REFCOUNT_MASK) > 0); /* Support LockBufferForCleanup() */ - if ((buf->flags & BM_PIN_COUNT_WAITER) && - buf->refcount == 1) + if (state & BM_PIN_COUNT_WAITER) { - /* we just released the last pin other than the waiter's */ - int wait_backend_pid = buf->wait_backend_pid; + LockBufHdr(buf); - buf->flags &= ~BM_PIN_COUNT_WAITER; - UnlockBufHdr(buf); - ProcSendSignal(wait_backend_pid); - } - else - UnlockBufHdr(buf); + if (pg_atomic_read_u32(&buf->state) & BM_PIN_COUNT_WAITER && + (pg_atomic_read_u32(&buf->state) & BUF_REFCOUNT_MASK) == 1) + { + /* we just released the last pin other than the waiter's */ + int wait_backend_pid = buf->wait_backend_pid; + pg_atomic_fetch_and_u32(&buf->state, + ~BM_PIN_COUNT_WAITER); + UnlockBufHdr(buf); + ProcSendSignal(wait_backend_pid); + } + else + UnlockBufHdr(buf); + } ForgetPrivateRefCountEntry(ref); } } @@ -1680,9 +1721,10 @@ BufferSync(int flags) */ LockBufHdr(bufHdr); - if ((bufHdr->flags & mask) == mask) + if ((pg_atomic_read_u32(&bufHdr->state) & mask) == mask) { - bufHdr->flags |= BM_CHECKPOINT_NEEDED; + pg_atomic_fetch_or_u32(&bufHdr->state, + BM_CHECKPOINT_NEEDED); num_to_write++; } @@ -1721,7 +1763,7 @@ BufferSync(int flags) * write the buffer though we didn't need to. It doesn't seem worth * guarding against this, though. */ - if (bufHdr->flags & BM_CHECKPOINT_NEEDED) + if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED) { if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN) { @@ -2081,6 +2123,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used) { volatile BufferDesc *bufHdr = GetBufferDescriptor(buf_id); int result = 0; + uint32 state; ReservePrivateRefCountEntry(); @@ -2095,7 +2138,10 @@ SyncOneBuffer(int buf_id, bool skip_recently_used) */ LockBufHdr(bufHdr); - if (bufHdr->refcount == 0 && bufHdr->usage_count == 0) + state = pg_atomic_read_u32(&bufHdr->state); + + if ((state & BUF_REFCOUNT_MASK) == 0 && + (state & BUF_USAGECOUNT_MASK) == 0) result |= BUF_REUSABLE; else if (skip_recently_used) { @@ -2104,7 +2150,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used) return result; } - if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY)) + if (!(state & BM_VALID) || !(state & BM_DIRTY)) { /* It's clean, so nothing to do */ UnlockBufHdr(bufHdr); @@ -2256,6 +2302,7 @@ PrintBufferLeakWarning(Buffer buffer) int32 loccount; char *path; BackendId backend; + uint32 state; Assert(BufferIsValid(buffer)); if (BufferIsLocal(buffer)) @@ -2273,12 +2320,13 @@ PrintBufferLeakWarning(Buffer buffer) /* theoretically we should lock the bufhdr here */ path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum); + state = pg_atomic_read_u32(&buf->state); elog(WARNING, "buffer refcount leak: [%03d] " "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)", buffer, path, - buf->tag.blockNum, buf->flags, - buf->refcount, loccount); + buf->tag.blockNum, state & BUF_FLAG_MASK, + state & BUF_REFCOUNT_MASK, loccount); pfree(path); } @@ -2424,7 +2472,7 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln) recptr = BufferGetLSN(buf); /* To check if block content changes while flushing. - vadim 01/17/97 */ - buf->flags &= ~BM_JUST_DIRTIED; + pg_atomic_fetch_and_u32(&buf->state, ~BM_JUST_DIRTIED); UnlockBufHdr(buf); /* @@ -2444,7 +2492,7 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln) * disastrous system-wide consequences. To make sure that can't happen, * skip the flush if the buffer isn't permanent. 
*/ - if (buf->flags & BM_PERMANENT) + if (pg_atomic_read_u32(&buf->state) & BM_PERMANENT) XLogFlush(recptr); /* @@ -2538,7 +2586,7 @@ BufferIsPermanent(Buffer buffer) * old value or the new value, but not random garbage. */ bufHdr = GetBufferDescriptor(buffer - 1); - return (bufHdr->flags & BM_PERMANENT) != 0; + return (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT) != 0; } /* @@ -2874,7 +2922,8 @@ FlushRelationBuffers(Relation rel) { bufHdr = GetLocalBufferDescriptor(i); if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && - (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) + (pg_atomic_read_u32(&bufHdr->state) & (BM_VALID | BM_DIRTY)) + == (BM_VALID | BM_DIRTY)) { ErrorContextCallback errcallback; Page localpage; @@ -2895,7 +2944,7 @@ FlushRelationBuffers(Relation rel) localpage, false); - bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED); + pg_atomic_fetch_and_u32(&bufHdr->state, ~(BM_DIRTY | BM_JUST_DIRTIED)); /* Pop the error context stack */ error_context_stack = errcallback.previous; @@ -2923,7 +2972,8 @@ FlushRelationBuffers(Relation rel) LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && - (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) + (pg_atomic_read_u32(&bufHdr->state) & (BM_VALID | BM_DIRTY)) + == (BM_VALID | BM_DIRTY)) { PinBuffer_Locked(bufHdr); LWLockAcquire(bufHdr->content_lock, LW_SHARED); @@ -2975,7 +3025,8 @@ FlushDatabaseBuffers(Oid dbid) LockBufHdr(bufHdr); if (bufHdr->tag.rnode.dbNode == dbid && - (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) + (pg_atomic_read_u32(&bufHdr->state) & (BM_VALID | BM_DIRTY)) + == (BM_VALID | BM_DIRTY)) { PinBuffer_Locked(bufHdr); LWLockAcquire(bufHdr->content_lock, LW_SHARED); @@ -3093,12 +3144,13 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) * is only intended to be used in cases where failing to write out the * data would be harmless anyway, it doesn't really matter. */ - if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) != + if ((pg_atomic_read_u32(&bufHdr->state) & (BM_DIRTY | BM_JUST_DIRTIED)) != (BM_DIRTY | BM_JUST_DIRTIED)) { XLogRecPtr lsn = InvalidXLogRecPtr; bool dirtied = false; bool delayChkpt = false; + uint32 state; /* * If we need to protect hint bit updates from torn writes, WAL-log a @@ -3109,7 +3161,7 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) * We don't check full_page_writes here because that logic is included * when we call XLogInsert() since the value changes dynamically. */ - if (XLogHintBitIsNeeded() && (bufHdr->flags & BM_PERMANENT)) + if (XLogHintBitIsNeeded() && (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT)) { /* * If we're in recovery we cannot dirty a page because of a hint. @@ -3149,8 +3201,12 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) } LockBufHdr(bufHdr); - Assert(bufHdr->refcount > 0); - if (!(bufHdr->flags & BM_DIRTY)) + + state = pg_atomic_read_u32(&bufHdr->state); + + Assert((state & BUF_REFCOUNT_MASK) > 0); + + if (!(state & BM_DIRTY)) { dirtied = true; /* Means "will be dirtied by this action" */ @@ -3170,7 +3226,9 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) if (!XLogRecPtrIsInvalid(lsn)) PageSetLSN(page, lsn); } - bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED); + + pg_atomic_fetch_or_u32(&bufHdr->state, BM_DIRTY | BM_JUST_DIRTIED); + UnlockBufHdr(bufHdr); if (delayChkpt) @@ -3208,9 +3266,9 @@ UnlockBuffers(void) * Don't complain if flag bit not set; it could have been reset but we * got a cancel/die interrupt before getting the signal. 
*/ - if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 && + if ((pg_atomic_read_u32(&buf->state) & BM_PIN_COUNT_WAITER) != 0 && buf->wait_backend_pid == MyProcPid) - buf->flags &= ~BM_PIN_COUNT_WAITER; + pg_atomic_fetch_and_u32(&buf->state, ~BM_PIN_COUNT_WAITER); UnlockBufHdr(buf); @@ -3304,25 +3362,30 @@ LockBufferForCleanup(Buffer buffer) for (;;) { + int state; + /* Try to acquire lock */ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); LockBufHdr(bufHdr); - Assert(bufHdr->refcount > 0); - if (bufHdr->refcount == 1) + + state = pg_atomic_read_u32(&bufHdr->state); + + Assert((state & BUF_REFCOUNT_MASK) > 0); + if ((state & BUF_REFCOUNT_MASK) == 1) { /* Successfully acquired exclusive lock with pincount 1 */ UnlockBufHdr(bufHdr); return; } /* Failed, so mark myself as waiting for pincount 1 */ - if (bufHdr->flags & BM_PIN_COUNT_WAITER) + if (state & BM_PIN_COUNT_WAITER) { UnlockBufHdr(bufHdr); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); elog(ERROR, "multiple backends attempting to wait for pincount 1"); } bufHdr->wait_backend_pid = MyProcPid; - bufHdr->flags |= BM_PIN_COUNT_WAITER; + pg_atomic_fetch_or_u32(&bufHdr->state, BM_PIN_COUNT_WAITER); PinCountWaitBuf = bufHdr; UnlockBufHdr(bufHdr); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); @@ -3349,9 +3412,9 @@ LockBufferForCleanup(Buffer buffer) * better be safe. */ LockBufHdr(bufHdr); - if ((bufHdr->flags & BM_PIN_COUNT_WAITER) != 0 && + if ((pg_atomic_read_u32(&bufHdr->state) & BM_PIN_COUNT_WAITER) != 0 && bufHdr->wait_backend_pid == MyProcPid) - bufHdr->flags &= ~BM_PIN_COUNT_WAITER; + pg_atomic_fetch_and_u32(&bufHdr->state, ~BM_PIN_COUNT_WAITER); UnlockBufHdr(bufHdr); PinCountWaitBuf = NULL; @@ -3393,22 +3456,25 @@ bool ConditionalLockBufferForCleanup(Buffer buffer) { volatile BufferDesc *bufHdr; + uint32 refcount; Assert(BufferIsValid(buffer)); if (BufferIsLocal(buffer)) { + refcount = LocalRefCount[-buffer - 1]; /* There should be exactly one pin */ - Assert(LocalRefCount[-buffer - 1] > 0); - if (LocalRefCount[-buffer - 1] != 1) + Assert(refcount > 0); + if (refcount != 1) return false; /* Nobody else to wait for */ return true; } /* There should be exactly one local pin */ - Assert(GetPrivateRefCount(buffer) > 0); - if (GetPrivateRefCount(buffer) != 1) + refcount = GetPrivateRefCount(buffer); + Assert(refcount); + if (refcount != 1) return false; /* Try to acquire lock */ @@ -3417,8 +3483,10 @@ ConditionalLockBufferForCleanup(Buffer buffer) bufHdr = GetBufferDescriptor(buffer - 1); LockBufHdr(bufHdr); - Assert(bufHdr->refcount > 0); - if (bufHdr->refcount == 1) + + refcount = pg_atomic_read_u32(&bufHdr->state) & BUF_REFCOUNT_MASK; + Assert(refcount > 0); + if (refcount == 1) { /* Successfully acquired exclusive lock with pincount 1 */ UnlockBufHdr(bufHdr); @@ -3456,7 +3524,7 @@ WaitIO(volatile BufferDesc *buf) */ for (;;) { - BufFlags sv_flags; + uint32 state; /* * It may not be necessary to acquire the spinlock to check the flag @@ -3464,9 +3532,10 @@ WaitIO(volatile BufferDesc *buf) * play it safe. 
*/ LockBufHdr(buf); - sv_flags = buf->flags; + state = pg_atomic_read_u32(&buf->state); UnlockBufHdr(buf); - if (!(sv_flags & BM_IO_IN_PROGRESS)) + + if (!(state & BM_IO_IN_PROGRESS)) break; LWLockAcquire(buf->io_in_progress_lock, LW_SHARED); LWLockRelease(buf->io_in_progress_lock); @@ -3494,6 +3563,8 @@ WaitIO(volatile BufferDesc *buf) static bool StartBufferIO(volatile BufferDesc *buf, bool forInput) { + uint32 state; + Assert(!InProgressBuf); for (;;) @@ -3506,7 +3577,9 @@ StartBufferIO(volatile BufferDesc *buf, bool forInput) LockBufHdr(buf); - if (!(buf->flags & BM_IO_IN_PROGRESS)) + state = pg_atomic_read_u32(&buf->state); + + if (!(state & BM_IO_IN_PROGRESS)) break; /* @@ -3522,7 +3595,7 @@ StartBufferIO(volatile BufferDesc *buf, bool forInput) /* Once we get here, there is definitely no I/O active on this buffer */ - if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY)) + if (forInput ? (state & BM_VALID) : !(state & BM_DIRTY)) { /* someone else already did the I/O */ UnlockBufHdr(buf); @@ -3530,7 +3603,7 @@ StartBufferIO(volatile BufferDesc *buf, bool forInput) return false; } - buf->flags |= BM_IO_IN_PROGRESS; + pg_atomic_fetch_or_u32(&buf->state, BM_IO_IN_PROGRESS); UnlockBufHdr(buf); @@ -3565,11 +3638,13 @@ TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty, LockBufHdr(buf); - Assert(buf->flags & BM_IO_IN_PROGRESS); - buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR); - if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED)) - buf->flags &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED); - buf->flags |= set_flag_bits; + Assert(pg_atomic_read_u32(&buf->state) & BM_IO_IN_PROGRESS); + + pg_atomic_fetch_and_u32(&buf->state, ~(BM_IO_IN_PROGRESS | BM_IO_ERROR)); + if (clear_dirty && !(pg_atomic_read_u32(&buf->state) & BM_JUST_DIRTIED)) + pg_atomic_fetch_and_u32(&buf->state, ~(BM_DIRTY | BM_CHECKPOINT_NEEDED)); + + pg_atomic_fetch_or_u32(&buf->state, set_flag_bits); UnlockBufHdr(buf); @@ -3603,23 +3678,24 @@ AbortBufferIO(void) LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE); LockBufHdr(buf); - Assert(buf->flags & BM_IO_IN_PROGRESS); + Assert(pg_atomic_read_u32(&buf->state) & BM_IO_IN_PROGRESS); if (IsForInput) { - Assert(!(buf->flags & BM_DIRTY)); + Assert(!(pg_atomic_read_u32(&buf->state) & BM_DIRTY)); + /* We'd better not think buffer is valid yet */ - Assert(!(buf->flags & BM_VALID)); + Assert(!(pg_atomic_read_u32(&buf->state) & BM_VALID)); UnlockBufHdr(buf); } else { - BufFlags sv_flags; + uint32 state; - sv_flags = buf->flags; - Assert(sv_flags & BM_DIRTY); + state = pg_atomic_read_u32(&buf->state); + Assert(state & BM_DIRTY); UnlockBufHdr(buf); /* Issue notice if this is not the first failure... */ - if (sv_flags & BM_IO_ERROR) + if (state & BM_IO_ERROR) { /* Buffer is pinned, so we can read tag without spinlock */ char *path; @@ -3701,3 +3777,33 @@ rnode_comparator(const void *p1, const void *p2) else return 0; } + +void +LockBufHdr(volatile BufferDesc *desc) +{ + uint32 state = pg_atomic_read_u32(&desc->state); + + for (;;) + { + /* wait till lock is free */ + while (unlikely(state & BM_LOCKED)) + { + pg_spin_delay(); + state = pg_atomic_read_u32(&desc->state); + + /* Add exponential backoff? Should seldomly be contended tho. 
*/ + } + + /* and try to get lock */ + if (pg_atomic_compare_exchange_u32(&desc->state, &state, state | BM_LOCKED)) + break; + } +} + +void +UnlockBufHdr(volatile BufferDesc *desc) +{ + Assert(pg_atomic_read_u32(&desc->state) & BM_LOCKED); + + pg_atomic_fetch_sub_u32(&desc->state, BM_LOCKED); +} diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c index bc2c773..3f2227b 100644 --- a/src/backend/storage/buffer/freelist.c +++ b/src/backend/storage/buffer/freelist.c @@ -250,6 +250,8 @@ StrategyGetBuffer(BufferAccessStrategy strategy) { while (true) { + uint32 state; + /* Acquire the spinlock to remove element from the freelist */ SpinLockAcquire(&StrategyControl->buffer_strategy_lock); @@ -280,7 +282,9 @@ StrategyGetBuffer(BufferAccessStrategy strategy) * of 8.3, but we'd better check anyway.) */ LockBufHdr(buf); - if (buf->refcount == 0 && buf->usage_count == 0) + state = pg_atomic_read_u32(&buf->state); + if ((state & BUF_REFCOUNT_MASK) == 0 + && (state & BUF_USAGECOUNT_MASK) == 0) { if (strategy != NULL) AddBufferToRing(strategy, buf); @@ -295,6 +299,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy) trycounter = NBuffers; for (;;) { + uint32 state; buf = GetBufferDescriptor(ClockSweepTick()); @@ -303,11 +308,15 @@ StrategyGetBuffer(BufferAccessStrategy strategy) * it; decrement the usage_count (unless pinned) and keep scanning. */ LockBufHdr(buf); - if (buf->refcount == 0) + + state = pg_atomic_read_u32(&buf->state); + + if ((state & BUF_REFCOUNT_MASK) == 0) { - if (buf->usage_count > 0) + if ((state & BUF_USAGECOUNT_MASK) != 0) { - buf->usage_count--; + pg_atomic_fetch_sub_u32(&buf->state, BUF_USAGECOUNT_ONE); + trycounter = NBuffers; } else @@ -589,6 +598,8 @@ GetBufferFromRing(BufferAccessStrategy strategy) { volatile BufferDesc *buf; Buffer bufnum; + uint32 state; + uint32 usagecount; /* Advance to next ring slot */ if (++strategy->current >= strategy->ring_size) @@ -617,7 +628,10 @@ GetBufferFromRing(BufferAccessStrategy strategy) */ buf = GetBufferDescriptor(bufnum - 1); LockBufHdr(buf); - if (buf->refcount == 0 && buf->usage_count <= 1) + state = pg_atomic_read_u32(&buf->state); + usagecount = (state & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT; + if ((state & BUF_REFCOUNT_MASK) == 0 + && usagecount <= 1) { strategy->current_was_in_ring = true; return buf; diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index 3144afe..1e11d71 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -108,6 +108,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, int b; int trycounter; bool found; + uint32 state; INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum); @@ -128,16 +129,25 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n", smgr->smgr_rnode.node.relNode, forkNum, blockNum, -b - 1); #endif + state = pg_atomic_read_u32(&bufHdr->state); + /* this part is equivalent to PinBuffer for a shared buffer */ if (LocalRefCount[b] == 0) { - if (bufHdr->usage_count < BM_MAX_USAGE_COUNT) - bufHdr->usage_count++; + int usagecount; + + usagecount = (state & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT; + + if (usagecount < BM_MAX_USAGE_COUNT) + { + state += BUF_USAGECOUNT_ONE; + pg_atomic_write_u32(&bufHdr->state, state); + } } LocalRefCount[b]++; ResourceOwnerRememberBuffer(CurrentResourceOwner, BufferDescriptorGetBuffer(bufHdr)); - if (bufHdr->flags & BM_VALID) + 
if (state & BM_VALID) *foundPtr = TRUE; else { @@ -169,9 +179,15 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, if (LocalRefCount[b] == 0) { - if (bufHdr->usage_count > 0) + int usagecount; + + state = pg_atomic_read_u32(&bufHdr->state); + usagecount = (state & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT; + + if (usagecount > 0) { - bufHdr->usage_count--; + state -= BUF_USAGECOUNT_ONE; + pg_atomic_write_u32(&bufHdr->state, state); trycounter = NLocBuffer; } else @@ -193,7 +209,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, * this buffer is not referenced but it might still be dirty. if that's * the case, write it out before reusing it! */ - if (bufHdr->flags & BM_DIRTY) + if (state & BM_DIRTY) { SMgrRelation oreln; Page localpage = (char *) LocalBufHdrGetBlock(bufHdr); @@ -211,7 +227,8 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, false); /* Mark not-dirty now in case we error out below */ - bufHdr->flags &= ~BM_DIRTY; + state &= ~BM_DIRTY; + pg_atomic_write_u32(&bufHdr->state, state); pgBufferUsage.local_blks_written++; } @@ -228,7 +245,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, /* * Update the hash table: remove old entry, if any, and make new one. */ - if (bufHdr->flags & BM_TAG_VALID) + if (state & BM_TAG_VALID) { hresult = (LocalBufferLookupEnt *) hash_search(LocalBufHash, (void *) &bufHdr->tag, @@ -237,7 +254,8 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, elog(ERROR, "local buffer hash table corrupted"); /* mark buffer invalid just in case hash insert fails */ CLEAR_BUFFERTAG(bufHdr->tag); - bufHdr->flags &= ~(BM_VALID | BM_TAG_VALID); + state &= ~(BM_VALID | BM_TAG_VALID); + pg_atomic_write_u32(&bufHdr->state, state); } hresult = (LocalBufferLookupEnt *) @@ -250,9 +268,11 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, * it's all ours now. 
*/ bufHdr->tag = newTag; - bufHdr->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR); - bufHdr->flags |= BM_TAG_VALID; - bufHdr->usage_count = 1; + state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR); + state |= BM_TAG_VALID; + state &= ~BUF_USAGECOUNT_MASK; + state += BUF_USAGECOUNT_ONE; + pg_atomic_write_u32(&bufHdr->state, state); *foundPtr = FALSE; return bufHdr; @@ -267,6 +287,7 @@ MarkLocalBufferDirty(Buffer buffer) { int bufid; BufferDesc *bufHdr; + uint32 state; Assert(BufferIsLocal(buffer)); @@ -280,10 +301,13 @@ MarkLocalBufferDirty(Buffer buffer) bufHdr = GetLocalBufferDescriptor(bufid); - if (!(bufHdr->flags & BM_DIRTY)) + state = pg_atomic_read_u32(&bufHdr->state); + + if (!(state & BM_DIRTY)) pgBufferUsage.local_blks_dirtied++; - bufHdr->flags |= BM_DIRTY; + state |= BM_DIRTY; + pg_atomic_write_u32(&bufHdr->state, state); } /* @@ -307,8 +331,11 @@ DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum, { BufferDesc *bufHdr = GetLocalBufferDescriptor(i); LocalBufferLookupEnt *hresult; + uint32 state; - if ((bufHdr->flags & BM_TAG_VALID) && + state = pg_atomic_read_u32(&bufHdr->state); + + if ((state & BM_TAG_VALID) && RelFileNodeEquals(bufHdr->tag.rnode, rnode) && bufHdr->tag.forkNum == forkNum && bufHdr->tag.blockNum >= firstDelBlock) @@ -327,8 +354,9 @@ DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum, elog(ERROR, "local buffer hash table corrupted"); /* Mark buffer invalid */ CLEAR_BUFFERTAG(bufHdr->tag); - bufHdr->flags = 0; - bufHdr->usage_count = 0; + state &= ~BUF_FLAG_MASK; + state &= ~BUF_USAGECOUNT_MASK; + pg_atomic_write_u32(&bufHdr->state, state); } } } @@ -349,8 +377,11 @@ DropRelFileNodeAllLocalBuffers(RelFileNode rnode) { BufferDesc *bufHdr = GetLocalBufferDescriptor(i); LocalBufferLookupEnt *hresult; + uint32 state; + + state = pg_atomic_read_u32(&bufHdr->state); - if ((bufHdr->flags & BM_TAG_VALID) && + if ((state & BM_TAG_VALID) && RelFileNodeEquals(bufHdr->tag.rnode, rnode)) { if (LocalRefCount[i] != 0) @@ -367,8 +398,9 @@ DropRelFileNodeAllLocalBuffers(RelFileNode rnode) elog(ERROR, "local buffer hash table corrupted"); /* Mark buffer invalid */ CLEAR_BUFFERTAG(bufHdr->tag); - bufHdr->flags = 0; - bufHdr->usage_count = 0; + state &= ~BUF_FLAG_MASK; + state &= ~BUF_USAGECOUNT_MASK; + pg_atomic_write_u32(&bufHdr->state, state); } } } diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 521ee1c..92889e6 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -20,29 +20,40 @@ #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/smgr.h" +#include "port/atomics.h" #include "storage/spin.h" #include "utils/relcache.h" /* + * State is: + * 10 bit flags + * 4 bit usage count + * 18 bit refcount + */ +#define BUF_REFCOUNT_MASK ((1U << 18) - 1) +#define BUF_FLAG_MASK 0xFFC00000U +#define BUF_USAGECOUNT_MASK 0x003C0000U +#define BUF_USAGECOUNT_ONE (1U << 18) +#define BUF_USAGECOUNT_SHIFT 18 + +/* * Flags for buffer descriptors * * Note: TAG_VALID essentially means that there is a buffer hashtable * entry associated with the buffer's tag. 
*/ -#define BM_DIRTY (1 << 0) /* data needs writing */ -#define BM_VALID (1 << 1) /* data is valid */ -#define BM_TAG_VALID (1 << 2) /* tag is assigned */ -#define BM_IO_IN_PROGRESS (1 << 3) /* read or write in progress */ -#define BM_IO_ERROR (1 << 4) /* previous I/O failed */ -#define BM_JUST_DIRTIED (1 << 5) /* dirtied since write started */ -#define BM_PIN_COUNT_WAITER (1 << 6) /* have waiter for sole pin */ -#define BM_CHECKPOINT_NEEDED (1 << 7) /* must write for checkpoint */ -#define BM_PERMANENT (1 << 8) /* permanent relation (not +#define BM_LOCKED (1U << 22) /* buffer header is locked */ +#define BM_DIRTY (1U << 23) /* data needs writing */ +#define BM_VALID (1U << 24) /* data is valid */ +#define BM_TAG_VALID (1U << 25) /* tag is assigned */ +#define BM_IO_IN_PROGRESS (1U << 26) /* read or write in progress */ +#define BM_IO_ERROR (1U << 27) /* previous I/O failed */ +#define BM_JUST_DIRTIED (1U << 28) /* dirtied since write started */ +#define BM_PIN_COUNT_WAITER (1U << 29) /* have waiter for sole pin */ +#define BM_CHECKPOINT_NEEDED (1U << 30) /* must write for checkpoint */ +#define BM_PERMANENT (1U << 31) /* permanent relation (not * unlogged) */ - -typedef bits16 BufFlags; - /* * The maximum allowed value of usage_count represents a tradeoff between * accuracy and speed of the clock-sweep buffer management algorithm. A @@ -137,12 +148,11 @@ typedef struct buftag typedef struct BufferDesc { BufferTag tag; /* ID of page contained in buffer */ - BufFlags flags; /* see bit definitions above */ - uint16 usage_count; /* usage counter for clock sweep code */ - unsigned refcount; /* # of backends holding pins on buffer */ - int wait_backend_pid; /* backend PID of pin-count waiter */ - slock_t buf_hdr_lock; /* protects the above fields */ + /* state of the tag, containing flags, refcount and usagecount */ + pg_atomic_uint32 state; + + int wait_backend_pid; /* backend PID of pin-count waiter */ int buf_id; /* buffer's index number (from 0) */ int freeNext; /* link in freelist chain */ @@ -192,16 +202,11 @@ typedef union BufferDescPadded #define FREENEXT_NOT_IN_LIST (-2) /* - * Macros for acquiring/releasing a shared buffer header's spinlock. - * Do not apply these to local buffers! - * - * Note: as a general coding rule, if you are using these then you probably - * need to be using a volatile-qualified pointer to the buffer header, to - * ensure that the compiler doesn't rearrange accesses to the header to - * occur before or after the spinlock is acquired/released. + * Functions for acquiring/releasing a shared buffer header's spinlock. Do + * not apply these to local buffers! FIXUP! */ -#define LockBufHdr(bufHdr) SpinLockAcquire(&(bufHdr)->buf_hdr_lock) -#define UnlockBufHdr(bufHdr) SpinLockRelease(&(bufHdr)->buf_hdr_lock) +extern void LockBufHdr(volatile BufferDesc *desc); +extern void UnlockBufHdr(volatile BufferDesc *desc); /* in buf_init.c */
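For reference, the bit layout implied by the new masks in buf_internals.h
(10 flag bits, 4 usage-count bits, 18 refcount bits) can be sanity-checked
with a tiny standalone program. This is just a sketch with made-up helper
names, not part of the patch:

/* Sanity sketch for the packed state word: 10 flag bits (22..31),
 * 4 usage-count bits (18..21), 18 refcount bits (0..17). */
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#define BUF_REFCOUNT_MASK    ((1U << 18) - 1)
#define BUF_USAGECOUNT_MASK  0x003C0000U
#define BUF_USAGECOUNT_ONE   (1U << 18)
#define BUF_USAGECOUNT_SHIFT 18
#define BUF_FLAG_MASK        0xFFC00000U

/* hypothetical helpers, not in the patch */
#define STATE_GET_REFCOUNT(s)   ((s) & BUF_REFCOUNT_MASK)
#define STATE_GET_USAGECOUNT(s) (((s) & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT)

int
main(void)
{
    uint32_t state = 0;

    /* the three regions must not overlap and must cover all 32 bits */
    assert((BUF_REFCOUNT_MASK & BUF_USAGECOUNT_MASK) == 0);
    assert((BUF_USAGECOUNT_MASK & BUF_FLAG_MASK) == 0);
    assert((BUF_REFCOUNT_MASK | BUF_USAGECOUNT_MASK | BUF_FLAG_MASK) == 0xFFFFFFFFU);

    state += 3;                       /* three pins */
    state += 2 * BUF_USAGECOUNT_ONE;  /* usage count of two */

    printf("refcount=%u usagecount=%u\n",
           STATE_GET_REFCOUNT(state), STATE_GET_USAGECOUNT(state));
    assert(STATE_GET_REFCOUNT(state) == 3);
    assert(STATE_GET_USAGECOUNT(state) == 2);
    return 0;
}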