Signed-off-by: Ben Peart <benpe...@microsoft.com>
---
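Notes: the parallel traversal is opt-in behind a new config bit.  To
experiment with it on a repository:

    git config core.parallelUnpackTrees true

(config keys are case-insensitive; the parsing code below matches the
lowercased "core.parallelunpacktrees").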
 cache.h        |   1 +
 config.c       |   5 +
 environment.c  |   1 +
 unpack-trees.c | 314 ++++++++++++++++++++++++++++++++++++++++++++++++-
 unpack-trees.h |  30 +++++
 5 files changed, 349 insertions(+), 2 deletions(-)
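
This patch depends on a simple unbounded multi-producer/multi-consumer
queue ("mpmcqueue.h", presumably added earlier in the series and not
shown here).  For review purposes, here is a minimal sketch of the
interface the code below assumes -- a mutex plus condition variable
protecting an intrusive list, with a cancel flag that wakes and releases
blocked consumers; the real implementation may well differ:

    #include <pthread.h>
    #include <stddef.h>

    /*
     * Callers embed a struct mpmcq_entry as the *first* member of their
     * work item so the pointer returned by mpmcq_pop() can be cast back
     * to the containing struct (as traverse_info_parallel does below).
     */
    struct mpmcq_entry {
            struct mpmcq_entry *next;
    };

    struct mpmcq {
            struct mpmcq_entry *head;
            pthread_mutex_t lock;
            pthread_cond_t cond;
            int cancel;
    };

    static void mpmcq_entry_init(struct mpmcq_entry *entry)
    {
            entry->next = NULL;
    }

    static void mpmcq_init(struct mpmcq *queue)
    {
            queue->head = NULL;
            queue->cancel = 0;
            pthread_mutex_init(&queue->lock, NULL);
            pthread_cond_init(&queue->cond, NULL);
    }

    static void mpmcq_destroy(struct mpmcq *queue)
    {
            pthread_mutex_destroy(&queue->lock);
            pthread_cond_destroy(&queue->cond);
    }

    static void mpmcq_push(struct mpmcq *queue, struct mpmcq_entry *entry)
    {
            pthread_mutex_lock(&queue->lock);
            entry->next = queue->head;
            queue->head = entry;
            pthread_cond_signal(&queue->cond);
            pthread_mutex_unlock(&queue->lock);
    }

    /* Blocks; returns NULL only after mpmcq_cancel() on an empty queue. */
    static struct mpmcq_entry *mpmcq_pop(struct mpmcq *queue)
    {
            struct mpmcq_entry *entry;

            pthread_mutex_lock(&queue->lock);
            while (!queue->head && !queue->cancel)
                    pthread_cond_wait(&queue->cond, &queue->lock);
            entry = queue->head;
            if (entry)
                    queue->head = entry->next;
            pthread_mutex_unlock(&queue->lock);
            return entry;
    }

    static void mpmcq_cancel(struct mpmcq *queue)
    {
            pthread_mutex_lock(&queue->lock);
            queue->cancel = 1;
            pthread_cond_broadcast(&queue->cond);
            pthread_mutex_unlock(&queue->lock);
    }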

diff --git a/cache.h b/cache.h
index d49092d94d..4bfa35c497 100644
--- a/cache.h
+++ b/cache.h
@@ -815,6 +815,7 @@ extern int fsync_object_files;
 extern int core_preload_index;
 extern int core_commit_graph;
 extern int core_apply_sparse_checkout;
+extern int core_parallel_unpack_trees;
 extern int precomposed_unicode;
 extern int protect_hfs;
 extern int protect_ntfs;
diff --git a/config.c b/config.c
index f4a208a166..34d5506588 100644
--- a/config.c
+++ b/config.c
@@ -1346,6 +1346,11 @@ static int git_default_core_config(const char *var, const char *value)
                                         var, value);
        }
 
+       if (!strcmp(var, "core.parallelunpacktrees")) {
+               core_parallel_unpack_trees = git_config_bool(var, value);
+               return 0;
+       }
+
        /* Add other config variables here and to Documentation/config.txt. */
        return 0;
 }
diff --git a/environment.c b/environment.c
index 2a6de2330b..1eb0a05074 100644
--- a/environment.c
+++ b/environment.c
@@ -68,6 +68,7 @@ char *notes_ref_name;
 int grafts_replace_parents = 1;
 int core_commit_graph;
 int core_apply_sparse_checkout;
+int core_parallel_unpack_trees;
 int merge_log_config = -1;
 int precomposed_unicode = -1; /* see probe_utf8_pathname_composition() */
 unsigned long pack_size_limit_cfg;
diff --git a/unpack-trees.c b/unpack-trees.c
index 1f58efc6bb..2333626efd 100644
--- a/unpack-trees.c
+++ b/unpack-trees.c
@@ -17,6 +17,7 @@
 #include "submodule-config.h"
 #include "fsmonitor.h"
 #include "fetch-object.h"
+#include "thread-utils.h"
 
 /*
  * Error messages expected by scripts out of plumbing commands such as
@@ -641,6 +642,98 @@ static inline int are_same_oid(struct name_entry *name_j, struct name_entry *name_k)
        return name_j->oid && name_k->oid && !oidcmp(name_j->oid, name_k->oid);
 }
 
+#ifndef NO_PTHREADS
+
+struct traverse_info_parallel {
+       struct mpmcq_entry entry;
+       struct tree_desc t[MAX_UNPACK_TREES];
+       void *buf[MAX_UNPACK_TREES];
+       struct traverse_info info;
+       int n;
+       int nr_buf;
+       int ret;
+};
+
+static int traverse_trees_parallel(int n, unsigned long dirmask,
+                                  unsigned long df_conflicts,
+                                  struct name_entry *names,
+                                  struct traverse_info *info)
+{
+       int i;
+       struct name_entry *p;
+       struct unpack_trees_options *o = info->data;
+       struct traverse_info_parallel *newinfo;
+
+       p = names;
+       while (!p->mode)
+               p++;
+
+       newinfo = xmalloc(sizeof(struct traverse_info_parallel));
+       mpmcq_entry_init(&newinfo->entry);
+       newinfo->info = *info;
+       newinfo->info.prev = info;
+       newinfo->info.pathspec = info->pathspec;
+       newinfo->info.name = *p;
+       newinfo->info.pathlen += tree_entry_len(p) + 1;
+       newinfo->info.df_conflicts |= df_conflicts;
+       newinfo->nr_buf = 0;
+       newinfo->n = n;
+
+       /*
+        * Fetch the tree from the ODB for each peer directory in the
+        * n commits.
+        *
+        * For 2- and 3-way traversals, we try to avoid hitting the
+        * ODB twice for the same OID.  This should yield a nice speed
+        * up in checkouts and merges when the commits are similar.
+        *
+        * We don't bother doing the full O(n^2) search for larger n,
+        * because wider traversals don't happen that often and we
+        * avoid the search setup.
+        *
+        * When 2 peer OIDs are the same, we just copy the tree
+        * descriptor data.  This implicitly borrows the buffer
+        * data from the earlier cell.
+        */
+       for (i = 0; i < n; i++, dirmask >>= 1) {
+               if (i > 0 && are_same_oid(&names[i], &names[i - 1]))
+                       newinfo->t[i] = newinfo->t[i - 1];
+               else if (i > 1 && are_same_oid(&names[i], &names[i - 2]))
+                       newinfo->t[i] = newinfo->t[i - 2];
+               else {
+                       const struct object_id *oid = NULL;
+                       if (dirmask & 1)
+                               oid = names[i].oid;
+
+                       /*
+                        * fill_tree_descriptor() will load the tree from the
+                        * ODB. Accessing the ODB is not thread safe so
+                        * serialize access using the odb_mutex.
+                        */
+                       pthread_mutex_lock(&o->odb_mutex);
+                       newinfo->buf[newinfo->nr_buf++] =
+                               fill_tree_descriptor(newinfo->t + i, oid);
+                       pthread_mutex_unlock(&o->odb_mutex);
+               }
+       }
+
+       /*
+        * We can't play games with the cache bottom as we are processing
+        * the tree objects in parallel.
+        * newinfo->bottom = switch_cache_bottom(&newinfo->info);
+        */
+
+       /* All I really need here is fetch_and_add() */
+       pthread_mutex_lock(&o->work_mutex);
+       o->remaining_work++;
+       pthread_mutex_unlock(&o->work_mutex);
+       mpmcq_push(&o->queue, &newinfo->entry);
+
+       return 0;
+}
+
+#endif
+
 static int traverse_trees_recursive(int n, unsigned long dirmask,
                                    unsigned long df_conflicts,
                                    struct name_entry *names,
@@ -995,6 +1088,109 @@ static void debug_unpack_callback(int n,
                debug_name_entry(i, names + i);
 }
 
+#ifndef NO_PTHREADS
+static int unpack_callback_parallel(int n, unsigned long mask,
+                                   unsigned long dirmask,
+                                   struct name_entry *names,
+                                   struct traverse_info *info)
+{
+       struct cache_entry *src[MAX_UNPACK_TREES + 1] = {
+               NULL,
+       };
+       struct unpack_trees_options *o = info->data;
+       const struct name_entry *p = names;
+       int ret;
+
+       /* Find first entry with a real name (we could use "mask" too) */
+       while (!p->mode)
+               p++;
+
+       if (o->debug_unpack)
+               debug_unpack_callback(n, mask, dirmask, names, info);
+
+       /* Are we supposed to look at the index too? */
+       if (o->merge) {
+               while (1) {
+                       int cmp;
+                       struct cache_entry *ce;
+
+                       if (o->diff_index_cached)
+                               ce = next_cache_entry(o);
+                       else
+                               ce = find_cache_entry(info, p);
+
+                       if (!ce)
+                               break;
+                       cmp = compare_entry(ce, info, p);
+                       if (cmp < 0) {
+                               pthread_mutex_lock(&o->unpack_index_entry_mutex);
+                               ret = unpack_index_entry(ce, o);
+                               pthread_mutex_unlock(&o->unpack_index_entry_mutex);
+                               if (ret < 0)
+                                       return unpack_failed(o, NULL);
+                               continue;
+                       }
+                       if (!cmp) {
+                               if (ce_stage(ce)) {
+                                       /*
+                                        * If we skip unmerged index
+                                        * entries, we'll skip this
+                                        * entry *and* the tree
+                                        * entries associated with it!
+                                        */
+                                       if (o->skip_unmerged) {
+                                               add_same_unmerged(ce, o);
+                                               return mask;
+                                       }
+                               }
+                               src[0] = ce;
+                       }
+                       break;
+               }
+       }
+
+       pthread_mutex_lock(&o->unpack_nondirectories_mutex);
+       ret = unpack_nondirectories(n, mask, dirmask, src, names, info);
+       pthread_mutex_unlock(&o->unpack_nondirectories_mutex);
+       if (ret < 0)
+               return -1;
+
+       if (o->merge && src[0]) {
+               if (ce_stage(src[0]))
+                       mark_ce_used_same_name(src[0], o);
+               else
+                       mark_ce_used(src[0], o);
+       }
+
+       /* Now handle any directories.. */
+       if (dirmask) {
+               /* special case: "diff-index --cached" looking at a tree */
+               if (o->diff_index_cached && n == 1 && dirmask == 1 &&
+                   S_ISDIR(names->mode)) {
+                       int matches;
+                       matches = cache_tree_matches_traversal(
+                               o->src_index->cache_tree, names, info);
+                       /*
+                        * Everything under the name matches; skip the
+                        * entire hierarchy.  diff_index_cached codepath
+                        * special cases D/F conflicts in such a way that
+                        * it does not do any look-ahead, so this is safe.
+                        */
+                       if (matches) {
+                               o->cache_bottom += matches;
+                               return mask;
+                       }
+               }
+
+               if (traverse_trees_parallel(n, dirmask, mask & ~dirmask, names, info) < 0)
+                       return -1;
+               return mask;
+       }
+
+       return mask;
+}
+#endif
+
 static int unpack_callback(int n, unsigned long mask, unsigned long dirmask, struct name_entry *names, struct traverse_info *info)
 {
        struct cache_entry *src[MAX_UNPACK_TREES + 1] = { NULL, };
@@ -1263,6 +1459,116 @@ static void mark_new_skip_worktree(struct exclude_list *el,
 static int verify_absent(const struct cache_entry *,
                         enum unpack_trees_error_types,
                         struct unpack_trees_options *);
+
+#ifndef NO_PTHREADS
+static void *traverse_trees_parallel_thread_proc(void *_data)
+{
+       struct unpack_trees_options *o = _data;
+       struct traverse_info_parallel *info;
+       int i;
+
+       while (1) {
+               info = (struct traverse_info_parallel *)mpmcq_pop(&o->queue);
+               if (!info)
+                       break;
+
+               info->ret = traverse_trees(info->n, info->t, &info->info);
+               /*
+                * We can't play games with the cache bottom as we are processing
+                * the tree objects in parallel.
+                * restore_cache_bottom(&info->info, info->bottom);
+                */
+
+               for (i = 0; i < info->nr_buf; i++)
+                       free(info->buf[i]);
+               /*
+                * TODO: Can't free "info" when thread is done because it can be used
+                * as ->prev link in child info objects.  Ref count?  Free all at end?
+               free(info);
+                */
+
+               /* All I really need here is fetch_and_add() */
+               pthread_mutex_lock(&o->work_mutex);
+               o->remaining_work--;
+               if (o->remaining_work == 0)
+                       mpmcq_cancel(&o->queue);
+               pthread_mutex_unlock(&o->work_mutex);
+       }
+
+       return NULL;
+}
+
+static void init_parallel_traverse(struct unpack_trees_options *o,
+                                  struct traverse_info *info)
+{
+       /*
+        * TODO: Add logic to bypass parallel path when not needed.
+        *                      - not enough CPU cores to help
+        *                      - 'git status' is always fast - how to detect?
+        *                      - small trees (may be able to use index size as proxy, small index likely means small commit tree)
+        */
+       if (core_parallel_unpack_trees) {
+               int t;
+
+               mpmcq_init(&o->queue);
+               o->remaining_work = 0;
+               pthread_mutex_init(&o->unpack_nondirectories_mutex, NULL);
+               pthread_mutex_init(&o->unpack_index_entry_mutex, NULL);
+               pthread_mutex_init(&o->odb_mutex, NULL);
+               pthread_mutex_init(&o->work_mutex, NULL);
+               o->nr_threads = online_cpus();
+               o->pthreads = xcalloc(o->nr_threads, sizeof(pthread_t));
+               info->fn = unpack_callback_parallel;
+
+               for (t = 0; t < o->nr_threads; t++) {
+                       if (pthread_create(&o->pthreads[t], NULL,
+                                          traverse_trees_parallel_thread_proc,
+                                          o))
+                               die("unable to create traverse_trees_parallel_thread");
+               }
+       }
+}
+
+static void wait_parallel_traverse(struct unpack_trees_options *o)
+{
+       /*
+        * The first tree (root directory) is processed on the main thread.
+        * This function is called after it has completed.  If there is no
+        * remaining work, we know we are finished.
+        */
+       if (core_parallel_unpack_trees) {
+               int t;
+
+               pthread_mutex_lock(&o->work_mutex);
+               if (o->remaining_work == 0)
+                       mpmcq_cancel(&o->queue);
+               pthread_mutex_unlock(&o->work_mutex);
+
+               for (t = 0; t < o->nr_threads; t++) {
+                       if (pthread_join(o->pthreads[t], NULL))
+                               die("unable to join traverse_trees_parallel_thread");
+               }
+
+               free(o->pthreads);
+               pthread_mutex_destroy(&o->work_mutex);
+               pthread_mutex_destroy(&o->odb_mutex);
+               pthread_mutex_destroy(&o->unpack_index_entry_mutex);
+               pthread_mutex_destroy(&o->unpack_nondirectories_mutex);
+               mpmcq_destroy(&o->queue);
+       }
+}
+#else
+static void init_parallel_traverse(struct unpack_trees_options *o,
+                                  struct traverse_info *info)
+{
+}
+
+static void wait_parallel_traverse(struct unpack_trees_options *o)
+{
+       return;
+}
+#endif
+
 /*
  * N-way merge "len" trees.  Returns 0 on success, -1 on failure to manipulate the
  * resulting index, -2 on failure to reflect the changes to the work tree.
@@ -1327,6 +1633,7 @@ int unpack_trees(unsigned len, struct tree_desc *t, struct unpack_trees_options
                const char *prefix = o->prefix ? o->prefix : "";
                struct traverse_info info;
                uint64_t start;
+               int ret;
 
                setup_traverse_info(&info, prefix);
                info.fn = unpack_callback;
@@ -1352,9 +1659,12 @@ int unpack_trees(unsigned len, struct tree_desc *t, struct unpack_trees_options
                }
 
                start = getnanotime();
-               if (traverse_trees(len, t, &info) < 0)
-                       goto return_failed;
+               init_parallel_traverse(o, &info);
+               ret = traverse_trees(len, t, &info);
+               wait_parallel_traverse(o);
                trace_performance_since(start, "traverse_trees");
+               if (ret < 0)
+                       goto return_failed;
        }
 
        /* Any left-over entries in the index? */
diff --git a/unpack-trees.h b/unpack-trees.h
index c2b434c606..b7140099fa 100644
--- a/unpack-trees.h
+++ b/unpack-trees.h
@@ -3,6 +3,11 @@
 
 #include "tree-walk.h"
 #include "argv-array.h"
+#ifndef NO_PTHREADS
+#include "git-compat-util.h"
+#include <pthread.h>
+#include "mpmcqueue.h"
+#endif
 
 #define MAX_UNPACK_TREES 8
 
@@ -80,6 +85,31 @@ struct unpack_trees_options {
        struct index_state result;
 
        struct exclude_list *el; /* for internal use */
+#ifndef NO_PTHREADS
+       /*
+        * Speed up the tree traversal by adding all discovered tree objects
+        * into a queue and have a pool of worker threads process them in
+        * parallel.  Since there is no upper bound on the size of a tree and
+        * each worker thread will be adding discovered tree objects to the
+        * queue, we need an unbounded multi-producer-multi-consumer queue.
+        */
+       struct mpmcq queue;
+
+       int nr_threads;
+       pthread_t *pthreads;
+
+       /* need a mutex as we don't have fetch_and_add() */
+       int remaining_work;
+       pthread_mutex_t work_mutex;
+
+       /* The ODB is not thread safe so we must serialize access to it */
+       pthread_mutex_t odb_mutex;
+
+       /* various functions that are not thread safe and must be serialized for now */
+       pthread_mutex_t unpack_index_entry_mutex;
+       pthread_mutex_t unpack_nondirectories_mutex;
+
+#endif
 };
 
 extern int unpack_trees(unsigned n, struct tree_desc *t,
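
A note on the two "All I really need here is fetch_and_add()" comments
above: once an atomics compat wrapper is available, the remaining_work
counter and work_mutex could collapse into something like the following
sketch (C11 <stdatomic.h> is used for illustration only; git would need
its own portability layer, and the helper names are hypothetical):

    #include <stdatomic.h>

    /* in struct unpack_trees_options: atomic_int remaining_work; */

    /* producer side: in traverse_trees_parallel(), before mpmcq_push() */
    static void work_queued(struct unpack_trees_options *o)
    {
            atomic_fetch_add(&o->remaining_work, 1);
    }

    /* consumer side: after a queued tree has been fully traversed */
    static void work_done(struct unpack_trees_options *o)
    {
            /* fetch_sub returns the previous value; 1 means we were last */
            if (atomic_fetch_sub(&o->remaining_work, 1) == 1)
                    mpmcq_cancel(&o->queue);
    }

Since mpmcq_cancel() would be idempotent, it is harmless that both the
last worker and the main thread's check in wait_parallel_traverse() can
fire it.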
-- 
2.17.0.gvfs.1.123.g449c066
