On Tue, 2008-02-26 at 21:36 +0000, Simon Riggs wrote: > On Tue, 2008-02-26 at 15:12 -0500, Tom Lane wrote: > > Simon Riggs <[EMAIL PROTECTED]> writes: > > > Following patch implements a simple mechanism to keep a buffer pinned > > > while we are bulk loading. > > > > This will fail to clean up nicely after a subtransaction abort, no? > > Yes, will fix.
Additional line in AbortSubTransaction handles this. > > (For that matter I don't think it's right even for a top-level abort.) > > And I'm pretty sure it will trash your table entirely if someone > > inserts into another relation while a bulk insert is happening. > > (Not at all impossible, think of triggers for instance.) > > The pinned buffer is separate from the preferred block for each > relation; BulkInsertBuffer isn't used for determining the block to > insert into. If you try to insert into a block that differs from the > pinned one it unpins it and re-pins the new one. So it is always safe > with respect to the data in the table. > > It can run into recursive bulk insert ops but that just destroys the > performance advantage, it's not actually dangerous. I'm about to start refactoring code as suggested, so wanted to drop off another version to allow everybody to examine the safety (or not) of this approach. (So this patch is WIP) -- Simon Riggs 2ndQuadrant http://www.2ndQuadrant.com PostgreSQL UK 2008 Conference: http://www.postgresql.org.uk
Index: src/backend/access/heap/heapam.c =================================================================== RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/access/heap/heapam.c,v retrieving revision 1.251 diff -c -r1.251 heapam.c *** src/backend/access/heap/heapam.c 8 Mar 2008 21:57:59 -0000 1.251 --- src/backend/access/heap/heapam.c 20 Mar 2008 13:38:53 -0000 *************** *** 1744,1749 **** --- 1744,1764 ---- } } + /* + * Begin/End Bulk Inserts + * + */ + void + heap_begin_bulk_insert(void) + { + ReleaseBulkInsertBufferIfAny(); + } + + void + heap_end_bulk_insert(void) + { + ReleaseBulkInsertBufferIfAny(); + } /* * heap_insert - insert tuple into a heap *************** *** 1771,1781 **** */ Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid, ! bool use_wal, bool use_fsm) { TransactionId xid = GetCurrentTransactionId(); HeapTuple heaptup; Buffer buffer; if (relation->rd_rel->relhasoids) { --- 1786,1797 ---- */ Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid, ! bool use_wal, bool use_fsm, bool bulk_insert_request) { TransactionId xid = GetCurrentTransactionId(); HeapTuple heaptup; Buffer buffer; + bool bulk_insert = bulk_insert_request && !relation->rd_istemp; if (relation->rd_rel->relhasoids) { *************** *** 1828,1836 **** else heaptup = tup; ! /* Find buffer to insert this tuple into */ ! buffer = RelationGetBufferForTuple(relation, heaptup->t_len, ! InvalidBuffer, use_fsm); /* NO EREPORT(ERROR) from here till changes are logged */ START_CRIT_SECTION(); --- 1844,1861 ---- else heaptup = tup; ! /* ! * Find buffer to insert this tuple into ! */ ! if (bulk_insert) ! { ! buffer = RelationGetBufferForTuple(relation, heaptup->t_len, ! GetBulkInsertBuffer(), use_fsm, true); ! SetBulkInsertBuffer(buffer); ! } ! else ! buffer = RelationGetBufferForTuple(relation, heaptup->t_len, ! 
InvalidBuffer, use_fsm, false); /* NO EREPORT(ERROR) from here till changes are logged */ START_CRIT_SECTION(); *************** *** 1909,1915 **** END_CRIT_SECTION(); ! UnlockReleaseBuffer(buffer); /* * If tuple is cachable, mark it for invalidation from the caches in case --- 1934,1946 ---- END_CRIT_SECTION(); ! /* ! * Keep buffer pinned if we are in bulk insert mode ! */ ! if (bulk_insert) ! LockBuffer(buffer, BUFFER_LOCK_UNLOCK); ! else ! UnlockReleaseBuffer(buffer); /* * If tuple is cachable, mark it for invalidation from the caches in case *************** *** 1946,1952 **** Oid simple_heap_insert(Relation relation, HeapTuple tup) { ! return heap_insert(relation, tup, GetCurrentCommandId(true), true, true); } /* --- 1977,1983 ---- Oid simple_heap_insert(Relation relation, HeapTuple tup) { ! return heap_insert(relation, tup, GetCurrentCommandId(true), true, true, false); } /* *************** *** 2569,2575 **** { /* Assume there's no chance to put heaptup on same page. */ newbuf = RelationGetBufferForTuple(relation, heaptup->t_len, ! buffer, true); } else { --- 2600,2606 ---- { /* Assume there's no chance to put heaptup on same page. */ newbuf = RelationGetBufferForTuple(relation, heaptup->t_len, ! buffer, true, false); } else { *************** *** 2586,2592 **** */ LockBuffer(buffer, BUFFER_LOCK_UNLOCK); newbuf = RelationGetBufferForTuple(relation, heaptup->t_len, ! buffer, true); } else { --- 2617,2623 ---- */ LockBuffer(buffer, BUFFER_LOCK_UNLOCK); newbuf = RelationGetBufferForTuple(relation, heaptup->t_len, ! 
buffer, true, false); } else { Index: src/backend/access/heap/hio.c =================================================================== RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/access/heap/hio.c,v retrieving revision 1.68 diff -c -r1.68 hio.c *** src/backend/access/heap/hio.c 1 Jan 2008 19:45:46 -0000 1.68 --- src/backend/access/heap/hio.c 20 Mar 2008 13:38:53 -0000 *************** *** 103,109 **** */ Buffer RelationGetBufferForTuple(Relation relation, Size len, ! Buffer otherBuffer, bool use_fsm) { Buffer buffer = InvalidBuffer; Page pageHeader; --- 103,109 ---- */ Buffer RelationGetBufferForTuple(Relation relation, Size len, ! Buffer otherBuffer, bool use_fsm, bool bulk_insert) { Buffer buffer = InvalidBuffer; Page pageHeader; *************** *** 198,216 **** buffer = otherBuffer; LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); } ! else if (otherBlock < targetBlock) { ! /* lock other buffer first */ buffer = ReadBuffer(relation, targetBlock); - LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); } ! else { ! /* lock target buffer first */ ! buffer = ReadBuffer(relation, targetBlock); ! LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); ! LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE); } /* --- 198,225 ---- buffer = otherBuffer; LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); } ! else if (bulk_insert) { ! ReleaseBuffer(otherBuffer); buffer = ReadBuffer(relation, targetBlock); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); } ! else if (!bulk_insert) { ! if (otherBlock < targetBlock) ! { ! /* lock other buffer first */ ! buffer = ReadBuffer(relation, targetBlock); ! LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE); ! LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); ! } ! else ! { ! /* lock target buffer first */ ! buffer = ReadBuffer(relation, targetBlock); ! LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); ! LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE); ! 
} } /* *************** *** 265,270 **** --- 274,282 ---- */ needLock = !RELATION_IS_LOCAL(relation); + if (bulk_insert && otherBuffer != InvalidBuffer) + ReleaseBuffer(otherBuffer); + if (needLock) LockRelationForExtension(relation, ExclusiveLock); *************** *** 280,286 **** * We can be certain that locking the otherBuffer first is OK, since it * must have a lower page number. */ ! if (otherBuffer != InvalidBuffer) LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE); /* --- 292,298 ---- * We can be certain that locking the otherBuffer first is OK, since it * must have a lower page number. */ ! if (!bulk_insert && otherBuffer != InvalidBuffer) LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE); /* Index: src/backend/access/heap/tuptoaster.c =================================================================== RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/access/heap/tuptoaster.c,v retrieving revision 1.84 diff -c -r1.84 tuptoaster.c *** src/backend/access/heap/tuptoaster.c 7 Mar 2008 23:20:21 -0000 1.84 --- src/backend/access/heap/tuptoaster.c 20 Mar 2008 13:38:53 -0000 *************** *** 1215,1221 **** if (!HeapTupleIsValid(toasttup)) elog(ERROR, "failed to build TOAST tuple"); ! heap_insert(toastrel, toasttup, mycid, use_wal, use_fsm); /* * Create the index entry. We cheat a little here by not using --- 1215,1221 ---- if (!HeapTupleIsValid(toasttup)) elog(ERROR, "failed to build TOAST tuple"); ! heap_insert(toastrel, toasttup, mycid, use_wal, use_fsm, false); /* * Create the index entry. 
We cheat a little here by not using Index: src/backend/access/transam/xact.c =================================================================== RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/access/transam/xact.c,v retrieving revision 1.260 diff -c -r1.260 xact.c *** src/backend/access/transam/xact.c 17 Mar 2008 19:44:41 -0000 1.260 --- src/backend/access/transam/xact.c 20 Mar 2008 14:00:47 -0000 *************** *** 2105,2110 **** --- 2105,2111 ---- RESOURCE_RELEASE_BEFORE_LOCKS, false, true); AtEOXact_Buffers(false); + AtEOXact_BulkInsert(); AtEOXact_RelationCache(false); AtEOXact_Inval(false); smgrDoPendingDeletes(false); *************** *** 3939,3944 **** --- 3940,3946 ---- ResourceOwnerRelease(s->curTransactionOwner, RESOURCE_RELEASE_BEFORE_LOCKS, false, false); + AtEOXact_BulkInsert(); AtEOSubXact_RelationCache(false, s->subTransactionId, s->parent->subTransactionId); AtEOSubXact_Inval(false); Index: src/backend/commands/copy.c =================================================================== RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/commands/copy.c,v retrieving revision 1.296 diff -c -r1.296 copy.c *** src/backend/commands/copy.c 8 Mar 2008 01:16:26 -0000 1.296 --- src/backend/commands/copy.c 20 Mar 2008 13:38:53 -0000 *************** *** 1656,1661 **** --- 1656,1662 ---- CommandId mycid = GetCurrentCommandId(true); bool use_wal = true; /* by default, use WAL logging */ bool use_fsm = true; /* by default, use FSM for free space */ + bool bulk_insert = true; /* by default, use bulk inserts */ Assert(cstate->rel); *************** *** 1900,1905 **** --- 1901,1909 ---- done = CopyReadLine(cstate); } + if (bulk_insert) + heap_begin_bulk_insert(); + while (!done) { bool skip_tuple; *************** *** 2112,2118 **** ExecConstraints(resultRelInfo, slot, estate); /* OK, store the tuple and create index entries for it */ ! 
heap_insert(cstate->rel, tuple, mycid, use_wal, use_fsm); if (resultRelInfo->ri_NumIndices > 0) ExecInsertIndexTuples(slot, &(tuple->t_self), estate, false); --- 2116,2122 ---- ExecConstraints(resultRelInfo, slot, estate); /* OK, store the tuple and create index entries for it */ ! heap_insert(cstate->rel, tuple, mycid, use_wal, use_fsm, bulk_insert); if (resultRelInfo->ri_NumIndices > 0) ExecInsertIndexTuples(slot, &(tuple->t_self), estate, false); *************** *** 2129,2134 **** --- 2133,2141 ---- } } + if (bulk_insert) + heap_end_bulk_insert(); + /* Done, clean up */ error_context_stack = errcontext.previous; Index: src/backend/executor/execMain.c =================================================================== RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/executor/execMain.c,v retrieving revision 1.303 diff -c -r1.303 execMain.c *** src/backend/executor/execMain.c 7 Feb 2008 17:09:51 -0000 1.303 --- src/backend/executor/execMain.c 20 Mar 2008 13:38:53 -0000 *************** *** 1527,1533 **** */ newId = heap_insert(resultRelationDesc, tuple, estate->es_output_cid, ! true, true); IncrAppended(); (estate->es_processed)++; --- 1527,1533 ---- */ newId = heap_insert(resultRelationDesc, tuple, estate->es_output_cid, ! true, true, false); IncrAppended(); (estate->es_processed)++; *************** *** 2720,2726 **** static void intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) { ! /* no-op */ } /* --- 2720,2726 ---- static void intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) { ! heap_begin_bulk_insert(); } /* *************** *** 2739,2745 **** tuple, estate->es_output_cid, estate->es_into_relation_use_wal, ! false); /* never any point in using FSM */ /* We know this is a newly created relation, so there are no indexes */ --- 2739,2746 ---- tuple, estate->es_output_cid, estate->es_into_relation_use_wal, ! false, /* never any point in using FSM */ ! 
true); /* always run a bulk insert */ /* We know this is a newly created relation, so there are no indexes */ *************** *** 2754,2760 **** static void intorel_shutdown(DestReceiver *self) { ! /* no-op */ } /* --- 2755,2761 ---- static void intorel_shutdown(DestReceiver *self) { ! heap_end_bulk_insert(); } /* Index: src/backend/storage/buffer/bufmgr.c =================================================================== RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/storage/buffer/bufmgr.c,v retrieving revision 1.228 diff -c -r1.228 bufmgr.c *** src/backend/storage/buffer/bufmgr.c 1 Jan 2008 19:45:51 -0000 1.228 --- src/backend/storage/buffer/bufmgr.c 20 Mar 2008 13:38:53 -0000 *************** *** 75,80 **** --- 75,82 ---- /* local state for LockBufferForCleanup */ static volatile BufferDesc *PinCountWaitBuf = NULL; + /* local state for bulk inserts */ + static Buffer BulkInsertBuffer = InvalidBuffer; static Buffer ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage, *************** *** 2087,2092 **** --- 2089,2124 ---- } /* + * BulkInsertBuffer manipulation + */ + Buffer + GetBulkInsertBuffer(void) + { + return BulkInsertBuffer; + } + + void + SetBulkInsertBuffer(Buffer buffer) + { + BulkInsertBuffer = buffer; + } + + void + ReleaseBulkInsertBufferIfAny(void) + { + if (BufferIsValid(BulkInsertBuffer)) + ReleaseBuffer(BulkInsertBuffer); + + BulkInsertBuffer = InvalidBuffer; + } + + void + AtEOXact_BulkInsert(void) + { + BulkInsertBuffer = InvalidBuffer; + } + + /* * IncrBufferRefCount * Increment the pin count on a buffer that we have *already* pinned * at least once. 
Index: src/include/access/heapam.h =================================================================== RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/access/heapam.h,v retrieving revision 1.131 diff -c -r1.131 heapam.h *** src/include/access/heapam.h 8 Mar 2008 21:57:59 -0000 1.131 --- src/include/access/heapam.h 20 Mar 2008 13:38:53 -0000 *************** *** 165,172 **** ItemPointer tid); extern void setLastTid(const ItemPointer tid); extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid, ! bool use_wal, bool use_fsm); extern HTSU_Result heap_delete(Relation relation, ItemPointer tid, ItemPointer ctid, TransactionId *update_xmax, CommandId cid, Snapshot crosscheck, bool wait); --- 165,174 ---- ItemPointer tid); extern void setLastTid(const ItemPointer tid); + void heap_begin_bulk_insert(void); + void heap_end_bulk_insert(void); extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid, ! bool use_wal, bool use_fsm, bool bulk_insert); extern HTSU_Result heap_delete(Relation relation, ItemPointer tid, ItemPointer ctid, TransactionId *update_xmax, CommandId cid, Snapshot crosscheck, bool wait); Index: src/include/access/hio.h =================================================================== RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/access/hio.h,v retrieving revision 1.35 diff -c -r1.35 hio.h *** src/include/access/hio.h 1 Jan 2008 19:45:56 -0000 1.35 --- src/include/access/hio.h 20 Mar 2008 13:38:53 -0000 *************** *** 21,26 **** extern void RelationPutHeapTuple(Relation relation, Buffer buffer, HeapTuple tuple); extern Buffer RelationGetBufferForTuple(Relation relation, Size len, ! Buffer otherBuffer, bool use_fsm); #endif /* HIO_H */ --- 21,26 ---- extern void RelationPutHeapTuple(Relation relation, Buffer buffer, HeapTuple tuple); extern Buffer RelationGetBufferForTuple(Relation relation, Size len, ! 
Buffer otherBuffer, bool use_fsm, bool bulk_insert); #endif /* HIO_H */ Index: src/include/storage/bufmgr.h =================================================================== RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/storage/bufmgr.h,v retrieving revision 1.111 diff -c -r1.111 bufmgr.h *** src/include/storage/bufmgr.h 1 Jan 2008 19:45:58 -0000 1.111 --- src/include/storage/bufmgr.h 20 Mar 2008 13:38:53 -0000 *************** *** 166,171 **** --- 166,176 ---- extern void AtProcExit_LocalBuffers(void); + extern Buffer GetBulkInsertBuffer(void); + extern void SetBulkInsertBuffer(Buffer buffer); + extern void ReleaseBulkInsertBufferIfAny(void); + extern void AtEOXact_BulkInsert(void); + /* in freelist.c */ extern BufferAccessStrategy GetAccessStrategy(BufferAccessStrategyType btype); extern void FreeAccessStrategy(BufferAccessStrategy strategy);
-- Sent via pgsql-patches mailing list (pgsql-patches@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-patches