Rebased to correct for pgindent changes.
 
Applies cleanly.
Compiles cleanly.
Passes regression tests.
Comments and format look good.
No documentation changes needed.
No regression test changes needed.
 
Performance tests to follow in a day or two.
 
-Kevin
Index: src/bin/pg_dump/pg_backup_archiver.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.c,v
retrieving revision 1.172
diff -c -d -r1.172 pg_backup_archiver.c
*** src/bin/pg_dump/pg_backup_archiver.c        11 Jun 2009 14:49:07 -0000      1.172
--- src/bin/pg_dump/pg_backup_archiver.c        17 Jul 2009 02:20:28 -0000
***************
*** 59,70 ****
--- 59,80 ----
  #define thandle HANDLE
  #endif
  
+ /* Header of a doubly-linked list of TocEntrys (the pending and ready lists) */
+ typedef struct
+ {
+       TocEntry   *head;             /* first entry, or NULL if list is empty */
+       TocEntry   *tail;             /* last entry, or NULL if list is empty */
+       /* The list link fields in each TocEntry are par_prev and par_next */
+ } TocEntryList;
+ 
+ /* Arguments needed for a worker child */
  typedef struct _restore_args
  {
        ArchiveHandle *AH;
        TocEntry   *te;
  } RestoreArgs;
  
+ /* State for each parallel activity slot */
  typedef struct _parallel_slot
  {
        thandle         child_id;
***************
*** 117,124 ****
  static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
  static bool work_in_progress(ParallelSlot *slots, int n_slots);
  static int    get_next_slot(ParallelSlot *slots, int n_slots);
  static TocEntry *get_next_work_item(ArchiveHandle *AH,
!                                  TocEntry **first_unprocessed,
                                   ParallelSlot *slots, int n_slots);
  static parallel_restore_result parallel_restore(RestoreArgs *args);
  static void mark_work_done(ArchiveHandle *AH, thandle worker, int status,
--- 127,138 ----
  static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
  static bool work_in_progress(ParallelSlot *slots, int n_slots);
  static int    get_next_slot(ParallelSlot *slots, int n_slots);
+ static void toc_list_append(TocEntryList *l, TocEntry *te);
+ static void toc_list_remove(TocEntryList *l, TocEntry *te);
+ static void find_ready_items(TocEntryList *pending_list,
+                                                        TocEntryList *ready_list);
  static TocEntry *get_next_work_item(ArchiveHandle *AH,
!                                  TocEntryList *ready_list,
                                   ParallelSlot *slots, int n_slots);
  static parallel_restore_result parallel_restore(RestoreArgs *args);
  static void mark_work_done(ArchiveHandle *AH, thandle worker, int status,
***************
*** 3065,3071 ****
        ParallelSlot *slots;
        int                     work_status;
        int                     next_slot;
!       TocEntry   *first_unprocessed = AH->toc->next;
        TocEntry   *next_work_item;
        thandle         ret_child;
        TocEntry   *te;
--- 3079,3086 ----
        ParallelSlot *slots;
        int                     work_status;
        int                     next_slot;
!       TocEntryList pending_list;
!       TocEntryList ready_list;
        TocEntry   *next_work_item;
        thandle         ret_child;
        TocEntry   *te;
***************
*** 3087,3094 ****
         * faster in a single connection because we avoid all the connection and
         * setup overhead.
         */
!       while ((next_work_item = get_next_work_item(AH, &first_unprocessed,
!                                                                                               NULL, 0)) != NULL)
        {
                if (next_work_item->section == SECTION_DATA ||
                        next_work_item->section == SECTION_POST_DATA)
--- 3102,3108 ----
         * faster in a single connection because we avoid all the connection and
         * setup overhead.
         */
!       for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
        {
                if (next_work_item->section == SECTION_DATA ||
                        next_work_item->section == SECTION_POST_DATA)
***************
*** 3100,3106 ****
  
                (void) restore_toc_entry(AH, next_work_item, ropt, false);
  
-               next_work_item->restored = true;
                reduce_dependencies(AH, next_work_item);
        }
  
--- 3114,3119 ----
***************
*** 3125,3130 ****
--- 3138,3162 ----
        AH->currWithOids = -1;
  
        /*
+        * Initialize the lists of pending and ready items.  After this setup,
+        * the pending list is everything that needs to be done but is blocked
+        * by one or more dependencies, while the ready list contains items that
+        * have no remaining dependencies.  Note: we don't yet filter out entries
+        * that aren't going to be restored.  They might participate in
+        * dependency chains connecting entries that should be restored, so we
+        * treat them as live until we actually process them.
+        */
+       pending_list.head = pending_list.tail = NULL;
+       ready_list.head = ready_list.tail = NULL;
+       for (; next_work_item != AH->toc; next_work_item = next_work_item->next)
+       {
+               if (next_work_item->depCount > 0)
+                       toc_list_append(&pending_list, next_work_item);
+               else
+                       toc_list_append(&ready_list, next_work_item);
+       }
+ 
+       /*
         * main parent loop
         *
         * Keep going until there is no worker still running AND there is no work
***************
*** 3133,3139 ****
  
        ahlog(AH, 1, "entering main parallel loop\n");
  
!       while ((next_work_item = get_next_work_item(AH, &first_unprocessed,
                                                                                                slots, n_slots)) != NULL ||
                   work_in_progress(slots, n_slots))
        {
--- 3165,3171 ----
  
        ahlog(AH, 1, "entering main parallel loop\n");
  
!       while ((next_work_item = get_next_work_item(AH, &ready_list,
                                                                                                slots, n_slots)) != NULL ||
                   work_in_progress(slots, n_slots))
        {
***************
*** 3149,3156 ****
                                          next_work_item->dumpId,
                                          next_work_item->desc, next_work_item->tag);
  
!                               next_work_item->restored = true;
                                reduce_dependencies(AH, next_work_item);
  
                                continue;
                        }
--- 3181,3189 ----
                                          next_work_item->dumpId,
                                          next_work_item->desc, next_work_item->tag);
  
!                               toc_list_remove(&ready_list, next_work_item);
                                reduce_dependencies(AH, next_work_item);
+                               find_ready_items(&pending_list, &ready_list);
  
                                continue;
                        }
***************
*** 3165,3171 ****
                                          next_work_item->dumpId,
                                          next_work_item->desc, next_work_item->tag);
  
!                               next_work_item->restored = true;
  
                                /* this memory is dealloced in mark_work_done() */
                                args = malloc(sizeof(RestoreArgs));
--- 3198,3204 ----
                                          next_work_item->dumpId,
                                          next_work_item->desc, next_work_item->tag);
  
!                               toc_list_remove(&ready_list, next_work_item);
  
                                /* this memory is dealloced in mark_work_done() */
                                args = malloc(sizeof(RestoreArgs));
***************
*** 3194,3199 ****
--- 3227,3233 ----
                {
                        mark_work_done(AH, ret_child, WEXITSTATUS(work_status),
                                                   slots, n_slots);
+                       find_ready_items(&pending_list, &ready_list);
                }
                else
                {
***************
*** 3218,3231 ****
         * dependencies, or some other pathological condition. If so, do it in the
         * single parent connection.
         */
!       for (te = AH->toc->next; te != AH->toc; te = te->next)
        {
!               if (!te->restored)
!               {
!                       ahlog(AH, 1, "processing missed item %d %s %s\n",
!                                 te->dumpId, te->desc, te->tag);
!                       (void) restore_toc_entry(AH, te, ropt, false);
!               }
        }
  
        /* The ACLs will be handled back in RestoreArchive. */
--- 3252,3262 ----
         * dependencies, or some other pathological condition. If so, do it in the
         * single parent connection.
         */
!       for (te = pending_list.head; te; te = te->par_next)
        {
!               ahlog(AH, 1, "processing missed item %d %s %s\n",
!                         te->dumpId, te->desc, te->tag);
!               (void) restore_toc_entry(AH, te, ropt, false);
        }
  
        /* The ACLs will be handled back in RestoreArchive. */
***************
*** 3366,3392 ****
        }
        return false;
  }
  
  
  
  /*
   * Find the next work item (if any) that is capable of being run now.
   *
   * To qualify, the item must have no remaining dependencies
!  * and no requirement for locks that is incompatible with
!  * items currently running.
   *
!  * first_unprocessed is state data that tracks the location of the first
!  * TocEntry that's not marked 'restored'.  This avoids O(N^2) search time
!  * with long TOC lists.  (Even though the constant is pretty small, it'd
!  * get us eventually.)
   *
   * pref_non_data is for an alternative selection algorithm that gives
   * preference to non-data items if there is already a data load running.
   * It is currently disabled.
   */
  static TocEntry *
! get_next_work_item(ArchiveHandle *AH, TocEntry **first_unprocessed,
                                   ParallelSlot *slots, int n_slots)
  {
        bool            pref_non_data = false;  /* or get from AH->ropt */
--- 3397,3474 ----
        }
        return false;
  }
+   
+   
+ /* Append te to the end of list l; te's par_prev/par_next links are overwritten */
+ static void
+ toc_list_append(TocEntryList *l, TocEntry *te)
+ {
+       if (l->tail)
+               l->tail->par_next = te;       /* hook te after the old tail */
+       te->par_prev = l->tail;       /* NULL if the list was empty */
+       te->par_next = NULL;
+       l->tail = te;
+       if (l->head == NULL)
+               l->head = te;         /* list was empty: te is also the head */
+ }
+ 
+ /* Unlink te from list l (assumes te is currently a member of l) and clear its links */
+ static void
+ toc_list_remove(TocEntryList *l, TocEntry *te)
+ {
+       if (te->par_prev)
+               te->par_prev->par_next = te->par_next;
+       else
+               l->head = te->par_next;       /* te was the head */
+       if (te->par_next)
+               te->par_next->par_prev = te->par_prev;
+       else
+               l->tail = te->par_prev;       /* te was the tail */
+       te->par_next = NULL;
+       te->par_prev = NULL;
+ }
+ 
  
+ /*
+  * Scan pending_list for entries whose depCount is no longer positive, and
+  * move each one, in list order, to the tail of ready_list.
+  */
+ static void
+ find_ready_items(TocEntryList *pending_list, TocEntryList *ready_list)
+ {
+       TocEntry *te;
+       TocEntry *next;
  
+       for (te = pending_list->head; te; te = next)
+       {
+               /* Save the link now: toc_list_remove() clears te->par_next */
+               next = te->par_next;
+               if (te->depCount > 0)
+                       continue;                       /* still has unmet dependencies */
+               /* OK, move it from the pending list to the end of the ready list */
+               toc_list_remove(pending_list, te);
+               toc_list_append(ready_list, te);
+       }
+ }
  
+   
  /*
   * Find the next work item (if any) that is capable of being run now.
   *
   * To qualify, the item must have no remaining dependencies
!  * and no requirements for locks that are incompatible with
!  * items currently running.  Items in the ready_list are known to have
!  * no remaining dependencies, but we have to check for lock conflicts.
   *
!  * Note that the returned item has *not* been removed from ready_list.
!  * The caller must do that after successfully dispatching the item.
   *
   * pref_non_data is for an alternative selection algorithm that gives
   * preference to non-data items if there is already a data load running.
   * It is currently disabled.
   */
  static TocEntry *
! get_next_work_item(ArchiveHandle *AH, TocEntryList *ready_list,
                                   ParallelSlot *slots, int n_slots)
  {
        bool            pref_non_data = false;  /* or get from AH->ropt */
***************
*** 3411,3436 ****
        }
  
        /*
!        * Advance first_unprocessed if possible.
!        */
!       for (te = *first_unprocessed; te != AH->toc; te = te->next)
!       {
!               if (!te->restored)
!                       break;
!       }
!       *first_unprocessed = te;
! 
!       /*
!        * Search from first_unprocessed until we find an available item.
         */
!       for (; te != AH->toc; te = te->next)
        {
                bool            conflicts = false;
  
-               /* Ignore if already done or still waiting on dependencies */
-               if (te->restored || te->depCount > 0)
-                       continue;
- 
                /*
                 * Check to see if the item would need exclusive lock on something
                 * that a currently running item also needs lock on, or vice versa. If
--- 3493,3504 ----
        }
  
        /*
!        * Search the ready_list until we find a suitable item.
         */
!       for (te = ready_list->head; te; te = te->par_next)
        {
                bool            conflicts = false;
  
                /*
                 * Check to see if the item would need exclusive lock on something
                 * that a currently running item also needs lock on, or vice versa. If
Index: src/bin/pg_dump/pg_backup_archiver.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.h,v
retrieving revision 1.79
diff -c -d -r1.79 pg_backup_archiver.h
*** src/bin/pg_dump/pg_backup_archiver.h        11 Jun 2009 14:49:07 -0000      1.79
--- src/bin/pg_dump/pg_backup_archiver.h        17 Jul 2009 02:20:28 -0000
***************
*** 314,320 ****
        void       *formatData;         /* TOC Entry data specific to file format */
  
        /* working state (needed only for parallel restore) */
!       bool            restored;               /* item is in progress or done */
        bool            created;                /* set for DATA member if TABLE was created */
        int                     depCount;               /* number of dependencies not yet restored */
        DumpId     *lockDeps;           /* dumpIds of objects this one needs lock on */
--- 314,321 ----
        void       *formatData;         /* TOC Entry data specific to file format */
  
        /* working state (needed only for parallel restore) */
!       struct _tocEntry *par_prev;     /* list links for pending items */
!       struct _tocEntry *par_next;
        bool            created;                /* set for DATA member if TABLE was created */
        int                     depCount;               /* number of dependencies not yet restored */
        DumpId     *lockDeps;           /* dumpIds of objects this one needs lock on */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to