*** ./src/backend/access/heap/heapam.c.orig Wed May 9 19:57:02 2007 --- ./src/backend/access/heap/heapam.c Wed May 9 19:59:30 2007 *************** *** 74,79 **** --- 74,81 ---- static void initscan(HeapScanDesc scan, ScanKey key) { + int i; + /* * Determine the number of blocks we have to scan. * *************** *** 88,93 **** --- 90,100 ---- ItemPointerSetInvalid(&scan->rs_ctup.t_self); scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; + for (i = 0; i < lengthof(scan->rs_rahead); i++) + { + scan->rs_rahead[i].buffer = InvalidBuffer; + scan->rs_rahead[i].block = InvalidBlockNumber; + } /* we don't have a marked position... */ ItemPointerSetInvalid(&(scan->rs_mctid)); *************** *** 111,117 **** * which tuples on the page are visible. */ static void ! heapgetpage(HeapScanDesc scan, BlockNumber page) { Buffer buffer; Snapshot snapshot; --- 118,124 ---- * which tuples on the page are visible. */ static void ! heapgetpage(HeapScanDesc scan, BlockNumber page, bool backward) { Buffer buffer; Snapshot snapshot; *************** *** 121,135 **** OffsetNumber lineoff; ItemId lpp; ! Assert(page < scan->rs_nblocks); ! scan->rs_cbuf = ReleaseAndReadBuffer(scan->rs_cbuf, ! scan->rs_rd, ! page); ! scan->rs_cblock = page; ! if (!scan->rs_pageatatime) return; buffer = scan->rs_cbuf; snapshot = scan->rs_snapshot; --- 128,223 ---- OffsetNumber lineoff; ItemId lpp; ! /* don't run prefetching code if the table is small. This has the ! side effect of using the full buffer pool to cache the table */ ! bool is_small = (scan->rs_nblocks <= (NBuffers * 60 / 100)); ! Assert(page < scan->rs_nblocks); ! if (! scan->rs_pageatatime) ! { ! scan->rs_cbuf = ReleaseAndReadBuffer(scan->rs_cbuf, ! scan->rs_rd, ! page); ! scan->rs_cblock = page; return; + } + + if (is_small) + { + scan->rs_cbuf = ReleaseAndReadBuffer(scan->rs_cbuf, scan->rs_rd, page); + scan->rs_cblock = page; + } + else + { + int ri, i; + const int incr = backward ? 
-1 : 1; + const int rimax = lengthof(scan->rs_rahead); + + /* + * This will fill up to the first 2 quarters of read ahead + */ + for (i = 0; i < rimax / 2; i++) + { + const int pg = page + incr * i; + if (! (0 <= pg && pg < scan->rs_nblocks)) + break; + ri = pg % rimax; + if (scan->rs_rahead[ri].block == pg) + { + /* continue the loop to ensure that the first 2 quarters + * of rs_rahead are filled with pages we will need */ + continue; + } + + /* read the page */ + scan->rs_rahead[ri].block = pg; + scan->rs_rahead[ri].buffer + = KillAndReadBuffer(scan->rs_rahead[ri].buffer, scan->rs_rd, pg); + } + + /* + * This will fill the 3rd quarter of read ahead provided that it is not filled + */ + for (i = rimax / 2; i < rimax * 3 / 4; i++) + { + const int pg = page + incr * i; + if (! (0 <= pg && pg < scan->rs_nblocks)) + break; + ri = pg % rimax; + if (scan->rs_rahead[ri].block == pg) + { + /* break as soon as we found a page in rs_rahead. this ensures + * that we will only fill 3rd quarter of rs_rahead when we have + * consumed the first quarter of rs_rahead, and currently + * processing a page in the 2nd quarter of rs_rahead. + */ + break; + } + + /* read the page */ + scan->rs_rahead[ri].block = pg; + scan->rs_rahead[ri].buffer + = KillAndReadBuffer(scan->rs_rahead[ri].buffer, scan->rs_rd, pg); + } + + /* sanity check: the rs_rahead slot for the requested page must now hold that page (block numbers are unsigned, hence %u) */ + ri = page % rimax; + if (scan->rs_rahead[ri].block != page) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("page number mismatch, expect %u, but got %u", + page, scan->rs_rahead[ri].block))); + + /* release the current buffer */ + if (BufferIsValid(scan->rs_cbuf)) + ReleaseBuffer(scan->rs_cbuf); + + /* pin the new buffer in the scan's cbuf */ + scan->rs_cbuf = scan->rs_rahead[ri].buffer; + scan->rs_cblock = page; + IncrBufferRefCount(scan->rs_cbuf); + } buffer = scan->rs_cbuf; snapshot = scan->rs_snapshot; *************** *** 226,232 **** return; } page = 0; /* first page */ ! 
heapgetpage(scan, page); lineoff = FirstOffsetNumber; /* first offnum */ scan->rs_inited = true; } --- 314,320 ---- return; } page = 0; /* first page */ ! heapgetpage(scan, page, ScanDirectionIsBackward(dir)); lineoff = FirstOffsetNumber; /* first offnum */ scan->rs_inited = true; } *************** *** 260,266 **** return; } page = scan->rs_nblocks - 1; /* final page */ ! heapgetpage(scan, page); } else { --- 348,354 ---- return; } page = scan->rs_nblocks - 1; /* final page */ ! heapgetpage(scan, page, ScanDirectionIsBackward(dir)); } else { *************** *** 301,307 **** page = ItemPointerGetBlockNumber(&(tuple->t_self)); if (page != scan->rs_cblock) ! heapgetpage(scan, page); /* Since the tuple was previously fetched, needn't lock page here */ dp = (Page) BufferGetPage(scan->rs_cbuf); --- 389,395 ---- page = ItemPointerGetBlockNumber(&(tuple->t_self)); if (page != scan->rs_cblock) ! heapgetpage(scan, page, ScanDirectionIsBackward(dir)); /* Since the tuple was previously fetched, needn't lock page here */ dp = (Page) BufferGetPage(scan->rs_cbuf); *************** *** 387,393 **** page = backward ? (page - 1) : (page + 1); ! heapgetpage(scan, page); LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); --- 475,481 ---- page = backward ? (page - 1) : (page + 1); ! heapgetpage(scan, page, ScanDirectionIsBackward(dir)); LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); *************** *** 453,459 **** return; } page = 0; /* first page */ ! heapgetpage(scan, page); lineindex = 0; scan->rs_inited = true; } --- 541,547 ---- return; } page = 0; /* first page */ ! heapgetpage(scan, page, ScanDirectionIsBackward(dir)); lineindex = 0; scan->rs_inited = true; } *************** *** 484,490 **** return; } page = scan->rs_nblocks - 1; /* final page */ ! heapgetpage(scan, page); } else { --- 572,578 ---- return; } page = scan->rs_nblocks - 1; /* final page */ ! 
heapgetpage(scan, page, ScanDirectionIsBackward(dir)); } else { *************** *** 522,528 **** page = ItemPointerGetBlockNumber(&(tuple->t_self)); if (page != scan->rs_cblock) ! heapgetpage(scan, page); /* Since the tuple was previously fetched, needn't lock page here */ dp = (Page) BufferGetPage(scan->rs_cbuf); --- 610,616 ---- page = ItemPointerGetBlockNumber(&(tuple->t_self)); if (page != scan->rs_cblock) ! heapgetpage(scan, page, ScanDirectionIsBackward(dir)); /* Since the tuple was previously fetched, needn't lock page here */ dp = (Page) BufferGetPage(scan->rs_cbuf); *************** *** 606,612 **** } page = backward ? (page - 1) : (page + 1); ! heapgetpage(scan, page); dp = (Page) BufferGetPage(scan->rs_cbuf); lines = scan->rs_ntuples; --- 694,700 ---- } page = backward ? (page - 1) : (page + 1); ! heapgetpage(scan, page, ScanDirectionIsBackward(dir)); dp = (Page) BufferGetPage(scan->rs_cbuf); lines = scan->rs_ntuples; *************** *** 969,979 **** --- 1057,1071 ---- heap_rescan(HeapScanDesc scan, ScanKey key) { + int i; /* * unpin scan buffers */ if (BufferIsValid(scan->rs_cbuf)) ReleaseBuffer(scan->rs_cbuf); + for (i = 0; i < lengthof(scan->rs_rahead); i++) + if (BufferIsValid(scan->rs_rahead[i].buffer)) + ReleaseBuffer(scan->rs_rahead[i].buffer); /* * reinitialize scan descriptor *************** *** 991,996 **** --- 1083,1089 ---- void heap_endscan(HeapScanDesc scan) { + int i; /* Note: no locking manipulations needed */ /* *************** *** 998,1004 **** */ if (BufferIsValid(scan->rs_cbuf)) ReleaseBuffer(scan->rs_cbuf); ! /* * decrement relation reference count and free scan descriptor storage */ --- 1091,1099 ---- */ if (BufferIsValid(scan->rs_cbuf)) ReleaseBuffer(scan->rs_cbuf); ! for (i = 0; i < lengthof(scan->rs_rahead); i++) ! if (BufferIsValid(scan->rs_rahead[i].buffer)) ! 
ReleaseBuffer(scan->rs_rahead[i].buffer); /* * decrement relation reference count and free scan descriptor storage */ Binary files ./src/backend/postgres.orig and ./src/backend/postgres differ *** ./src/backend/storage/buffer/bufmgr.c.orig Wed May 9 19:57:11 2007 --- ./src/backend/storage/buffer/bufmgr.c Wed May 9 19:59:44 2007 *************** *** 96,102 **** static Buffer ReadBuffer_common(Relation reln, BlockNumber blockNum, ! bool zeroPage); static bool PinBuffer(volatile BufferDesc *buf); static void PinBuffer_Locked(volatile BufferDesc *buf); static void UnpinBuffer(volatile BufferDesc *buf, --- 96,102 ---- static Buffer ReadBuffer_common(Relation reln, BlockNumber blockNum, ! bool zeroPage, BufferDesc* availBufHdr); static bool PinBuffer(volatile BufferDesc *buf); static void PinBuffer_Locked(volatile BufferDesc *buf); static void UnpinBuffer(volatile BufferDesc *buf, *************** *** 108,114 **** int set_flag_bits); static void buffer_write_error_callback(void *arg); static volatile BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum, ! bool *foundPtr); static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln); static void AtProcExit_Buffers(int code, Datum arg); --- 108,114 ---- int set_flag_bits); static void buffer_write_error_callback(void *arg); static volatile BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum, ! bool *foundPtr, BufferDesc* availBufHdr); static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln); static void AtProcExit_Buffers(int code, Datum arg); *************** *** 131,137 **** Buffer ReadBuffer(Relation reln, BlockNumber blockNum) { ! return ReadBuffer_common(reln, blockNum, false); } /* --- 131,137 ---- Buffer ReadBuffer(Relation reln, BlockNumber blockNum) { ! return ReadBuffer_common(reln, blockNum, false, NULL); } /* *************** *** 146,159 **** Buffer ReadOrZeroBuffer(Relation reln, BlockNumber blockNum) { ! 
return ReadBuffer_common(reln, blockNum, true); } /* * ReadBuffer_common -- common logic for ReadBuffer and ReadOrZeroBuffer */ static Buffer ! ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage) { volatile BufferDesc *bufHdr; Block bufBlock; --- 146,161 ---- Buffer ReadOrZeroBuffer(Relation reln, BlockNumber blockNum) { ! return ReadBuffer_common(reln, blockNum, true, NULL); } + + /* * ReadBuffer_common -- common logic for ReadBuffer and ReadOrZeroBuffer */ static Buffer ! ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage, BufferDesc* availBufHdr) { volatile BufferDesc *bufHdr; Block bufBlock; *************** *** 191,197 **** * lookup the buffer. IO_IN_PROGRESS is set if the requested block is * not currently in memory. */ ! bufHdr = BufferAlloc(reln, blockNum, &found); if (found) BufferHitCount++; } --- 193,199 ---- * lookup the buffer. IO_IN_PROGRESS is set if the requested block is * not currently in memory. */ ! bufHdr = BufferAlloc(reln, blockNum, &found, availBufHdr); if (found) BufferHitCount++; } *************** *** 205,215 **** { /* Just need to update stats before we exit */ pgstat_count_buffer_hit(&reln->pgstat_info, reln); if (VacuumCostActive) VacuumCostBalance += VacuumCostPageHit; ! return BufferDescriptorGetBuffer(bufHdr); } /* --- 207,217 ---- { /* Just need to update stats before we exit */ pgstat_count_buffer_hit(&reln->pgstat_info, reln); if (VacuumCostActive) VacuumCostBalance += VacuumCostPageHit; ! goto done; /* shared exit also hands back an unused availBufHdr */ } /* *************** *** 325,333 **** --- 327,341 ---- TerminateBufferIO(bufHdr, false, BM_VALID); } if (VacuumCostActive) VacuumCostBalance += VacuumCostPageMiss; + /* cache hits jump here; they were already charged VacuumCostPageHit above */ + done: + /* give the caller-supplied spare buffer to the freelist if it was not the one used */ + if (availBufHdr && availBufHdr != bufHdr) + StrategyFreeBuffer(availBufHdr, true); + return BufferDescriptorGetBuffer(bufHdr); } *************** *** 349,355 **** static volatile BufferDesc * BufferAlloc(Relation reln, BlockNumber blockNum, ! 
bool *foundPtr) { BufferTag newTag; /* identity of requested block */ uint32 newHash; /* hash value for newTag */ --- 352,359 ---- static volatile BufferDesc * BufferAlloc(Relation reln, BlockNumber blockNum, ! bool *foundPtr, ! BufferDesc* availBufHdr) { BufferTag newTag; /* identity of requested block */ uint32 newHash; /* hash value for newTag */ *************** *** 419,431 **** /* Loop here in case we have to try another victim buffer */ for (;;) { ! /* ! * Select a victim buffer. The buffer is returned with its header ! * spinlock still held! Also the BufFreelistLock is still held, since ! * it would be bad to hold the spinlock while possibly waking up other ! * processes. ! */ ! buf = StrategyGetBuffer(); Assert(buf->refcount == 0); --- 423,449 ---- /* Loop here in case we have to try another victim buffer */ for (;;) { ! bool unlockBufFreeList = false; ! if (availBufHdr) ! { ! /* ! * If caller gave us a victim buffer, then try to use it! ! */ ! buf = availBufHdr; ! LockBufHdr(buf); /* PinBuffer_Locked will unlock this */ ! availBufHdr = NULL; ! } ! else ! { ! /* ! * Select a victim buffer. The buffer is returned with its header ! * spinlock still held! Also the BufFreelistLock is still held, since ! * it would be bad to hold the spinlock while possibly waking up other ! * processes. ! */ ! buf = StrategyGetBuffer(); ! unlockBufFreeList = true; ! } Assert(buf->refcount == 0); *************** *** 436,442 **** PinBuffer_Locked(buf); /* Now it's safe to release the freelist lock */ ! LWLockRelease(BufFreelistLock); /* * If the buffer was dirty, try to write it out. There is a race --- 454,461 ---- PinBuffer_Locked(buf); /* Now it's safe to release the freelist lock */ ! if (unlockBufFreeList) ! LWLockRelease(BufFreelistLock); /* * If the buffer was dirty, try to write it out. There is a race *************** *** 656,664 **** * * The buffer could get reclaimed by someone else while we are waiting * to acquire the necessary locks; if so, don't mess it up. */ ! 
static void ! InvalidateBuffer(volatile BufferDesc *buf) { BufferTag oldTag; uint32 oldHash; /* hash value for oldTag */ --- 675,688 ---- * * The buffer could get reclaimed by someone else while we are waiting * to acquire the necessary locks; if so, don't mess it up. + * + * Return true if buffer is indeed invalidated, false otherwise. Returning + * false is usually not consequential as it means that the buf cannot be + * invalidated because someone else is using it. Caller should just forget + * about this buf. */ ! static bool ! InvalidateBuffer(volatile BufferDesc *buf, bool putInFreeList) { BufferTag oldTag; uint32 oldHash; /* hash value for oldTag */ *************** *** 694,700 **** { UnlockBufHdr(buf); LWLockRelease(oldPartitionLock); ! return; } /* --- 718,724 ---- { UnlockBufHdr(buf); LWLockRelease(oldPartitionLock); ! return false; } /* *************** *** 742,748 **** /* * Insert the buffer at the head of the list of free buffers. */ ! StrategyFreeBuffer(buf, true); } /* --- 766,775 ---- /* * Insert the buffer at the head of the list of free buffers. */ ! if (putInFreeList) ! StrategyFreeBuffer(buf, true); ! ! return true; } /* *************** *** 839,844 **** --- 866,925 ---- return ReadBuffer(relation, blockNum); } + + /* + * KillAndReadBuffer -- combine ReleaseBuffer() or InvalidateBuffer(), and ReadBuffer(). + * The buffer supplied is invalidated and reused if possible. 
+ */ + Buffer + KillAndReadBuffer(Buffer buffer, + Relation relation, + BlockNumber blockNum) + { + BufferDesc* bufHdr; + if (!BufferIsValid(buffer) || BufferIsLocal(buffer)) + { + /* no shared buffer to recycle: use good old ReleaseAndReadBuffer in these cases */ + return ReleaseAndReadBuffer(buffer, relation, blockNum); + } + + /* buffer is valid and it is not local */ + Assert(PrivateRefCount[buffer - 1] > 0); + bufHdr = &BufferDescriptors[buffer - 1]; + /* we have pin, so it's ok to examine tag without spinlock */ + if (bufHdr->tag.blockNum == blockNum + && RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node)) + { + /* same relation and block already in this buffer... just return the same buffer, pin kept */ + return buffer; + } + + + /* drop our pin, then, under the header lock: don't kill the buffer if someone else is using it or used it recently, + or if it's dirty or has I/O in progress */ + UnpinBuffer(bufHdr, true, true); + LockBufHdr(bufHdr); + if (PrivateRefCount[buffer - 1] > 0 + || bufHdr->refcount > 0 + || bufHdr->usage_count > 1 + || (bufHdr->flags & (BM_IO_IN_PROGRESS | BM_DIRTY))) + { + /* back off to plain ReadBuffer in these cases */ + UnlockBufHdr(bufHdr); + return ReadBuffer(relation, blockNum); + } + + /* invalidate the buffer, but don't put on freelist. NOTE(review): from here until ReadBuffer_common re-locks it we hold no pin on this buffer and it is not on the freelist -- confirm no other backend can claim it in that window */ + if (! InvalidateBuffer(bufHdr, false)) /* this will unlock bufhdr */ + { + /* if InvalidateBuffer failed (returned false), then back off to good old ReadBuffer */ + return ReadBuffer(relation, blockNum); + } + + /* reuse the freed buffer to read the new page: it is passed as the preferred victim (availBufHdr) */ + return ReadBuffer_common(relation, blockNum, false, bufHdr); + } + /* * PinBuffer -- make buffer unavailable for replacement. * *************** *** 1540,1546 **** LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, rnode) && bufHdr->tag.blockNum >= firstDelBlock) ! 
InvalidateBuffer(bufHdr, true); /* releases spinlock */ else UnlockBufHdr(bufHdr); } *************** *** 1573,1579 **** bufHdr = &BufferDescriptors[i]; LockBufHdr(bufHdr); if (bufHdr->tag.rnode.dbNode == dbid) ! InvalidateBuffer(bufHdr); /* releases spinlock */ else UnlockBufHdr(bufHdr); } --- 1654,1660 ---- bufHdr = &BufferDescriptors[i]; LockBufHdr(bufHdr); if (bufHdr->tag.rnode.dbNode == dbid) ! InvalidateBuffer(bufHdr, true); /* releases spinlock */ else UnlockBufHdr(bufHdr); } *** ./src/include/access/relscan.h.orig Wed May 9 19:58:09 2007 --- ./src/include/access/relscan.h Wed May 9 19:59:59 2007 *************** *** 44,49 **** --- 44,55 ---- int rs_mindex; /* marked tuple's saved index */ int rs_ntuples; /* number of visible tuples on page */ OffsetNumber rs_vistuples[MaxHeapTuplesPerPage]; /* their offsets */ + + /* read-ahead buffers for scans. */ + struct { + BlockNumber block; /* the block number */ + Buffer buffer; /* the corresponding buffer that's pinned */ + } rs_rahead[16]; } HeapScanDescData; typedef HeapScanDescData *HeapScanDesc; *** ./src/include/storage/bufmgr.h.orig Wed May 9 19:58:44 2007 --- ./src/include/storage/bufmgr.h Wed May 9 20:00:07 2007 *************** *** 118,124 **** extern void IncrBufferRefCount(Buffer buffer); extern Buffer ReleaseAndReadBuffer(Buffer buffer, Relation relation, BlockNumber blockNum); ! extern void InitBufferPool(void); extern void InitBufferPoolAccess(void); extern void InitBufferPoolBackend(void); --- 118,125 ---- extern void IncrBufferRefCount(Buffer buffer); extern Buffer ReleaseAndReadBuffer(Buffer buffer, Relation relation, BlockNumber blockNum); ! extern Buffer KillAndReadBuffer(Buffer buffer, Relation relation, ! BlockNumber blockNum); extern void InitBufferPool(void); extern void InitBufferPoolAccess(void); extern void InitBufferPoolBackend(void);