Rebased to correct for pgindent changes.
Applies cleanly.
Compiles cleanly.
Passes regression tests.
Comments and format look good.
No documentation changes needed.
No regression test changes needed.
Performance tests to follow in a day or two.
-Kevin
Index: src/bin/pg_dump/pg_backup_archiver.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.c,v
retrieving revision 1.172
diff -c -d -r1.172 pg_backup_archiver.c
*** src/bin/pg_dump/pg_backup_archiver.c 11 Jun 2009 14:49:07 -0000 1.172
--- src/bin/pg_dump/pg_backup_archiver.c 17 Jul 2009 02:20:28 -0000
***************
*** 59,70 ****
--- 59,80 ----
#define thandle HANDLE
#endif
+ /* Header of a doubly-linked TocEntry list (used for the pending and ready lists) */
+ typedef struct
+ {
+ TocEntry *head; /* first entry, or NULL if the list is empty */
+ TocEntry *tail; /* last entry, or NULL if the list is empty */
+ /* The list link fields in each TocEntry are par_prev and par_next */
+ } TocEntryList;
+
+ /* Arguments for a worker child; malloc'd per dispatched item, freed in mark_work_done() */
typedef struct _restore_args
{
ArchiveHandle *AH;
TocEntry *te;
} RestoreArgs;
+ /* State for each parallel activity slot */
typedef struct _parallel_slot
{
thandle child_id;
***************
*** 117,124 ****
static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
static bool work_in_progress(ParallelSlot *slots, int n_slots);
static int get_next_slot(ParallelSlot *slots, int n_slots);
static TocEntry *get_next_work_item(ArchiveHandle *AH,
! TocEntry **first_unprocessed,
ParallelSlot *slots, int n_slots);
static parallel_restore_result parallel_restore(RestoreArgs *args);
static void mark_work_done(ArchiveHandle *AH, thandle worker, int status,
--- 127,138 ----
static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
static bool work_in_progress(ParallelSlot *slots, int n_slots);
static int get_next_slot(ParallelSlot *slots, int n_slots);
+ static void toc_list_append(TocEntryList *l, TocEntry *te);
+ static void toc_list_remove(TocEntryList *l, TocEntry *te);
+ static void find_ready_items(TocEntryList *pending_list,
+ TocEntryList
*ready_list);
static TocEntry *get_next_work_item(ArchiveHandle *AH,
! TocEntryList *ready_list,
ParallelSlot *slots, int n_slots);
static parallel_restore_result parallel_restore(RestoreArgs *args);
static void mark_work_done(ArchiveHandle *AH, thandle worker, int status,
***************
*** 3065,3071 ****
ParallelSlot *slots;
int work_status;
int next_slot;
! TocEntry *first_unprocessed = AH->toc->next;
TocEntry *next_work_item;
thandle ret_child;
TocEntry *te;
--- 3079,3086 ----
ParallelSlot *slots;
int work_status;
int next_slot;
! TocEntryList pending_list;
! TocEntryList ready_list;
TocEntry *next_work_item;
thandle ret_child;
TocEntry *te;
***************
*** 3087,3094 ****
* faster in a single connection because we avoid all the connection and
* setup overhead.
*/
! while ((next_work_item = get_next_work_item(AH, &first_unprocessed,
!
NULL, 0)) != NULL)
{
if (next_work_item->section == SECTION_DATA ||
next_work_item->section == SECTION_POST_DATA)
--- 3102,3108 ----
* faster in a single connection because we avoid all the connection and
* setup overhead.
*/
! for (next_work_item = AH->toc->next; next_work_item != AH->toc;
next_work_item = next_work_item->next)
{
if (next_work_item->section == SECTION_DATA ||
next_work_item->section == SECTION_POST_DATA)
***************
*** 3100,3106 ****
(void) restore_toc_entry(AH, next_work_item, ropt, false);
- next_work_item->restored = true;
reduce_dependencies(AH, next_work_item);
}
--- 3114,3119 ----
***************
*** 3125,3130 ****
--- 3138,3162 ----
AH->currWithOids = -1;
/*
+ * Initialize the lists of pending and ready items. After this setup,
+ * the pending list is everything that needs to be done but is blocked
+ * by one or more dependencies, while the ready list contains items that
+ * have no remaining dependencies. Note: we don't yet filter out
entries
+ * that aren't going to be restored. They might participate in
+ * dependency chains connecting entries that should be restored, so we
+ * treat them as live until we actually process them.
+ */
+ pending_list.head = pending_list.tail = NULL;
+ ready_list.head = ready_list.tail = NULL;
+ for (; next_work_item != AH->toc; next_work_item = next_work_item->next)
+ {
+ if (next_work_item->depCount > 0)
+ toc_list_append(&pending_list, next_work_item);
+ else
+ toc_list_append(&ready_list, next_work_item);
+ }
+
+ /*
* main parent loop
*
* Keep going until there is no worker still running AND there is no
work
***************
*** 3133,3139 ****
ahlog(AH, 1, "entering main parallel loop\n");
! while ((next_work_item = get_next_work_item(AH, &first_unprocessed,
slots, n_slots)) != NULL ||
work_in_progress(slots, n_slots))
{
--- 3165,3171 ----
ahlog(AH, 1, "entering main parallel loop\n");
! while ((next_work_item = get_next_work_item(AH, &ready_list,
slots, n_slots)) != NULL ||
work_in_progress(slots, n_slots))
{
***************
*** 3149,3156 ****
next_work_item->dumpId,
next_work_item->desc,
next_work_item->tag);
! next_work_item->restored = true;
reduce_dependencies(AH, next_work_item);
continue;
}
--- 3181,3189 ----
next_work_item->dumpId,
next_work_item->desc,
next_work_item->tag);
! toc_list_remove(&ready_list, next_work_item);
reduce_dependencies(AH, next_work_item);
+ find_ready_items(&pending_list, &ready_list);
continue;
}
***************
*** 3165,3171 ****
next_work_item->dumpId,
next_work_item->desc,
next_work_item->tag);
! next_work_item->restored = true;
/* this memory is dealloced in mark_work_done()
*/
args = malloc(sizeof(RestoreArgs));
--- 3198,3204 ----
next_work_item->dumpId,
next_work_item->desc,
next_work_item->tag);
! toc_list_remove(&ready_list, next_work_item);
/* this memory is dealloced in mark_work_done()
*/
args = malloc(sizeof(RestoreArgs));
***************
*** 3194,3199 ****
--- 3227,3233 ----
{
mark_work_done(AH, ret_child, WEXITSTATUS(work_status),
slots, n_slots);
+ find_ready_items(&pending_list, &ready_list);
}
else
{
***************
*** 3218,3231 ****
* dependencies, or some other pathological condition. If so, do it in
the
* single parent connection.
*/
! for (te = AH->toc->next; te != AH->toc; te = te->next)
{
! if (!te->restored)
! {
! ahlog(AH, 1, "processing missed item %d %s %s\n",
! te->dumpId, te->desc, te->tag);
! (void) restore_toc_entry(AH, te, ropt, false);
! }
}
/* The ACLs will be handled back in RestoreArchive. */
--- 3252,3262 ----
* dependencies, or some other pathological condition. If so, do it in
the
* single parent connection.
*/
! for (te = pending_list.head; te; te = te->par_next)
{
! ahlog(AH, 1, "processing missed item %d %s %s\n",
! te->dumpId, te->desc, te->tag);
! (void) restore_toc_entry(AH, te, ropt, false);
}
/* The ACLs will be handled back in RestoreArchive. */
***************
*** 3366,3392 ****
}
return false;
}
/*
* Find the next work item (if any) that is capable of being run now.
*
* To qualify, the item must have no remaining dependencies
! * and no requirement for locks that is incompatible with
! * items currently running.
*
! * first_unprocessed is state data that tracks the location of the first
! * TocEntry that's not marked 'restored'. This avoids O(N^2) search time
! * with long TOC lists. (Even though the constant is pretty small, it'd
! * get us eventually.)
*
* pref_non_data is for an alternative selection algorithm that gives
* preference to non-data items if there is already a data load running.
* It is currently disabled.
*/
static TocEntry *
! get_next_work_item(ArchiveHandle *AH, TocEntry **first_unprocessed,
ParallelSlot *slots, int n_slots)
{
bool pref_non_data = false; /* or get from AH->ropt */
--- 3397,3474 ----
}
return false;
}
+
+
+ /* Append te at the tail of l; te's previous par_prev/par_next links are overwritten */
+ static void
+ toc_list_append(TocEntryList *l, TocEntry *te)
+ {
+ if (l->tail)
+ l->tail->par_next = te;
+ te->par_prev = l->tail;
+ te->par_next = NULL;
+ l->tail = te;
+ if (l->head == NULL)
+ l->head = te;
+ }
+
+ /* Unlink te (which must currently be in l) from l, then NULL out its list links */
+ static void
+ toc_list_remove(TocEntryList *l, TocEntry *te)
+ {
+ if (te->par_prev)
+ te->par_prev->par_next = te->par_next;
+ else
+ l->head = te->par_next;
+ if (te->par_next)
+ te->par_next->par_prev = te->par_prev;
+ else
+ l->tail = te->par_prev;
+ te->par_next = NULL;
+ te->par_prev = NULL;
+ }
+
+ /*
+ * Move every pending item whose depCount is no longer positive to the tail
+ * of the ready list, keeping pending-list order.  Scans the whole list.
+ */
+ static void
+ find_ready_items(TocEntryList *pending_list, TocEntryList *ready_list)
+ {
+ TocEntry *te;
+ TocEntry *next;
+ for (te = pending_list->head; te; te = next)
+ {
+ /* Save list link in case we move this item to the other list */
+ next = te->par_next;
+ if (te->depCount > 0)
+ continue; /* not ready yet */
+ /* OK, move it */
+ toc_list_remove(pending_list, te);
+ toc_list_append(ready_list, te);
+ }
+ }
+
/*
* Find the next work item (if any) that is capable of being run now.
*
* To qualify, the item must have no remaining dependencies
! * and no requirements for locks that are incompatible with
! * items currently running. Items in the ready_list are known to have
! * no remaining dependencies, but we have to check for lock conflicts.
*
! * Note that the returned item has *not* been removed from ready_list.
! * The caller must do that after successfully dispatching the item.
*
* pref_non_data is for an alternative selection algorithm that gives
* preference to non-data items if there is already a data load running.
* It is currently disabled.
*/
static TocEntry *
! get_next_work_item(ArchiveHandle *AH, TocEntryList *ready_list,
ParallelSlot *slots, int n_slots)
{
bool pref_non_data = false; /* or get from AH->ropt */
***************
*** 3411,3436 ****
}
/*
! * Advance first_unprocessed if possible.
! */
! for (te = *first_unprocessed; te != AH->toc; te = te->next)
! {
! if (!te->restored)
! break;
! }
! *first_unprocessed = te;
!
! /*
! * Search from first_unprocessed until we find an available item.
*/
! for (; te != AH->toc; te = te->next)
{
bool conflicts = false;
- /* Ignore if already done or still waiting on dependencies */
- if (te->restored || te->depCount > 0)
- continue;
-
/*
* Check to see if the item would need exclusive lock on
something
* that a currently running item also needs lock on, or vice
versa. If
--- 3493,3504 ----
}
/*
! * Search the ready_list until we find a suitable item.
*/
! for (te = ready_list->head; te; te = te->par_next)
{
bool conflicts = false;
/*
* Check to see if the item would need exclusive lock on
something
* that a currently running item also needs lock on, or vice
versa. If
Index: src/bin/pg_dump/pg_backup_archiver.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.h,v
retrieving revision 1.79
diff -c -d -r1.79 pg_backup_archiver.h
*** src/bin/pg_dump/pg_backup_archiver.h 11 Jun 2009 14:49:07 -0000 1.79
--- src/bin/pg_dump/pg_backup_archiver.h 17 Jul 2009 02:20:28 -0000
***************
*** 314,320 ****
void *formatData; /* TOC Entry data specific to file
format */
/* working state (needed only for parallel restore) */
! bool restored; /* item is in progress or done
*/
bool created; /* set for DATA member if TABLE
was created */
int depCount; /* number of
dependencies not yet restored */
DumpId *lockDeps; /* dumpIds of objects this one needs
lock on */
--- 314,321 ----
void *formatData; /* TOC Entry data specific to file
format */
/* working state (needed only for parallel restore) */
! struct _tocEntry *par_prev; /* list links for pending items */
! struct _tocEntry *par_next;
bool created; /* set for DATA member if TABLE
was created */
int depCount; /* number of
dependencies not yet restored */
DumpId *lockDeps; /* dumpIds of objects this one needs
lock on */
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers