PG Gurus,
I have a table like this:
CREATE TABLE filemods (
guid BIGINT NOT NULL UNIQUE,
filepath_guid BIGINT NOT NULL,
createtimeTIMESTAMP WITH TIME ZONE DEFAULT NULL,
writetime TIMESTAMP WITH TIME ZONE DEFAULT NULL,
deletetimeTIMESTAMP WITH TIME ZONE DEFAULT NULL,
);
One "event" might have (1, '2012-01-25 11:00:00', NULL, NULL) and
another event might have (1, NULL, '2012-01-25 11:05:00', NULL) and the
end result should be:
(1, '2012-01-25 11:00:00', '2012-01-25 11:05:00', NULL).
I'm trying to modify pg_bulkload to "merge" two rows together. The
changes I have made seem to be working, although I would like input on
what I am doing that is unsafe or terribly wrong. You can be brutal.
I've seen incredible write speed with using pg_bulkload. If I can just
get it to "consolidate" our rows based on the unique key it will remove
a lot of complexity in our software.
Also, I'm not entirely sure this mailing list is the correct one, but
with all the internals you all know, I'm hoping you can help point out
nasty flaws in my algorithm. This is the first legitimate attempt I
have made at modifying PG source, so I'm not real familiar with the
proper way of loading pages and tuples and updating heaps and all that
pg core stuff.
Here's the modifications to pg_btree.c (from pg_bulkload HEAD):
http://pastebin.com/U23CapvR
I also attached the patch.
Thank you!!
Ben
--
Benjamin Johnson
http://getcarbonblack.com/ | @getcarbonblack
cell: 312.933.3612
diff -r 3f065ec72ab8 pgbulkload/lib/pg_btree.c
--- a/pgbulkload/lib/pg_btree.c Fri Jan 20 09:26:20 2012 -0600
+++ b/pgbulkload/lib/pg_btree.c Wed Jan 25 13:37:43 2012 -0600
@@ -398,6 +398,8 @@
BTReaderTerm(&reader);
}
+void merge_tuples(Relation heap, IndexTuple itup_dst, IndexTuple itup_src);
+
/*
* _bt_mergeload - Merge two streams of index tuples into new index files.
*/
@@ -462,7 +464,6 @@
}
else
{
- // TODO -- BSJ
if (on_duplicate ==
ON_DUPLICATE_KEEP_NEW)
{
self->dup_old++;
@@ -470,7 +471,21 @@
RelationGetRelationName(wstate->index));
itup2 =
BTReaderGetNextItem(btspool2);
}
- else
+ else if (on_duplicate ==
ON_DUPLICATE_MERGE)
+ {
+ self->dup_old++;
+
+ // merge from itup into itup2
where itup's col[i] is not null
+ // but itup2's col[i] IS null
+ merge_tuples(heapRel, itup2,
itup);
+
+ ItemPointerCopy(&t_tid2,
&itup2->t_tid);
+ self->dup_new++;
+ remove_duplicate(self, heapRel,
itup,
+
RelationGetRelationName(wstate->index));
+ itup =
BTSpoolGetNextItem(btspool, itup, &should_free);
+ }
+ else
{
ItemPointerCopy(&t_tid2,
&itup2->t_tid);
self->dup_new++;
@@ -950,6 +965,113 @@
self->dup_old + self->dup_new, relname);
}
+// returns Buffer after locking it (BUFFER_LOCK_SHARE then BUFFER_LOCK_UNLOCK)
+Buffer load_buffer(Relation heap, IndexTuple itup, HeapTupleData *tuple /*OUT
*/, ItemId *itemid /*OUT */ )
+{
+ BlockNumber blknum;
+ BlockNumber offnum;
+ Buffer buffer;
+ Pagepage;
+
+ blknum = ItemPointerGetBlockNumber(&itup->t_tid);
+ offnum = ItemPointerGetOffsetNumber(&itup->t_tid);
+ buffer = ReadBuffer(heap, blknum);
+
+ LockBuffer(buffer, BUFFER_LOCK_SHARE);
+ page = BufferGetPage(buffer);
+ *itemid = PageGetItemId(page, offnum);
+ tuple->t_data = ItemIdIsNormal(*itemid)
+ ? (HeapTupleHeader) PageGetItem(page, *itemid)
+ : NULL;
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ return buffer;
+}
+
+void load_tuple(Relation heap,
+ HeapTuple tuple,
+ IndexTuple itup,
+ ItemId