Hi,

I had an archive of ~35GiB which decompressed into ~80GiB, and it took
almost 20 minutes to do so. Then I wondered whether it would be possible
to decompress it in parallel by feeding the individual blocks to the
available CPUs.

The patch at the bottom is a small C proof of concept showing that it is
possible: I managed to decompress the same file in slightly over two
minutes on a system with 16 CPUs.

Could this feature be merged, in an improved form, into the `xz' binary?
Here are a few things I don't like:
- The tool runs `xz --robot -lv' (via popen()) to get the list of blocks.
  I didn't find an API to get this information.

- To decompress an individual block I create a new lzma_stream, feed it
  the bytes preceding the first block (the stream header) so it knows
  what it is decoding, and then feed it the block. Once the block is
  done, I lzma_end() the stream and start over. It would be nice to
  create one stream per CPU, reset the decoder state after each block,
  and reuse as much of the already allocated memory as possible.

--- /dev/null
+++ punxz.c
@@ -0,0 +1,460 @@
+/* PoC of parallel unxz, GPLv2, Sebastian A. Siewior */
+#define _GNU_SOURCE
+#include <errno.h>
+#include <fcntl.h>
+#include <inttypes.h>
+#include <limits.h>
+#include <lzma.h>
+#include <pthread.h>
+#include <sched.h>
+#include <signal.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <time.h>
+#include <unistd.h>
+
+/*
+ * One xz Block as listed by `xz --robot -lv': where its compressed bytes
+ * start in the input file and where its decompressed bytes go in the
+ * output file, plus both sizes.
+ */
+struct item {
+       off_t pos_comp, pos_decomp;     /* offsets: input file, output file */
+       ssize_t len_comp, len_decomp;   /* sizes: compressed, decompressed */
+};
+
+/* Output file descriptor; stays -1 when no -o was given (test mode). */
+static int fd_out = -1;
+
+/* Read-only mapping of the whole compressed input file. */
+static void *comp_mem;
+
+/*
+ * Work list: items[0 .. total_items - 1]; current_item is the next one to
+ * be claimed. Worker threads update current_item and the progress counters
+ * while holding item_lock.
+ */
+static pthread_mutex_t item_lock;
+static struct item *items;
+static int total_items, current_item;
+
+/* Progress accounting, in bytes. */
+static unsigned long total_comp, total_uncomp;
+static unsigned long read_comp, written_uncomp;
+
+/*
+ * Return a malloc()ed copy of `s' wrapped in single quotes so /bin/sh sees
+ * it as one literal word; each embedded ' becomes '\''.
+ * Returns NULL if the allocation fails.
+ */
+static char *shell_quote(const char *s)
+{
+       size_t len = 3;         /* two quotes + terminating NUL */
+       const char *p;
+       char *out, *q;
+
+       for (p = s; *p; p++)
+               len += (*p == '\'') ? 4 : 1;
+
+       out = malloc(len);
+       if (!out)
+               return NULL;
+
+       q = out;
+       *q++ = '\'';
+       for (p = s; *p; p++) {
+               if (*p == '\'') {
+                       memcpy(q, "'\\''", 4);
+                       q += 4;
+               } else {
+                       *q++ = *p;
+               }
+       }
+       *q++ = '\'';
+       *q = '\0';
+       return out;
+}
+
+/* Read the next line of `xz --robot -lv' output; exit on EOF or error. */
+static void robot_getline(FILE *in, char **line, size_t *cap)
+{
+       if (getline(line, cap, in) < 0) {
+               printf("Unexpected end of `xz --robot -lv' output\n");
+               exit(1);
+       }
+}
+
+/*
+ * Fill items[] with the Block table of `filename' by running
+ * `xz --robot -lv' and parsing its tab-separated output. Only files with a
+ * single Stream are accepted. Also sets total_items, current_item,
+ * total_comp and total_uncomp. Exits the process on any error.
+ */
+static void create_items(char *filename)
+{
+       FILE *in;
+       int ret;
+       char *line = NULL;      /* getline(): no fixed limit on line length */
+       size_t cap = 0;
+       char *quoted, *cmd;
+       long num_stream, num_block, num_block2, off_comp, offs_decomp;
+       long i;
+
+       /*
+        * The command is run by /bin/sh: quote the name so spaces or shell
+        * metacharacters in it are neither split nor interpreted, and put
+        * it after "--" so a leading '-' is not parsed as an option.
+        */
+       quoted = shell_quote(filename);
+       if (!quoted) {
+               printf("malloc()\n");
+               exit(1);
+       }
+       ret = asprintf(&cmd, "xz --robot -lv -- %s", quoted);
+       free(quoted);
+       if (ret < 0) {
+               printf("asprintf\n");
+               exit(1);
+       }
+
+       in = popen(cmd, "r");
+       if (!in) {
+               printf("popen: %m\n");
+               exit(1);
+       }
+       free(cmd);
+
+       robot_getline(in, &line, &cap);
+       if (strncmp(line, "name\t", 5)) {
+               printf("-> %s\n", line);
+               exit(1);
+       }
+
+       robot_getline(in, &line, &cap);
+       ret = sscanf(line, "file\t%ld\t%ld\t", &num_stream, &num_block);
+       if (ret != 2) {
+               printf("'file' match failed\n");
+               exit(1);
+       }
+       if (num_stream != 1) {
+               printf("Only one stream\n");
+               exit(1);
+       }
+
+       robot_getline(in, &line, &cap);
+       /* total_comp/total_uncomp are unsigned long, hence %lu. */
+       ret = sscanf(line, "stream\t%ld\t%ld\t%ld\t%ld\t%lu\t%lu",
+                    &num_stream, &num_block2, &off_comp, &offs_decomp,
+                    &total_comp, &total_uncomp);
+       if (ret != 6 || num_stream != 1 || num_block != num_block2 ||
+           num_block < 1) {
+               printf("Unexpected block/stream value\n");
+               exit(1);
+       }
+       if (off_comp != 0 || offs_decomp != 0) {
+               printf("Unexpected comp/uncomp offset\n");
+               exit(1);
+       }
+       if (!total_comp || !total_uncomp) {
+               printf("Total comp/uncomp size is zero\n");
+               exit(1);
+       }
+
+       /* calloc() also guards the count * size multiplication. */
+       items = calloc(num_block, sizeof(*items));
+       if (!items) {
+               printf("malloc()\n");
+               exit(1);
+       }
+       total_items = num_block;
+       current_item = 0;
+
+       for (i = 0; i < num_block; i++) {
+               long cur_block, comp_off, uncomp_off, comp_size, uncomp_size;
+
+               robot_getline(in, &line, &cap);
+               /*
+                * The first two columns are skipped; the third is checked
+                * against i + 1 below (per xz(1): Block number in the file).
+                */
+               ret = sscanf(line,
+                            "block\t%*d\t%*d\t%ld\t%ld\t%ld\t%ld\t%ld\t",
+                            &cur_block, &comp_off, &uncomp_off, &comp_size,
+                            &uncomp_size);
+               if (ret != 5) {
+                       printf("Parsing failed\n");
+                       exit(1);
+               }
+               if (cur_block != i + 1) {
+                       printf("Block num missmatch\n");
+                       exit(1);
+               }
+               /* Negative values would become bogus offsets/lengths. */
+               if (comp_off < 0 || uncomp_off < 0 || comp_size < 0 ||
+                   uncomp_size < 0) {
+                       printf("Invalid values for block %ld\n", cur_block);
+                       exit(1);
+               }
+               items[i].pos_comp = comp_off;
+               items[i].pos_decomp = uncomp_off;
+               items[i].len_comp = comp_size;
+               items[i].len_decomp = uncomp_size;
+       }
+
+       /* Drain the rest of the output so xz can finish and exit normally. */
+       while (getline(&line, &cap, in) >= 0)
+               ;
+       free(line);
+
+       /* Per xz(1), exit status 1 means error and 2 only a warning. */
+       ret = pclose(in);
+       if (ret == -1 || !WIFEXITED(ret) || WEXITSTATUS(ret) == 1) {
+               printf("xz --robot -lv failed\n");
+               exit(1);
+       }
+}
+
+/*
+ * pwrite() all `len' bytes of `buf' to fd_out at offset `off', retrying
+ * short writes. Exits the process on error.
+ */
+static void write_all_at(const uint8_t *buf, size_t len, off_t off)
+{
+       while (len) {
+               ssize_t w = pwrite(fd_out, buf, len, off);
+
+               if (w < 0 && errno == EINTR)
+                       continue;
+               if (w <= 0) {
+                       printf("\n\nFailed to write: %m\n");
+                       exit(1);
+               }
+               buf += w;
+               len -= w;
+               off += w;
+       }
+}
+
+/*
+ * Decompress the xz Block described by `item' from the mapped input and,
+ * unless fd_out < 0 (test mode), write the result at item->pos_decomp.
+ *
+ * A fresh stream decoder is first fed everything before the first Block
+ * (items[0].pos_comp bytes; for a single-Stream file, the Stream Header)
+ * and then only this Block's bytes. Exits the process on any error;
+ * always returns NULL.
+ */
+static void *decomp_one_block(struct item *item)
+{
+       const uint8_t *in = comp_mem;   /* no arithmetic on void * */
+       lzma_stream strm = LZMA_STREAM_INIT;
+       lzma_ret lr;
+       uint8_t buf[512 * 1024];
+       off_t written = 0;
+
+       lr = lzma_stream_decoder(&strm, 1 << 30, LZMA_TELL_UNSUPPORTED_CHECK);
+       if (lr != LZMA_OK) {
+               /* A decoder that failed to initialise cannot be used. */
+               printf("Not okay\n");
+               exit(1);
+       }
+
+       strm.next_in = in;
+       strm.avail_in = items[0].pos_comp;
+
+       lr = lzma_code(&strm, LZMA_RUN);
+       if (lr != LZMA_OK) {
+               printf("Error #1\n");
+               exit(1);
+       }
+
+       strm.next_in = in + item->pos_comp;
+       strm.avail_in = item->len_comp;
+
+       /*
+        * Stop only once the whole Block input has been consumed, not as
+        * soon as the expected output size is reached: the Block's trailing
+        * Check is verified only when the decoder reads it. liblzma returns
+        * LZMA_BUF_ERROR when no progress is possible, so a stall ends in
+        * the error path below instead of looping forever.
+        */
+       while (strm.avail_in || written < item->len_decomp) {
+               size_t itw;
+
+               strm.next_out = buf;
+               strm.avail_out = sizeof(buf);
+
+               lr = lzma_code(&strm, LZMA_RUN);
+               if (lr != LZMA_OK) {
+                       printf("Failed\n");
+                       exit(1);
+               }
+               itw = sizeof(buf) - strm.avail_out;
+
+               /* More output than listed would overwrite the next Block. */
+               if (itw > (size_t)(item->len_decomp - written)) {
+                       printf("Block decompresses to more than %zd bytes\n",
+                              item->len_decomp);
+                       exit(1);
+               }
+
+               if (fd_out >= 0)
+                       write_all_at(buf, itw, item->pos_decomp + written);
+               written += itw;
+       }
+       lzma_end(&strm);
+       return NULL;
+}
+
+/*
+ * Drop the mapped pages that lie entirely inside the item's compressed
+ * data. Pages shared with a neighbouring Block are kept because another
+ * thread may still be decoding that Block. The real page size is used
+ * rather than assuming 4 KiB, since madvise() needs a page-aligned start.
+ */
+static void drop_block_pages(const struct item *item)
+{
+       long page = sysconf(_SC_PAGESIZE);
+       uintptr_t mask, start, end;
+
+       if (page <= 0)
+               page = 4096;
+       mask = (uintptr_t)page - 1;
+
+       start = (uintptr_t)comp_mem + item->pos_comp;
+       end = start + item->len_comp;
+       start = (start + mask) & ~mask;         /* round up */
+       end &= ~mask;                           /* round down */
+       if (end <= start)
+               return;
+
+       if (madvise((void *)start, end - start, MADV_DONTNEED))
+               printf("madvise(): %lx %lx\n", (unsigned long)start,
+                      (unsigned long)(end - start));
+}
+
+/*
+ * Worker thread: claim the next item under item_lock, decompress it, drop
+ * its input pages and add its sizes to the progress counters. Returns NULL
+ * once no unclaimed items are left. `data' is unused.
+ */
+static void *decomp_thread(void *data)
+{
+       struct item item;
+
+       (void)data;
+       for (;;) {
+               pthread_mutex_lock(&item_lock);
+               if (current_item >= total_items) {
+                       pthread_mutex_unlock(&item_lock);
+                       return NULL;
+               }
+               item = items[current_item++];
+               pthread_mutex_unlock(&item_lock);
+
+               decomp_one_block(&item);
+               drop_block_pages(&item);
+
+               /* Count every finished Block, even one that decompresses
+                * to zero bytes. */
+               pthread_mutex_lock(&item_lock);
+               written_uncomp += item.len_decomp;
+               read_comp += item.len_comp;
+               pthread_mutex_unlock(&item_lock);
+       }
+}
+
+/*
+ * Scale *val down by 1024 until it is below 10 * 1024 or the largest unit
+ * is reached, store the result back into *val and return its unit suffix
+ * ("" for bytes). Never returns NULL, so the result is safe for "%s".
+ */
+static char *conv_unit(unsigned long *val)
+{
+       static char *const units[] = {
+               "", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB",
+       };
+       const int last = sizeof(units) / sizeof(units[0]) - 1;
+       unsigned long v = *val;
+       int i = 0;
+
+       while (v >= 10 * 1024 && i < last) {
+               v /= 1024;
+               i++;
+       }
+       *val = v;
+       return units[i];
+}
+
+/* Decompressed byte count at the previous stats_sig() call. */
+static unsigned long last_written;
+
+/*
+ * SIGALRM handler, also called directly once at the end: print one
+ * progress line to stderr, starting with '\r' so it overwrites the
+ * previous one. "MiB/sec" is the growth since the previous call, i.e. a
+ * rate only while the timer fires once per second.
+ */
+static void stats_sig(int signum)
+{
+       int saved_errno = errno;        /* the interrupted code may use it */
+       /* One snapshot, so all figures on the line agree with each other. */
+       unsigned long local_ucomp = written_uncomp;
+       unsigned long c_written_uncomp = local_ucomp;
+       unsigned long c_read_comp = read_comp;
+       unsigned long c_total_uncomp = total_uncomp;
+       unsigned long c_total_comp = total_comp;
+       char *u_written_uncomp, *u_read_comp, *u_total_uncomp, *u_total_comp;
+       double mib = 0;
+       char line[256];
+       int len;
+
+       (void)signum;
+       if (last_written)
+               mib = (double)(local_ucomp - last_written) / (1024 * 1024);
+       last_written = local_ucomp;
+
+       u_written_uncomp = conv_unit(&c_written_uncomp);
+       u_read_comp = conv_unit(&c_read_comp);
+       u_total_uncomp = conv_unit(&c_total_uncomp);
+       u_total_comp = conv_unit(&c_total_comp);
+
+       /*
+        * Format into a local buffer and emit it with write(2), which is
+        * async-signal-safe; fprintf() would take the stderr stream lock.
+        * snprintf() is not formally async-signal-safe either, but it does
+        * not touch any shared stream.
+        */
+       len = snprintf(line, sizeof(line),
+                      "\r%lu%s (%lu%s) of %lu%s (%lu%s) - "
+                      "%3.2f%% %5.2f MiB/sec %20s",
+                      c_written_uncomp, u_written_uncomp,
+                      c_read_comp, u_read_comp,
+                      c_total_uncomp, u_total_uncomp,
+                      c_total_comp, u_total_comp,
+                      100.0 / total_uncomp * local_ucomp,
+                      mib, "");
+       if (len > 0) {
+               if ((size_t)len >= sizeof(line))
+                       len = sizeof(line) - 1;
+               if (write(STDERR_FILENO, line, len) < 0) {
+                       /* Nothing sensible to do from a signal handler. */
+               }
+       }
+       errno = saved_errno;
+}
+
+/*
+ * Install stats_sig() as the SIGALRM handler and arm a timer that raises
+ * SIGALRM once per second. Progress output is best-effort: failures are
+ * reported, not fatal.
+ */
+static void set_progress_signal(void)
+{
+       struct sigaction sa;
+       struct itimerval itv = {
+               .it_interval = { .tv_sec = 1, .tv_usec = 0 },
+               .it_value = { .tv_sec = 1, .tv_usec = 0 },
+       };
+
+       memset(&sa, 0, sizeof(sa));
+       sigemptyset(&sa.sa_mask);
+       sa.sa_handler = stats_sig;
+       sa.sa_flags = SA_RESTART;
+       if (sigaction(SIGALRM, &sa, NULL)) {
+               /* Without our handler SIGALRM would terminate the process,
+                * so do not arm the timer. */
+               printf("sigaction(): %m\n");
+               return;
+       }
+       if (setitimer(ITIMER_REAL, &itv, NULL))
+               printf("setitimer(): %m\n");
+}
+
+/*
+ * Stop the progress timer, discard any SIGALRM it already raised, then
+ * print the final progress line.
+ */
+static void clr_progress_signal(void)
+{
+       struct itimerval off;
+       struct sigaction sa;
+
+       /* A zero it_value disarms the timer; sigaction() alone does not. */
+       memset(&off, 0, sizeof(off));
+       setitimer(ITIMER_REAL, &off, NULL);
+
+       /*
+        * SIG_IGN rather than SIG_DFL: the default action for SIGALRM is to
+        * terminate, and setting SIG_IGN discards a signal already pending.
+        */
+       memset(&sa, 0, sizeof(sa));
+       sigemptyset(&sa.sa_mask);
+       sa.sa_handler = SIG_IGN;
+       sigaction(SIGALRM, &sa, NULL);
+
+       stats_sig(0);
+}
+
+#define USEC_PER_SEC   1000000
+/*
+ * Return t1 - t2 in microseconds; the nanosecond part of the difference is
+ * divided by 1000 with C's truncation toward zero.
+ */
+static inline int64_t calcdiff(struct timespec t1, struct timespec t2)
+{
+       /* Subtract the full-width time_t values: narrowing each to int first
+        * can overflow when they straddle a 2^31-second boundary. */
+       int64_t diff = (int64_t)(t1.tv_sec - t2.tv_sec) * USEC_PER_SEC;
+
+       diff += (int64_t)(t1.tv_nsec - t2.tv_nsec) / 1000;
+       return diff;
+}
+
+/*
+ * Number of CPUs to use by default: the size of this process's affinity
+ * mask; if that cannot be read (e.g. the fixed-size cpu_set_t is too small
+ * for the machine), the number of online CPUs; 1 as a last resort.
+ */
+static int get_cpu_num(void)
+{
+       cpu_set_t cpu_mask;
+       long online;
+
+       if (!sched_getaffinity(0, sizeof(cpu_mask), &cpu_mask))
+               return CPU_COUNT(&cpu_mask);
+
+       online = sysconf(_SC_NPROCESSORS_ONLN);
+       return online > 0 ? (int)online : 1;
+}
+
+/* Print the command-line help, using `s' as the program name, and exit(1). */
+static void usage(char *s)
+{
+       printf("Usage: %s [-t num] [-o name] filename.xz\n", s);
+       printf("-t num\t Number of threads to use.\n");
+       /* Split with literal concatenation so mailers cannot break it. */
+       printf("-o name\t Filename to write the output. "
+              "Default none / test mode\n");
+       exit(1);
+}
+
+/*
+ * Parse the options, map the input, read its Block table via
+ * create_items(), decompress all Blocks with num_threads worker threads
+ * and report the overall throughput.
+ *
+ *   -t num   worker threads; 0, negative or omitted: get_cpu_num()
+ *   -o name  output file, must not exist yet; without it nothing is written
+ */
+int main(int argc, char *argv[])
+{
+       pthread_mutexattr_t attr;
+       struct stat sbuf;
+       struct timespec ts_begin;
+       struct timespec ts_end;
+       char *fname_in = NULL;
+       char *fname_out = NULL;
+       pthread_t *threads;
+       int num_threads = 0;
+       int ret;
+       int fd;
+       int i;
+       int64_t usec_diff;
+       int opt;
+
+       while ((opt = getopt(argc, argv, "t:o:")) != -1) {
+               char *end;
+               long n;
+
+               switch (opt) {
+               case 't':
+                       /* strtol() instead of atoi(): reject "4x" or "". */
+                       errno = 0;
+                       n = strtol(optarg, &end, 10);
+                       if (end == optarg || *end != '\0' || errno ||
+                           n > INT_MAX)
+                               usage(argv[0]);
+                       num_threads = n < 0 ? 0 : (int)n;
+                       break;
+
+               case 'o':
+                       fname_out = optarg;
+                       break;
+
+               default:
+                       usage(argv[0]);
+               }
+       }
+
+       if (argc != optind + 1)
+               usage(argv[0]);
+       fname_in = argv[optind];
+
+       fd = open(fname_in, O_RDONLY);
+       if (fd < 0) {
+               printf("Can't open %s\n", fname_in);
+               return 1;
+       }
+       ret = fstat(fd, &sbuf);
+       if (ret < 0) {
+               printf("stat: %m\n");
+               return 1;
+       }
+       comp_mem = mmap(NULL, sbuf.st_size, PROT_READ, MAP_SHARED, fd, 0);
+       if (comp_mem == MAP_FAILED) {
+               printf("mmap: %m\n");
+               return 1;
+       }
+       close(fd);
+
+       create_items(fname_in);
+
+       /*
+        * The workers read these ranges straight from the mapping: make sure
+        * the Block table fits inside the file that was actually mapped.
+        */
+       for (i = 0; i < total_items; i++) {
+               if (items[i].pos_comp < 0 || items[i].len_comp < 0 ||
+                   items[i].pos_comp > sbuf.st_size - items[i].len_comp) {
+                       printf("Block %d lies outside of %s\n", i + 1, fname_in);
+                       return 1;
+               }
+       }
+
+       pthread_mutexattr_init(&attr);
+       pthread_mutexattr_setprotocol(&attr, PTHREAD_PRIO_INHERIT);
+       pthread_mutex_init(&item_lock, &attr);
+       pthread_mutexattr_destroy(&attr);
+
+       if (!num_threads)
+               num_threads = get_cpu_num();
+
+       threads = calloc(num_threads, sizeof(*threads));
+       if (!threads) {
+               printf("malloc()\n");
+               exit(1);
+       }
+
+       if (fname_out) {
+               fd_out = open(fname_out, O_WRONLY | O_CREAT | O_EXCL, 0644);
+               if (fd_out < 0) {
+                       printf("Can't create new file %s: %m\n", fname_out);
+                       exit(1);
+               }
+       }
+
+       set_progress_signal();
+
+       ret = clock_gettime(CLOCK_MONOTONIC, &ts_begin);
+       if (ret) {
+               printf("clock_gettime() failed %m\n");
+               exit(1);
+       }
+       /* pthread_create()/pthread_join() return the error number and do
+        * not set errno, so %m would print an unrelated error. */
+       for (i = 0; i < num_threads; i++) {
+               ret = pthread_create(&threads[i], NULL, decomp_thread, NULL);
+               if (ret != 0) {
+                       printf("pthread_create() %s\n", strerror(ret));
+                       exit(1);
+               }
+       }
+       for (i = 0; i < num_threads; i++) {
+               ret = pthread_join(threads[i], NULL);
+               if (ret != 0) {
+                       printf("pthread_join() %s\n", strerror(ret));
+                       exit(1);
+               }
+       }
+       ret = clock_gettime(CLOCK_MONOTONIC, &ts_end);
+       if (ret) {
+               printf("clock_gettime() failed %m\n");
+               ts_end = ts_begin;      /* never use an unset timestamp */
+       }
+       clr_progress_signal();
+       free(threads);
+       free(items);
+       munmap(comp_mem, sbuf.st_size);
+       /* Deferred write errors (e.g. on network file systems) may only be
+        * reported by close(). */
+       if (fd_out >= 0 && close(fd_out)) {
+               printf("\nclose(%s): %m\n", fname_out);
+               return 1;
+       }
+
+       usec_diff = calcdiff(ts_end, ts_begin);
+       if (usec_diff < 1)
+               usec_diff = 1;  /* avoid dividing by zero */
+
+       printf("\nOverall write performance: %.3f MiB/sec "
+              "(%lu bytes in %" PRId64 " sec).\n",
+              (double) written_uncomp / usec_diff * 1000 * 1000 / 1024 / 1024,
+              written_uncomp, usec_diff / 1000 / 1000);
+       return 0;
+}

Sebastian

Reply via email to