Greetings,

  Someone (*cough*Haas*cough*) made a claim over beers at PGCon that it
  would be very difficult to come up with a way to pre-allocate List
  entries and maintain the current List API.  I'll admit that it wasn't
  quite as trivial as I had *hoped*, but attached is a proof-of-concept
  patch which does it.

  A couple of notes regarding the patch:

  First, it uses ffs(), which might not be fully portable.  We could
  certainly provide our own implementation of the same thing as a
  fallback and use ffs() when it's available.

  Second, there are a couple of bugs (or at least, I'll characterize
  them that way) where we're pfree'ing a list which has been passed to
  list_concat().  That's not strictly legal as either argument passed to
  list_concat() could be returned and so one might end up free'ing the
  list pointer that was just returned.  Those pfree's are commented out
  in this patch, but really should probably just be removed or replaced
  with 'right' code (if it's critical to free this stuff).

  Third, COPY_NODE_CELL() wasn't used anywhere other than _copyList and
  would be difficult to keep as a macro given the way allocations happen
  now for lists.  It's no longer being used at all and therefore should
  really be removed.

  Fourth, the current implementation goes ahead and allocates 8
  ListCells, which quadruples the size of a List (40 bytes to 168
  bytes, assuming 64-bit ints).  I don't see that as really being an
  issue, but perhaps others would disagree.

  Fifth, list_concat() has become more like list_concat_unique() and
  friends (and hence slower).  Instead of being able to just tack one
  list on to the end of the other, we have to do an actual copy of the
  list.  This is due to having to allow callers to list_free(), which
  will call pfree(), and we don't have any way to know which ListCells
  from the *old* list were pre-allocated and which weren't, which can
  end up causing pfree() to crash when it's passed an invalid pointer.
  In general, I'd hope that not having to palloc() for a small list
  might make up for a lot of that slowdown, but you can't really argue
  that anything O(n) can compete with the previous O(1) approach.

  Finally, sorry it's kind of a fugly patch; it's just a proof of
  concept, and I'd be happy to clean it up if others feel it's worthwhile
  and a reasonable approach, but I really need to get it out there and
  take a break from it (I've been a bit obsessive-compulsive about it
  since PGCon. :D).

        Thanks!

                Stephen
*** a/src/backend/nodes/copyfuncs.c
--- b/src/backend/nodes/copyfuncs.c
***************
*** 3766,3778 **** _copyList(List *from)
  	new = makeNode(List);
  	new->length = from->length;
  
! 	COPY_NODE_CELL(new->head, from->head);
  	prev_new = new->head;
  	curr_old = lnext(from->head);
  
  	while (curr_old)
  	{
! 		COPY_NODE_CELL(prev_new->next, curr_old);
  		prev_new = prev_new->next;
  		curr_old = curr_old->next;
  	}
--- 3766,3793 ----
  	new = makeNode(List);
  	new->length = from->length;
  
! 	new->head = new->base;
! 	new->avail_bitmap = (1 << LIST_PREALLOC) - 1 - 1;
! 
! 	lfirst(new->head) = copyObject(lfirst(from->head));
! 
  	prev_new = new->head;
  	curr_old = lnext(from->head);
  
  	while (curr_old)
  	{
! 		int			free_spot = ffs(new->avail_bitmap);
! 
! 		if (free_spot && free_spot <= LIST_PREALLOC)
! 		{
! 			prev_new->next = new->base + --free_spot;
! 			new->avail_bitmap &= ~(1 << free_spot);
! 		}
! 		else
! 			prev_new->next = (ListCell *) palloc(sizeof(ListCell));
! 
! 		lfirst(prev_new->next) = copyObject(lfirst(curr_old));
! 
  		prev_new = prev_new->next;
  		curr_old = curr_old->next;
  	}
*** a/src/backend/nodes/list.c
--- b/src/backend/nodes/list.c
***************
*** 39,44 **** check_list_invariants(List *list)
--- 39,45 ----
  	Assert(list->length > 0);
  	Assert(list->head != NULL);
  	Assert(list->tail != NULL);
+ 	Assert(list->base != NULL);
  
  	Assert(list->type == T_List ||
  		   list->type == T_IntList ||
***************
*** 49,54 **** check_list_invariants(List *list)
--- 50,57 ----
  	if (list->length == 2)
  		Assert(list->head->next == list->tail);
  	Assert(list->tail->next == NULL);
+ 
+ 	Assert(list->avail_bitmap >= 0 && list->avail_bitmap <= (1 << LIST_PREALLOC) - 1);
  }
  #else
  #define check_list_invariants(l)
***************
*** 65,77 **** new_list(NodeTag type)
  	List	   *new_list;
  	ListCell   *new_head;
  
! 	new_head = (ListCell *) palloc(sizeof(*new_head));
! 	new_head->next = NULL;
! 	/* new_head->data is left undefined! */
! 
  	new_list = (List *) palloc(sizeof(*new_list));
  	new_list->type = type;
  	new_list->length = 1;
  	new_list->head = new_head;
  	new_list->tail = new_head;
  
--- 68,91 ----
  	List	   *new_list;
  	ListCell   *new_head;
  
! 	/*
! 	 * Allocate our new_list, which will also include the first
! 	 * block of ListCell's in the ->base var
! 	 */
  	new_list = (List *) palloc(sizeof(*new_list));
  	new_list->type = type;
  	new_list->length = 1;
+ 
+ 	/* initialize the first entry in new_list->base */
+ 	/* new_head->data is left undefined! */
+ 	new_head = new_list->base;
+ 	new_head->next = NULL;
+ 
+ 	/* subtract 1 to get all-1's, marking all available
+ 	 * subtract another 1 to clear the first bit, marking it
+ 	 * used by the initially-allocated head node
+ 	 */
+ 	new_list->avail_bitmap = (1 << LIST_PREALLOC) - 1 - 1;
  	new_list->head = new_head;
  	new_list->tail = new_head;
  
***************
*** 89,98 **** static void
  new_head_cell(List *list)
  {
  	ListCell   *new_head;
  
! 	new_head = (ListCell *) palloc(sizeof(*new_head));
! 	new_head->next = list->head;
  
  	list->head = new_head;
  	list->length++;
  }
--- 103,120 ----
  new_head_cell(List *list)
  {
  	ListCell   *new_head;
+ 	int			free_spot = ffs(list->avail_bitmap);
  
! 	/* Check for a free spot in our initial allocation */
! 	if (free_spot && free_spot <= LIST_PREALLOC)
! 	{
! 		new_head = list->base + --free_spot;
! 		list->avail_bitmap &= ~(1 << free_spot);
! 	}
! 	else
! 		new_head = (ListCell *) palloc(sizeof(*new_head));
  
+ 	new_head->next = list->head;
  	list->head = new_head;
  	list->length++;
  }
***************
*** 108,115 **** static void
  new_tail_cell(List *list)
  {
  	ListCell   *new_tail;
  
- 	new_tail = (ListCell *) palloc(sizeof(*new_tail));
  	new_tail->next = NULL;
  
  	list->tail->next = new_tail;
--- 130,145 ----
  new_tail_cell(List *list)
  {
  	ListCell   *new_tail;
+ 	int			free_spot = ffs(list->avail_bitmap);
+ 
+ 	if (free_spot && free_spot <= LIST_PREALLOC)
+ 	{
+ 		new_tail = list->base + --free_spot;
+ 		list->avail_bitmap &= ~(1 << free_spot);
+ 	}
+ 	else
+ 		new_tail = (ListCell *) palloc(sizeof(*new_tail));
  
  	new_tail->next = NULL;
  
  	list->tail->next = new_tail;
***************
*** 185,192 **** static ListCell *
  add_new_cell(List *list, ListCell *prev_cell)
  {
  	ListCell   *new_cell;
  
- 	new_cell = (ListCell *) palloc(sizeof(*new_cell));
  	/* new_cell->data is left undefined! */
  	new_cell->next = prev_cell->next;
  	prev_cell->next = new_cell;
--- 215,230 ----
  add_new_cell(List *list, ListCell *prev_cell)
  {
  	ListCell   *new_cell;
+ 	int			free_spot = ffs(list->avail_bitmap);
+ 
+ 	if (free_spot && free_spot <= LIST_PREALLOC)
+ 	{
+ 		new_cell = list->base + --free_spot;
+ 		list->avail_bitmap &= ~(1 << free_spot);
+ 	}
+ 	else
+ 		new_cell = (ListCell *) palloc(sizeof(*new_cell));
  
  	/* new_cell->data is left undefined! */
  	new_cell->next = prev_cell->next;
  	prev_cell->next = new_cell;
***************
*** 320,325 **** lcons_oid(Oid datum, List *list)
--- 358,365 ----
  List *
  list_concat(List *list1, List *list2)
  {
+ 	ListCell   *cell;
+ 
  	if (list1 == NIL)
  		return list2;
  	if (list2 == NIL)
***************
*** 329,337 **** list_concat(List *list1, List *list2)
--- 369,387 ----
  
  	Assert(list1->type == list2->type);
  
+ 	foreach(cell, list2)
+ 		switch (list1->type) {
+ 			case T_List: list1 = lappend(list1, lfirst(cell)); break;
+ 			case T_IntList: list1 = lappend_int(list1, lfirst_int(cell)); break;
+ 			case T_OidList: list1 = lappend_oid(list1, lfirst_oid(cell)); break;
+ 			default: break; /* shut up compiler */
+ 		}
+ 
+ 	/*
  	list1->length += list2->length;
  	list1->tail->next = list2->head;
  	list1->tail = list2->tail;
+ 	*/
  
  	check_list_invariants(list1);
  	return list1;
***************
*** 359,364 **** list_truncate(List *list, int new_size)
--- 409,419 ----
  	if (new_size >= list_length(list))
  		return list;
  
+ 	/*
+ 	 * NOTE: This may technically leak some pre-alloc'd ListCells,
+ 	 * but running down those that will be would defeat this being
+ 	 * a 'list_truncate' instead of a 'list_delete'.
+ 	 */
  	n = 1;
  	foreach(cell, list)
  	{
***************
*** 555,561 **** list_delete_cell(List *list, ListCell *cell, ListCell *prev)
  	if (list->tail == cell)
  		list->tail = prev;
  
! 	pfree(cell);
  	return list;
  }
  
--- 610,621 ----
  	if (list->tail == cell)
  		list->tail = prev;
  
! 	/* Check if this cell was pre-alloc'd; if so, mark its slot as available */
! 	if (cell >= list->base && cell <= (list->base + LIST_PREALLOC-1))
! 		list->avail_bitmap |= 1 << (cell - list->base);
! 	else
! 		pfree(cell);
! 
  	return list;
  }
  
***************
*** 1088,1094 **** list_free_private(List *list, bool deep)
  		cell = lnext(cell);
  		if (deep)
  			pfree(lfirst(tmp));
! 		pfree(tmp);
  	}
  
  	if (list)
--- 1148,1158 ----
  		cell = lnext(cell);
  		if (deep)
  			pfree(lfirst(tmp));
! 
! 		if (tmp >= list->base && tmp <= (list->base + LIST_PREALLOC-1))
! 			list->avail_bitmap &= 1 << (tmp - list->base);
! 		else
! 			pfree(tmp);
  	}
  
  	if (list)
***************
*** 1153,1161 **** list_copy(List *oldlist)
  	oldlist_cur = oldlist->head->next;
  	while (oldlist_cur)
  	{
  		ListCell   *newlist_cur;
  
! 		newlist_cur = (ListCell *) palloc(sizeof(*newlist_cur));
  		newlist_cur->data = oldlist_cur->data;
  		newlist_prev->next = newlist_cur;
  
--- 1217,1233 ----
  	oldlist_cur = oldlist->head->next;
  	while (oldlist_cur)
  	{
+ 		int         free_spot = ffs(newlist->avail_bitmap);
  		ListCell   *newlist_cur;
  
! 		if (free_spot && free_spot <= LIST_PREALLOC)
! 		{
! 			newlist_cur = newlist->base + --free_spot;
! 			newlist->avail_bitmap &= ~(1 << free_spot);
! 		}
! 		else
! 			newlist_cur = (ListCell *) palloc(sizeof(*newlist_cur));
! 
  		newlist_cur->data = oldlist_cur->data;
  		newlist_prev->next = newlist_cur;
  
***************
*** 1206,1214 **** list_copy_tail(List *oldlist, int nskip)
  	oldlist_cur = oldlist_cur->next;
  	while (oldlist_cur)
  	{
  		ListCell   *newlist_cur;
  
! 		newlist_cur = (ListCell *) palloc(sizeof(*newlist_cur));
  		newlist_cur->data = oldlist_cur->data;
  		newlist_prev->next = newlist_cur;
  
--- 1278,1294 ----
  	oldlist_cur = oldlist_cur->next;
  	while (oldlist_cur)
  	{
+ 		int         free_spot = ffs(newlist->avail_bitmap);
  		ListCell   *newlist_cur;
  
! 		if (free_spot && free_spot <= LIST_PREALLOC)
! 		{
! 			newlist_cur = newlist->base + --free_spot;
! 			newlist->avail_bitmap &= ~(1 << free_spot);
! 		}
! 		else
! 			newlist_cur = (ListCell *) palloc(sizeof(*newlist_cur));
! 
  		newlist_cur->data = oldlist_cur->data;
  		newlist_prev->next = newlist_cur;
  
*** a/src/backend/optimizer/util/clauses.c
--- b/src/backend/optimizer/util/clauses.c
***************
*** 3192,3198 **** simplify_or_arguments(List *args,
  				List	   *oldhdr = unprocessed_args;
  
  				unprocessed_args = list_concat(subargs, unprocessed_args);
! 				pfree(oldhdr);
  			}
  			continue;
  		}
--- 3192,3198 ----
  				List	   *oldhdr = unprocessed_args;
  
  				unprocessed_args = list_concat(subargs, unprocessed_args);
! 				/* pfree(oldhdr); */
  			}
  			continue;
  		}
***************
*** 3294,3300 **** simplify_and_arguments(List *args,
  				List	   *oldhdr = unprocessed_args;
  
  				unprocessed_args = list_concat(subargs, unprocessed_args);
! 				pfree(oldhdr);
  			}
  			continue;
  		}
--- 3294,3300 ----
  				List	   *oldhdr = unprocessed_args;
  
  				unprocessed_args = list_concat(subargs, unprocessed_args);
! 				/* pfree(oldhdr); */
  			}
  			continue;
  		}
*** a/src/backend/parser/parse_clause.c
--- b/src/backend/parser/parse_clause.c
***************
*** 783,789 **** transformFromClauseItem(ParseState *pstate, Node *n,
  		my_relnamespace = list_concat(l_relnamespace, r_relnamespace);
  		my_containedRels = bms_join(l_containedRels, r_containedRels);
  
! 		pfree(r_relnamespace);	/* free unneeded list header */
  
  		/*
  		 * Extract column name and var lists from both subtrees
--- 783,789 ----
  		my_relnamespace = list_concat(l_relnamespace, r_relnamespace);
  		my_containedRels = bms_join(l_containedRels, r_containedRels);
  
! 		/* pfree(r_relnamespace); */	/* free unneeded list header */
  
  		/*
  		 * Extract column name and var lists from both subtrees
*** a/src/include/nodes/pg_list.h
--- b/src/include/nodes/pg_list.h
***************
*** 39,55 ****
  
  #include "nodes/nodes.h"
  
  
  typedef struct ListCell ListCell;
  
- typedef struct List
- {
- 	NodeTag		type;			/* T_List, T_IntList, or T_OidList */
- 	int			length;
- 	ListCell   *head;
- 	ListCell   *tail;
- } List;
- 
  struct ListCell
  {
  	union
--- 39,49 ----
  
  #include "nodes/nodes.h"
  
+ /* changing this would require changing the used bitmap */
+ #define LIST_PREALLOC 8
  
  typedef struct ListCell ListCell;
  
  struct ListCell
  {
  	union
***************
*** 61,66 **** struct ListCell
--- 55,72 ----
  	ListCell   *next;
  };
  
+ typedef struct List
+ {
+ 	NodeTag		type;			/* T_List, T_IntList, or T_OidList */
+ 	int			length;
+ 	ListCell   *head;
+ 	ListCell   *tail;
+ 
+ 	/* pre-allocated ListCells */
+ 	int 		avail_bitmap;	/* bitmap of available prealloc'd spots */
+ 	ListCell    base[LIST_PREALLOC];
+ } List;
+ 
  /*
   * The *only* valid representation of an empty list is NIL; in other
   * words, a non-NIL list is guaranteed to have length >= 1 and

Attachment: signature.asc
Description: Digital signature

Reply via email to