At 2012-11-15 10:11:58 -0500, [email protected] wrote:
>
> It could use a run through pgindent.
Done.
> I think you want Assert(heap->size < heap->space).
Of course. Thanks for catching that.
> Project style is to omit braces for a single-line body.
I've removed most of them. In the few cases where one branch of an
if/else would have a one-line body and the other would not, I chose
to rewrite the code to avoid the problem (because, like Andrew, I
hate having to do that).
I wasn't sure how to present the following:
if (right_off < heap->size &&
heap->compare(&heap->nodes[node_off],
&heap->nodes[right_off]) < 0)
{
/* swap with the larger child */
if (!swap_off ||
heap->compare(&heap->nodes[left_off],
&heap->nodes[right_off]) < 0)
{
swap_off = right_off;
}
}
…so I left it alone. If you want me to remove the inner braces and
resubmit, despite the multi-line condition, let me know.
I didn't know which header comments Álvaro meant I should remove,
so I haven't done that either. I did remove the TESTBINHEAP stuff,
though.
I have attached the updated patch here. Thanks for your review.
-- Abhijit
diff --git a/src/backend/lib/Makefile b/src/backend/lib/Makefile
index 98ce3d7..327a1bc 100644
--- a/src/backend/lib/Makefile
+++ b/src/backend/lib/Makefile
@@ -12,6 +12,6 @@ subdir = src/backend/lib
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-OBJS = ilist.o stringinfo.o
+OBJS = ilist.o binaryheap.o stringinfo.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/include/lib/binaryheap.h b/src/include/lib/binaryheap.h
new file mode 100644
index 0000000..54d20b2
--- /dev/null
+++ b/src/include/lib/binaryheap.h
@@ -0,0 +1,121 @@
+/*
+ * binaryheap.h
+ *
+ * A simple binary heap implementation
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * src/include/lib/binaryheap.h
+ */
+
+#ifndef BINARYHEAP_H
+#define BINARYHEAP_H
+
+/*
+ * This structure represents a single node in a binaryheap, and just
+ * holds two opaque pointers. The heap code stores and moves them but
+ * never dereferences them (so either may be NULL); only the comparator
+ * interprets them. binaryheap_replace_first() cannot store a NULL.
+ */
+
+typedef struct binaryheap_node
+{
+ void *key;
+ void *value;
+} binaryheap_node;
+
+/*
+ * For a max-heap, the comparator must return:
+ * a negative value if a < b
+ * zero if a == b
+ * a positive value if a > b
+ * For a min-heap, the conditions are reversed. Only the sign is examined.
+ */
+typedef int (*binaryheap_comparator) (binaryheap_node * a, binaryheap_node * b);
+
+/*
+ * binaryheap
+ *
+ * size how many nodes are currently in "nodes"
+ * space how many nodes can be stored in "nodes"
+ * compare comparator function that defines the heap property
+ * nodes the first of "space" nodes, allocated along with the struct
+ */
+typedef struct binaryheap
+{
+ size_t size;
+ size_t space;
+ binaryheap_comparator compare;
+ binaryheap_node nodes[1];
+} binaryheap;
+
+/*
+ * binaryheap_allocate
+ *
+ * Returns a pointer to a newly-allocated heap that has the capacity to
+ * store the given number of nodes, with the heap property defined by
+ * the given comparator function.
+ */
+binaryheap * binaryheap_allocate(size_t capacity, binaryheap_comparator compare);
+
+/*
+ * binaryheap_free
+ *
+ * Releases memory used by the given binaryheap.
+ */
+void binaryheap_free(binaryheap * heap);
+
+/*
+ * binaryheap_add_unordered
+ *
+ * Adds the given key and value to the end of the heap's list of nodes
+ * in O(1) without preserving the heap property. This is a convenience
+ * to add elements quickly to a new heap. To obtain a valid heap, one
+ * must call binaryheap_build() afterwards.
+ */
+void binaryheap_add_unordered(binaryheap * heap, void *key, void *value);
+
+/*
+ * binaryheap_build
+ *
+ * Assembles a valid heap in O(n) from the nodes added by
+ * binaryheap_add_unordered(). Not needed otherwise.
+ */
+void binaryheap_build(binaryheap * heap);
+
+/*
+ * binaryheap_add
+ *
+ * Adds the given key and value to the heap in O(log n), while
+ * preserving the heap property.
+ */
+void binaryheap_add(binaryheap * heap, void *key, void *value);
+
+/*
+ * binaryheap_first
+ *
+ * Returns a pointer to the first (root, topmost) node in the heap
+ * without modifying the heap. Returns NULL if the heap is empty.
+ * Always O(1).
+ */
+binaryheap_node * binaryheap_first(binaryheap * heap);
+
+/*
+ * binaryheap_remove_first
+ *
+ * Removes the first (root, topmost) node and re-establishes the heap.
+ * Returns a pointer to the removed node, which stays valid only until
+ * the next add, or NULL if the heap is empty. O(log n) worst case.
+ */
+binaryheap_node * binaryheap_remove_first(binaryheap * heap);
+
+/*
+ * binaryheap_replace_first
+ *
+ * Changes the key and/or value of the first (root, topmost) node, then
+ * restores the heap property. A NULL newkey or newval leaves that field
+ * unchanged. O(1) in the best case, O(log n) if it must sift down.
+ */
+void binaryheap_replace_first(binaryheap * heap, void *newkey, void *newval);
+
+#endif /* BINARYHEAP_H */
diff --git a/src/backend/lib/binaryheap.c b/src/backend/lib/binaryheap.c
new file mode 100644
index 0000000..c6d00d5
--- /dev/null
+++ b/src/backend/lib/binaryheap.c
@@ -0,0 +1,330 @@
+/*-------------------------------------------------------------------------
+ *
+ * binaryheap.c
+ * A simple binary heap implementation
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/lib/binaryheap.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <math.h>
+
+#include "lib/binaryheap.h"
+
+static void sift_down(binaryheap * heap, size_t node_off);
+static void sift_up(binaryheap * heap, size_t node_off);
+static inline void swap_nodes(binaryheap * heap, size_t a, size_t b);
+
+/*
+ * binaryheap_allocate
+ *
+ * Returns a pointer to a newly palloc'd, empty heap with room for the
+ * given number of nodes, with the heap property defined by the given
+ * comparator function. Release it with binaryheap_free().
+ */
+binaryheap *
+binaryheap_allocate(size_t capacity, binaryheap_comparator compare)
+{
+	/* size_t, and no "capacity - 1": avoids int truncation and underflow */
+	size_t sz = offsetof(binaryheap, nodes) + sizeof(binaryheap_node) * capacity;
+	binaryheap *heap = palloc(sz);
+
+	heap->compare = compare;
+	heap->space = capacity;
+	heap->size = 0;
+
+	return heap;
+}
+
+/*
+ * binaryheap_free
+ *
+ * Frees the heap with pfree(); stored keys and values are untouched.
+ */
+void
+binaryheap_free(binaryheap * heap)
+{
+ pfree(heap);
+}
+
+/*
+ * These utility functions return the offset of the left child, right
+ * child, and parent of the node at the given index, respectively.
+ *
+ * The heap is an array of nodes with the root at index 0. The children
+ * of node i are at 2*i+1 and 2*i+2, and its parent is at (i-1)/2. The
+ * root has no parent, so parent_offset(0) returns -1.
+ */
+
+static inline int
+left_offset(size_t i)
+{
+	return (int) (2 * i + 1);
+}
+
+static inline int
+right_offset(size_t i)
+{
+	return (int) (2 * i + 2);
+}
+
+static inline int
+parent_offset(size_t i)
+{
+	return (i == 0) ? -1 : (int) ((i - 1) / 2);
+}
+
+/*
+ * binaryheap_add_unordered
+ *
+ * Appends the given key and value to the heap's node array in O(1)
+ * without restoring the heap property; call binaryheap_build() once all
+ * nodes are added. Raises an error (in any build) if the heap is full.
+ */
+void
+binaryheap_add_unordered(binaryheap * heap, void *key, void *value)
+{
+	if (heap->size >= heap->space)
+		elog(ERROR, "out of binary heap slots");
+	heap->nodes[heap->size].key = key;
+	heap->nodes[heap->size].value = value;
+	heap->size++;
+}
+
+/*
+ * binaryheap_build
+ *
+ * Assembles a valid heap in O(n) from the nodes added by
+ * binaryheap_add_unordered(), sifting down each non-leaf node in turn.
+ */
+void
+binaryheap_build(binaryheap * heap)
+{
+	size_t i;
+
+	for (i = heap->size / 2; i > 0; i--)	/* no-op for 0 or 1 nodes */
+		sift_down(heap, i - 1);
+}
+
+/*
+ * binaryheap_add
+ *
+ * Adds the given key and value to the heap in O(log n): the node is
+ * appended at the end and then sifted up. The heap must not be full.
+ */
+void
+binaryheap_add(binaryheap * heap, void *key, void *value)
+{
+ binaryheap_add_unordered(heap, key, value);
+ sift_up(heap, heap->size - 1);
+}
+
+/*
+ * binaryheap_first
+ *
+ * Gives access to the first (root, topmost) node without changing the
+ * heap: returns a pointer to it, or NULL when the heap holds no nodes.
+ * Always O(1).
+ */
+binaryheap_node *
+binaryheap_first(binaryheap * heap)
+{
+	if (heap->size > 0)
+		return heap->nodes;
+	return NULL;
+}
+
+/*
+ * binaryheap_remove_first
+ *
+ * Takes the first (root, topmost) node out of the heap, restores the
+ * heap property among the remaining nodes, and returns a pointer to the
+ * removed node. Returns NULL if the heap is empty. O(log n) worst case.
+ */
+binaryheap_node *
+binaryheap_remove_first(binaryheap * heap)
+{
+	size_t last;
+
+	if (heap->size == 0)
+		return NULL;
+
+	/*
+	 * Move the root into the last occupied slot and shrink the heap by
+	 * one, so that slot now lies just past the live nodes. The node that
+	 * was swapped into the root is then sifted down to where it belongs.
+	 *
+	 * For a one-node heap the swap and the sift are both no-ops, so no
+	 * special case is needed.
+	 */
+	last = heap->size - 1;
+	swap_nodes(heap, 0, last);
+	heap->size = last;
+	sift_down(heap, 0);
+
+	/* This slot is overwritten by the next node added to the heap. */
+	return &heap->nodes[last];
+}
+
+/*
+ * binaryheap_replace_first
+ *
+ * Changes the key and/or value of the first (root, topmost) node (a NULL
+ * argument leaves that field untouched) and restores the heap property.
+ * Does nothing on an empty heap. O(1) at best, O(log n) if it must sift.
+ */
+void
+binaryheap_replace_first(binaryheap * heap, void *key, void *val)
+{
+	int cmp;
+	size_t next_off = 1;
+
+	/* No root to replace; nodes[0] and nodes[1] hold no live data. */
+	if (heap->size == 0)
+		return;
+
+	if (key)
+		heap->nodes[0].key = key;
+	if (val)
+		heap->nodes[0].value = val;
+
+	/*
+	 * If the new root node is still larger than the largest of its children
+	 * (of which there may be 0, 1, or 2), then the heap is still valid.
+	 */
+	if (heap->size == 1)
+		return;
+	if (heap->size > 2)
+	{
+		/* Two children: pick the larger one */
+		cmp = heap->compare(&heap->nodes[2], &heap->nodes[1]);
+		if (cmp > 0)
+			next_off = 2;
+	}
+
+	cmp = heap->compare(&heap->nodes[next_off], &heap->nodes[0]);
+	if (cmp < 0)
+		return;
+
+	/* The child is not smaller. Swap and sift the new node down. */
+	swap_nodes(heap, 0, next_off);
+	sift_down(heap, next_off);
+}
+
+/*
+ * swap_nodes
+ *
+ * Exchanges the contents (key and value) of the nodes at offsets "a" and
+ * "b" in the heap's node array. No bounds checking is done; swapping a
+ * node with itself leaves it unchanged.
+ */
+static inline void
+swap_nodes(binaryheap * heap, size_t a, size_t b)
+{
+	binaryheap_node *first = &heap->nodes[a];
+	binaryheap_node *second = &heap->nodes[b];
+	binaryheap_node saved = *first;
+
+	*first = *second;
+	*second = saved;
+}
+
+/*
+ * Move the node at node_off towards the root, swapping it with its parent
+ * for as long as the comparator says it is larger than that parent.
+ */
+static void
+sift_up(binaryheap * heap, size_t node_off)
+{
+	/*
+	 * The root (offset 0) has no parent, so the climb always ends there
+	 * at the latest.
+	 */
+	while (node_off > 0)
+	{
+		size_t parent_off = parent_offset(node_off);
+		binaryheap_node *child = &heap->nodes[node_off];
+		binaryheap_node *parent = &heap->nodes[parent_off];
+		int order;
+
+		/*
+		 * A result of zero or less means the node is not larger than its
+		 * parent: the heap condition holds between the two, so the node
+		 * has reached its final position and we can stop.
+		 */
+		order = heap->compare(child, parent);
+		if (order <= 0)
+			return;
+
+		/*
+		 * Otherwise the node outranks its parent. Exchange the two and
+		 * carry on from the position the parent used to occupy.
+		 */
+		swap_nodes(heap, node_off, parent_off);
+		node_off = parent_off;
+	}
+}
+
+/*
+ * Move the node at node_off away from the root, swapping it with its
+ * larger child for as long as the comparator ranks a child above it.
+ */
+static void
+sift_down(binaryheap * heap, size_t node_off)
+{
+	binaryheap_node *nodes = heap->nodes;
+
+	for (;;)
+	{
+		size_t lchild = left_offset(node_off);
+		size_t rchild = right_offset(node_off);
+		size_t target = 0;
+
+		/*
+		 * "target" is the child to swap with. Offset 0 is the root, which
+		 * is never anybody's child, so 0 doubles as "no swap needed".
+		 * Child offsets at or beyond heap->size mean the child is absent.
+		 */
+
+		/* Does a left child exist, and is it larger than this node? */
+		if (lchild < heap->size &&
+			heap->compare(&nodes[node_off], &nodes[lchild]) < 0)
+			target = lchild;
+
+		/*
+		 * Same question for the right child. Both children can be larger
+		 * than the node (e.g. while building a heap from unordered nodes),
+		 * in which case the larger of the two must move up, or the heap
+		 * property would be violated between the siblings.
+		 */
+		if (rchild < heap->size &&
+			heap->compare(&nodes[node_off], &nodes[rchild]) < 0)
+		{
+			/* take the right child unless the left one is at least as large */
+			if (target == 0 ||
+				heap->compare(&nodes[lchild], &nodes[rchild]) < 0)
+				target = rchild;
+		}
+
+		/*
+		 * Neither child is larger than the node: the heap condition is
+		 * satisfied at this position, and we're done.
+		 */
+		if (target == 0)
+			return;
+
+		/*
+		 * Otherwise, swap the node with the chosen child, then go on to
+		 * check the node against the children of its new position, one
+		 * level further down.
+		 */
+		swap_nodes(heap, target, node_off);
+		node_off = target;
+	}
+}
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index fec07b8..d4911bd 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1100,10 +1100,8 @@ typedef struct AppendState
* nkeys number of sort key columns
* sortkeys sort keys in SortSupport representation
* slots current output tuple of each subplan
- * heap heap of active tuples (represented as array indexes)
- * heap_size number of active heap entries
+ * heap heap of active tuples
* initialized true if we have fetched first tuple from each subplan
- * last_slot last subplan fetched from (which must be re-called)
* ----------------
*/
typedef struct MergeAppendState
@@ -1114,10 +1112,8 @@ typedef struct MergeAppendState
int ms_nkeys;
SortSupport ms_sortkeys; /* array of length ms_nkeys */
TupleTableSlot **ms_slots; /* array of length ms_nplans */
- int *ms_heap; /* array of length ms_nplans */
- int ms_heap_size; /* current active length of ms_heap[] */
+ struct binaryheap *ms_heap; /* binary heap of slot indices */
bool ms_initialized; /* are subplans started? */
- int ms_last_slot; /* last subplan slot we returned from */
} MergeAppendState;
/* ----------------
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index d5141ba..fa05ddb 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -41,6 +41,8 @@
#include "executor/execdebug.h"
#include "executor/nodeMergeAppend.h"
+#include "lib/binaryheap.h"
+
/*
* It gets quite confusing having a heap array (indexed by integers) which
* contains integers which index into the slots array. These typedefs try to
@@ -49,9 +51,7 @@
typedef int SlotNumber;
typedef int HeapPosition;
-static void heap_insert_slot(MergeAppendState *node, SlotNumber new_slot);
-static void heap_siftup_slot(MergeAppendState *node);
-static int32 heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2);
+static int heap_compare_slots(binaryheap_node * a, binaryheap_node * b);
/* ----------------------------------------------------------------
@@ -88,7 +88,7 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
mergestate->ms_nplans = nplans;
mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
- mergestate->ms_heap = (int *) palloc0(sizeof(int) * nplans);
+ mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots);
/*
* Miscellaneous initialization
@@ -143,9 +143,7 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
/*
* initialize to show we have not run the subplans yet
*/
- mergestate->ms_heap_size = 0;
mergestate->ms_initialized = false;
- mergestate->ms_last_slot = -1;
return mergestate;
}
@@ -160,6 +158,7 @@ TupleTableSlot *
ExecMergeAppend(MergeAppendState *node)
{
TupleTableSlot *result;
+ binaryheap_node *hn;
SlotNumber i;
if (!node->ms_initialized)
@@ -172,7 +171,7 @@ ExecMergeAppend(MergeAppendState *node)
{
node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
if (!TupIsNull(node->ms_slots[i]))
- heap_insert_slot(node, i);
+ binaryheap_add(node->ms_heap, node, (void *) i);
}
node->ms_initialized = true;
}
@@ -184,19 +183,20 @@ ExecMergeAppend(MergeAppendState *node)
* the logic a bit by doing this before returning from the prior call,
* but it's better to not pull tuples until necessary.)
*/
- i = node->ms_last_slot;
+ hn = binaryheap_first(node->ms_heap);
+ i = (int) hn->value;
node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
if (!TupIsNull(node->ms_slots[i]))
- heap_insert_slot(node, i);
+ binaryheap_replace_first(node->ms_heap, node, (void *) i);
+ else
+ (void) binaryheap_remove_first(node->ms_heap);
}
- if (node->ms_heap_size > 0)
+ hn = binaryheap_first(node->ms_heap);
+ if (hn)
{
- /* Return the topmost heap node, and sift up the remaining nodes */
- i = node->ms_heap[0];
+ i = (int) hn->value;
result = node->ms_slots[i];
- node->ms_last_slot = i;
- heap_siftup_slot(node);
}
else
{
@@ -208,65 +208,15 @@ ExecMergeAppend(MergeAppendState *node)
}
/*
- * Insert a new slot into the heap. The slot must contain a valid tuple.
- */
-static void
-heap_insert_slot(MergeAppendState *node, SlotNumber new_slot)
-{
- SlotNumber *heap = node->ms_heap;
- HeapPosition j;
-
- Assert(!TupIsNull(node->ms_slots[new_slot]));
-
- j = node->ms_heap_size++; /* j is where the "hole" is */
- while (j > 0)
- {
- int i = (j - 1) / 2;
-
- if (heap_compare_slots(node, new_slot, node->ms_heap[i]) >= 0)
- break;
- heap[j] = heap[i];
- j = i;
- }
- heap[j] = new_slot;
-}
-
-/*
- * Delete the heap top (the slot in heap[0]), and sift up.
- */
-static void
-heap_siftup_slot(MergeAppendState *node)
-{
- SlotNumber *heap = node->ms_heap;
- HeapPosition i,
- n;
-
- if (--node->ms_heap_size <= 0)
- return;
- n = node->ms_heap_size; /* heap[n] needs to be reinserted */
- i = 0; /* i is where the "hole" is */
- for (;;)
- {
- int j = 2 * i + 1;
-
- if (j >= n)
- break;
- if (j + 1 < n && heap_compare_slots(node, heap[j], heap[j + 1]) > 0)
- j++;
- if (heap_compare_slots(node, heap[n], heap[j]) <= 0)
- break;
- heap[i] = heap[j];
- i = j;
- }
- heap[i] = heap[n];
-}
-
-/*
* Compare the tuples in the two given slots.
*/
static int32
-heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2)
+heap_compare_slots(binaryheap_node * a, binaryheap_node * b)
{
+ MergeAppendState *node = (MergeAppendState *) a->key;
+ SlotNumber slot1 = (int) a->value;
+ SlotNumber slot2 = (int) b->value;
+
TupleTableSlot *s1 = node->ms_slots[slot1];
TupleTableSlot *s2 = node->ms_slots[slot2];
int nkey;
@@ -291,7 +241,7 @@ heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2)
datum2, isNull2,
sortKey);
if (compare != 0)
- return compare;
+ return -compare;
}
return 0;
}
@@ -347,7 +297,5 @@ ExecReScanMergeAppend(MergeAppendState *node)
if (subnode->chgParam == NULL)
ExecReScan(subnode);
}
- node->ms_heap_size = 0;
node->ms_initialized = false;
- node->ms_last_slot = -1;
}
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers