Changes to v4: (diff below)
* Some functions wanted to be static (Thanks Ramsay!)
* The patch to factor out return code handling has been dropped as
  the return code handling is slightly different in finish_command and
  the parallel case.
* We can handle signals a bit more gracefully now.
* More documentation in run-command.h 
* I thought it was a good idea to introduce `sigchain_pop_common`.

Jonathan Nieder (1):
  submodule.c: write "Fetching submodule <foo>" to stderr

Stefan Beller (7):
  xread: poll on non blocking fds
  xread_nonblock: add functionality to read from fds without blocking
  strbuf: add strbuf_read_once to read without blocking
  sigchain: add command to pop all common signals
  run-command: add an asynchronous parallel child processor
  fetch_populated_submodules: use new parallel job processing
  submodules: allow parallel fetching, add tests and documentation

 Documentation/fetch-options.txt |   7 +
 builtin/fetch.c                 |   6 +-
 builtin/pull.c                  |   6 +
 git-compat-util.h               |   1 +
 run-command.c                   | 348 ++++++++++++++++++++++++++++++++++++++++
 run-command.h                   |  63 ++++++++
 sigchain.c                      |   9 ++
 sigchain.h                      |   1 +
 strbuf.c                        |  11 ++
 strbuf.h                        |   9 ++
 submodule.c                     | 127 +++++++++++----
 submodule.h                     |   2 +-
 t/t0061-run-command.sh          |  20 +++
 t/t5526-fetch-submodules.sh     |  70 +++++---
 test-run-command.c              |  24 +++
 wrapper.c                       |  35 +++-
 16 files changed, 675 insertions(+), 64 deletions(-)

diff --git a/run-command.c b/run-command.c
index 494e1f8..df84985 100644
--- a/run-command.c
+++ b/run-command.c
@@ -234,35 +234,6 @@ static inline void set_cloexec(int fd)
                fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
 }
 
-static int determine_return_value(int wait_status,
-                                 int *result,
-                                 int *error_code,
-                                 const char *argv0)
-{
-       if (WIFSIGNALED(wait_status)) {
-               *result = WTERMSIG(wait_status);
-               if (*result != SIGINT && *result != SIGQUIT)
-                       error("%s died of signal %d", argv0, *result);
-               /*
-                * This return value is chosen so that code & 0xff
-                * mimics the exit code that a POSIX shell would report for
-                * a program that died from this signal.
-                */
-               *result += 128;
-       } else if (WIFEXITED(wait_status)) {
-               *result = WEXITSTATUS(wait_status);
-               /*
-                * Convert special exit code when execvp failed.
-                */
-               if (*result == 127) {
-                       *result = -1;
-                       *error_code = ENOENT;
-               }
-       } else
-               return -1;
-       return 0;
-}
-
 static int wait_or_whine(pid_t pid, const char *argv0)
 {
        int status, code = -1;
@@ -275,12 +246,29 @@ static int wait_or_whine(pid_t pid, const char *argv0)
        if (waiting < 0) {
                failed_errno = errno;
                error("waitpid for %s failed: %s", argv0, strerror(errno));
+       } else if (waiting != pid) {
+               error("waitpid is confused (%s)", argv0);
+       } else if (WIFSIGNALED(status)) {
+               code = WTERMSIG(status);
+               if (code != SIGINT && code != SIGQUIT)
+                       error("%s died of signal %d", argv0, code);
+               /*
+                * This return value is chosen so that code & 0xff
+                * mimics the exit code that a POSIX shell would report for
+                * a program that died from this signal.
+                */
+               code += 128;
+       } else if (WIFEXITED(status)) {
+               code = WEXITSTATUS(status);
+               /*
+                * Convert special exit code when execvp failed.
+                */
+               if (code == 127) {
+                       code = -1;
+                       failed_errno = ENOENT;
+               }
        } else {
-               if (waiting != pid || (determine_return_value(status,
-                                                             &code,
-                                                             &failed_errno,
-                                                             argv0) < 0))
-                       error("waitpid is confused (%s)", argv0);
+               error("waitpid is confused (%s)", argv0);
        }
 
        clear_child_for_cleanup(pid);
@@ -888,46 +876,67 @@ struct parallel_processes {
         */
        struct pollfd *pfd;
 
+       unsigned shutdown : 1;
+
        int output_owner;
        struct strbuf buffered_output; /* of finished children */
-};
+} parallel_processes_struct;
 
-void default_start_failure(void *data,
-                          struct child_process *cp,
-                          struct strbuf *err)
+static int default_start_failure(void *data,
+                                struct child_process *cp,
+                                struct strbuf *err)
 {
        int i;
-       struct strbuf sb = STRBUF_INIT;
 
+       strbuf_addstr(err, "Starting a child failed:");
        for (i = 0; cp->argv[i]; i++)
-               strbuf_addf(&sb, " %s", cp->argv[i]);
+               strbuf_addf(err, " %s", cp->argv[i]);
 
-       die_errno("Starting a child failed:%s", sb.buf);
+       return 0;
 }
 
-void default_return_value(void *data,
-                         struct child_process *cp,
-                         int result)
+static int default_return_value(void *data,
+                               struct child_process *cp,
+                               struct strbuf *err,
+                               int result)
 {
        int i;
-       struct strbuf sb = STRBUF_INIT;
 
        if (!result)
-               return;
+               return 0;
 
+       strbuf_addf(err, "A child failed with return code %d:", result);
        for (i = 0; cp->argv[i]; i++)
-               strbuf_addf(&sb, " %s", cp->argv[i]);
+               strbuf_addf(err, " %s", cp->argv[i]);
 
-       die_errno("A child failed with return code %d:%s", result, sb.buf);
+       return 0;
 }
 
-static void pp_init(struct parallel_processes *pp,
-                                       int n, void *data,
-                                       get_next_task_fn get_next_task,
-                                       start_failure_fn start_failure,
-                                       return_value_fn return_value)
+static void kill_children(struct parallel_processes *pp, int signo)
+{
+       int i, n = pp->max_processes;
+
+       for (i = 0; i < n; i++)
+               if (pp->children[i].in_use)
+                       kill(pp->children[i].process.pid, signo);
+}
+
+static void handle_children_on_signal(int signo)
+{
+       struct parallel_processes *pp = &parallel_processes_struct;
+
+       kill_children(pp, signo);
+       sigchain_pop(signo);
+       raise(signo);
+}
+
+static struct parallel_processes *pp_init(int n, void *data,
+                                         get_next_task_fn get_next_task,
+                                         start_failure_fn start_failure,
+                                         return_value_fn return_value)
 {
        int i;
+       struct parallel_processes *pp = &parallel_processes_struct;
 
        if (n < 1)
                n = online_cpus();
@@ -952,6 +961,8 @@ static void pp_init(struct parallel_processes *pp,
                pp->pfd[i].events = POLLIN;
                pp->pfd[i].fd = -1;
        }
+       sigchain_push_common(handle_children_on_signal);
+       return pp;
 }
 
 static void pp_cleanup(struct parallel_processes *pp)
@@ -964,6 +975,8 @@ static void pp_cleanup(struct parallel_processes *pp)
        free(pp->children);
        free(pp->pfd);
        strbuf_release(&pp->buffered_output);
+
+       sigchain_pop_common();
 }
 
 static void set_nonblocking(int fd)
@@ -977,7 +990,12 @@ static void set_nonblocking(int fd)
                        "output will be degraded");
 }
 
-/* returns 1 if a process was started, 0 otherwise */
+/* returns
+ *  0 if a new task was started.
+ *  1 if no new job was started (get_next_task ran out of work, or a
+ *    non-critical problem occurred when starting a new command)
+ * -1 if no new job was started and the user wishes to shut down early.
+ */
 static int pp_start_one(struct parallel_processes *pp)
 {
        int i;
@@ -993,10 +1011,14 @@ static int pp_start_one(struct parallel_processes *pp)
                               &pp->children[i].err))
                return 1;
 
-       if (start_command(&pp->children[i].process))
-               pp->start_failure(pp->data,
-                                 &pp->children[i].process,
-                                 &pp->children[i].err);
+       if (start_command(&pp->children[i].process)) {
+               int code = pp->start_failure(pp->data,
+                                            &pp->children[i].process,
+                                            &pp->children[i].err);
+               strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
+               strbuf_reset(&pp->children[i].err);
+               return code ? -1 : 1;
+       }
 
        set_nonblocking(pp->children[i].process.err);
 
@@ -1006,11 +1028,11 @@ static int pp_start_one(struct parallel_processes *pp)
        return 0;
 }
 
-static void pp_buffer_stderr(struct parallel_processes *pp)
+static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout)
 {
        int i;
 
-       while ((i = poll(pp->pfd, pp->max_processes, 100)) < 0) {
+       while ((i = poll(pp->pfd, pp->max_processes, output_timeout)) < 0) {
                if (errno == EINTR)
                        continue;
                pp_cleanup(pp);
@@ -1038,17 +1060,18 @@ static void pp_output(struct parallel_processes *pp)
        }
 }
 
-static void pp_collect_finished(struct parallel_processes *pp)
+static int pp_collect_finished(struct parallel_processes *pp)
 {
        int i = 0;
        pid_t pid;
        int wait_status, code;
        int n = pp->max_processes;
+       int result = 0;
 
        while (pp->nr_processes > 0) {
                pid = waitpid(-1, &wait_status, WNOHANG);
                if (pid == 0)
-                       return;
+                       return 0;
 
                if (pid < 0)
                        die_errno("wait");
@@ -1064,12 +1087,38 @@ static void pp_collect_finished(struct parallel_processes *pp)
                                pp->children[i].process.err, 0) < 0)
                        die_errno("strbuf_read");
 
-               if (determine_return_value(wait_status, &code, &errno,
-                                          pp->children[i].process.argv[0]) < 0)
-                       error("waitpid is confused (%s)",
-                             pp->children[i].process.argv[0]);
+               if (WIFSIGNALED(wait_status)) {
+                       code = WTERMSIG(wait_status);
+                       if (!pp->shutdown &&
+                           code != SIGINT && code != SIGQUIT)
+                               strbuf_addf(&pp->children[i].err,
+                                           "%s died of signal %d",
+                                           pp->children[i].process.argv[0],
+                                           code);
+                       /*
+                        * This return value is chosen so that code & 0xff
+                        * mimics the exit code that a POSIX shell would report for
+                        * a program that died from this signal.
+                        */
+                       code += 128;
+               } else if (WIFEXITED(wait_status)) {
+                       code = WEXITSTATUS(wait_status);
+                       /*
+                        * Convert special exit code when execvp failed.
+                        */
+                       if (code == 127) {
+                               code = -1;
+                               errno = ENOENT;
+                       }
+               } else
+                       strbuf_addf(&pp->children[i].err,
+                                   "waitpid is confused (%s)",
+                                   pp->children[i].process.argv[0]);
+
 
-               pp->return_value(pp->data, &pp->children[i].process, code);
+               if (pp->return_value(pp->data, &pp->children[i].process,
+                                    &pp->children[i].err, code))
+                       result = 1;
 
                argv_array_clear(&pp->children[i].process.args);
                argv_array_clear(&pp->children[i].process.env_array);
@@ -1103,6 +1152,7 @@ static void pp_collect_finished(struct parallel_processes *pp)
                        pp->output_owner = (pp->output_owner + i) % n;
                }
        }
+       return result;
 }
 
 int run_processes_parallel(int n, void *data,
@@ -1110,21 +1160,43 @@ int run_processes_parallel(int n, void *data,
                           start_failure_fn start_failure,
                           return_value_fn return_value)
 {
-       struct parallel_processes pp;
-       pp_init(&pp, n, data, get_next_task, start_failure, return_value);
+       int no_more_task = 0;
+       struct parallel_processes *pp;
 
+       pp = pp_init(n, data, get_next_task, start_failure, return_value);
        while (1) {
-               while (pp.nr_processes < pp.max_processes &&
-                      !pp_start_one(&pp))
-                       ; /* nothing */
-               if (!pp.nr_processes)
+               int i;
+               int output_timeout = 100;
+               int spawn_cap = 4;
+
+               if (!no_more_task) {
+                       for (i = 0; i < spawn_cap; i++) {
+                               int code;
+                               if (pp->nr_processes == pp->max_processes)
+                                       break;
+
+                               code = pp_start_one(pp);
+                               if (!code)
+                                       continue;
+                               if (code < 0) {
+                                       pp->shutdown = 1;
+                                       kill_children(pp, SIGTERM);
+                               }
+                               no_more_task = 1;
+                               break;
+                       }
+               }
+               if (no_more_task && !pp->nr_processes)
                        break;
-               pp_buffer_stderr(&pp);
-               pp_output(&pp);
-               pp_collect_finished(&pp);
+               pp_buffer_stderr(pp, output_timeout);
+               pp_output(pp);
+               if (pp_collect_finished(pp)) {
+                       kill_children(pp, SIGTERM);
+                       pp->shutdown = 1;
+                       no_more_task = 1;
+               }
        }
 
-       pp_cleanup(&pp);
-
+       pp_cleanup(pp);
        return 0;
 }
diff --git a/run-command.h b/run-command.h
index 3807fd1..1179cb0 100644
--- a/run-command.h
+++ b/run-command.h
@@ -132,13 +132,36 @@ typedef int (*get_next_task_fn)(void *data,
                                struct child_process *cp,
                                struct strbuf *err);
 
-typedef void (*start_failure_fn)(void *data,
-                                struct child_process *cp,
-                                struct strbuf *err);
-
-typedef void (*return_value_fn)(void *data,
+/**
+ * This callback is called whenever there are problems starting
+ * a new process.
+ *
+ * You must not write to stdout or stderr in this function. Add your
+ * message to the strbuf err instead, which will be printed without
+ * messing up the output of the other parallel processes.
+ *
+ * Return 0 to continue the parallel processing. To abort gracefully,
+ * return non zero.
+ */
+typedef int (*start_failure_fn)(void *data,
                                struct child_process *cp,
-                               int result);
+                               struct strbuf *err);
+
+/**
+ * This callback is called on every child process that finished
+ * processing.
+ *
+ * You must not write to stdout or stderr in this function. Add your
+ * message to the strbuf err instead, which will be printed without
+ * messing up the output of the other parallel processes.
+ *
+ * Return 0 to continue the parallel processing. To abort gracefully,
+ * return non zero.
+ */
+typedef int (*return_value_fn)(void *data,
+                              struct child_process *cp,
+                              struct strbuf *err,
+                              int result);
 
 /**
  * Runs up to n processes at the same time. Whenever a process can be
@@ -148,6 +171,10 @@ typedef void (*return_value_fn)(void *data,
  * The children started via this function run in parallel and their output
  * to stderr is buffered, while one of the children will directly output
  * to stderr.
+ *
+ * If start_failure_fn and return_value_fn are NULL, default handlers
+ * will be used. The default handlers will print an error message on
+ * error without issuing an emergency stop.
  */
 
 int run_processes_parallel(int n, void *data,
diff --git a/sigchain.c b/sigchain.c
index faa375d..9262307 100644
--- a/sigchain.c
+++ b/sigchain.c
@@ -50,3 +50,12 @@ void sigchain_push_common(sigchain_fun f)
        sigchain_push(SIGQUIT, f);
        sigchain_push(SIGPIPE, f);
 }
+
+void sigchain_pop_common(void)
+{
+       sigchain_pop(SIGINT);
+       sigchain_pop(SIGHUP);
+       sigchain_pop(SIGTERM);
+       sigchain_pop(SIGQUIT);
+       sigchain_pop(SIGPIPE);
+}
diff --git a/sigchain.h b/sigchain.h
index 618083b..138b20f 100644
--- a/sigchain.h
+++ b/sigchain.h
@@ -7,5 +7,6 @@ int sigchain_push(int sig, sigchain_fun f);
 int sigchain_pop(int sig);
 
 void sigchain_push_common(sigchain_fun f);
+void sigchain_pop_common(void);
 
 #endif /* SIGCHAIN_H */
diff --git a/submodule.c b/submodule.c
index fdaf3e4..7ab89f4 100644
--- a/submodule.c
+++ b/submodule.c
@@ -630,18 +630,25 @@ struct submodule_parallel_fetch {
 int get_next_submodule(void *data, struct child_process *cp,
                       struct strbuf *err);
 
-void handle_submodule_fetch_start_err(void *data, struct child_process *cp, struct strbuf *err)
+static int fetch_start_failure(void *data, struct child_process *cp,
+                              struct strbuf *err)
 {
        struct submodule_parallel_fetch *spf = data;
+
        spf->result = 1;
+
+       return 0;
 }
 
-void handle_submodule_fetch_finish( void *data, struct child_process *cp, int retvalue)
+static int fetch_finish(void *data, struct child_process *cp,
+                       struct strbuf *err, int retvalue)
 {
        struct submodule_parallel_fetch *spf = data;
 
        if (retvalue)
                spf->result = 1;
+
+       return 0;
 }
 
 int fetch_populated_submodules(const struct argv_array *options,
@@ -671,8 +678,8 @@ int fetch_populated_submodules(const struct argv_array *options,
        calculate_changed_submodule_paths();
        run_processes_parallel(max_parallel_jobs, &spf,
                               get_next_submodule,
-                              handle_submodule_fetch_start_err,
-                              handle_submodule_fetch_finish);
+                              fetch_start_failure,
+                              fetch_finish);
 
        argv_array_clear(&spf.args);
 out:
diff --git a/test-run-command.c b/test-run-command.c
index 94c6eee..2555791 100644
--- a/test-run-command.c
+++ b/test-run-command.c
@@ -16,9 +16,9 @@
 #include <errno.h>
 
 static int number_callbacks;
-int parallel_next(void *data,
-                 struct child_process *cp,
-                 struct strbuf *err)
+static int parallel_next(void *data,
+                        struct child_process *cp,
+                        struct strbuf *err)
 {
        struct child_process *d = data;
        if (number_callbacks >= 4)


-- 
2.5.0.273.g6fa2560.dirty

--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Reply via email to