From: Sebastian Andrzej Siewior <sebast...@breakpoint.cc> Changes since last post: - Options are considered. - Memory limits are considered. The limit may get exceeded if we get close to it and then the existing threads enlarge their in-buffer. - Blocks with no size information in the header can be decompressed. This happens synchronously.
Signed-off-by: Sebastian Andrzej Siewior <sebast...@breakpoint.cc> --- src/liblzma/api/lzma/container.h | 5 +- src/liblzma/common/Makefile.inc | 6 + src/liblzma/common/stream_decoder_mt.c | 1058 ++++++++++++++++++++++++ src/liblzma/liblzma.map | 1 + src/xz/coder.c | 15 +- 5 files changed, 1080 insertions(+), 5 deletions(-) create mode 100644 src/liblzma/common/stream_decoder_mt.c diff --git a/src/liblzma/api/lzma/container.h b/src/liblzma/api/lzma/container.h index 9fbf4df06178e..de0a77b5d6482 100644 --- a/src/liblzma/api/lzma/container.h +++ b/src/liblzma/api/lzma/container.h @@ -173,7 +173,7 @@ typedef struct { uint32_t reserved_int2; uint32_t reserved_int3; uint32_t reserved_int4; - uint64_t reserved_int5; + uint64_t memlimit; uint64_t reserved_int6; uint64_t reserved_int7; uint64_t reserved_int8; @@ -630,3 +630,6 @@ extern LZMA_API(lzma_ret) lzma_stream_buffer_decode( const uint8_t *in, size_t *in_pos, size_t in_size, uint8_t *out, size_t *out_pos, size_t out_size) lzma_nothrow lzma_attr_warn_unused_result; + +extern LZMA_API(lzma_ret) + lzma_stream_decoder_mt(lzma_stream *strm, const lzma_mt *options); diff --git a/src/liblzma/common/Makefile.inc b/src/liblzma/common/Makefile.inc index 0408f9a48c4db..3c140c2955475 100644 --- a/src/liblzma/common/Makefile.inc +++ b/src/liblzma/common/Makefile.inc @@ -78,4 +78,10 @@ liblzma_la_SOURCES += \ common/stream_decoder.h \ common/stream_flags_decoder.c \ common/vli_decoder.c + +if COND_THREADS +liblzma_la_SOURCES += \ + common/stream_decoder_mt.c +endif + endif diff --git a/src/liblzma/common/stream_decoder_mt.c b/src/liblzma/common/stream_decoder_mt.c new file mode 100644 index 0000000000000..b2f1c9ebfa607 --- /dev/null +++ b/src/liblzma/common/stream_decoder_mt.c @@ -0,0 +1,1058 @@ +/////////////////////////////////////////////////////////////////////////////// +// +/// \file stream_decoder_mt.c +/// \brief Multithreaded .xz Stream decoder +// +// Author: Sebastian Andrzej Siewior +// +// This file has been put into 
the public domain. +// You can do whatever you want with this file. +// +/////////////////////////////////////////////////////////////////////////////// + +#include "common.h" +#include "block_decoder.h" +#include "outqueue.h" +#include "stream_decoder.h" +#include "index.h" + +#include <stdio.h> + +typedef enum { + /// Waiting for work. + THR_IDLE, + + /// Decoding is in progress. + THR_RUN, + + /// The main thread wants the thread to stop whatever it was doing + /// but not exit. + THR_STOP, + + /// The main thread wants the thread to exit. + THR_EXIT, + +} worker_state; + +struct out_buffer { + uint8_t *out; + size_t out_block_size; /* Size of ->out */ + size_t out_pos; /* Bytes written to ->out (worker) */ + size_t out_filled; /* Bytes consumed of ->out (coordinator) */ +}; + +struct worker_thread { + worker_state state; + + uint8_t *in; + size_t in_size; /* Size of ->in */ + size_t in_block_size; /* Size of current block */ + size_t in_filled; /* Bytes written to ->in (coordinator) */ + size_t in_pos; /* Bytes consumed of ->in (worker) */ + + struct out_buffer out; + + /// Pointer to the main structure is needed when putting this + /// thread back to the stack of free threads. + struct lzma_stream_coder *coder; + + /* The allocator is set by the main thread. 
*/ + const lzma_allocator *allocator; + /* Filter size is used for memusage accounting */ + size_t filter_size; + + lzma_next_coder block_decoder; + lzma_block block_options; + struct worker_thread *next; + + mythread_mutex mutex; + mythread_cond cond; + lzma_ret thread_error; + + mythread thread_id; +}; + +struct lzma_stream_coder { + enum { + SEQ_STREAM_HEADER, + SEQ_BLOCK_HEADER, + SEQ_BLOCK, + SEQ_INDEX, + SEQ_STREAM_FOOTER, + SEQ_STREAM_PADDING, + } sequence; + + + /// Memory usage limit + uint64_t memlimit; + /// Amount of memory actually needed (only an estimate) + uint64_t memusage; + size_t exp_filter_size; + size_t exp_block_size; + + lzma_index_hash *index_hash; + + mythread_mutex mutex; + mythread_cond cond; + + /// Array of allocated thread-specific structures + struct worker_thread *threads; + + /// Number of structures in "threads" above. This is also the + /// number of threads that will be created at maximum. + uint32_t threads_max; + + /// Number of thread structures that have been initialized, and + /// thus the number of worker threads actually created so far. + uint32_t threads_initialized; + + /// Stack of free threads. When a thread finishes, it puts itself + /// back into this stack. This starts as empty because threads + /// are created only when actually needed. 
+ struct worker_thread *threads_free; + /* Current thread decompressed is read from */ + struct worker_thread *thr_read; + /* Last read thread, used for ->next assignment */ + struct worker_thread *thr_read_last; + /* Current thread compressed data is written to */ + struct worker_thread *thr_write; + + lzma_stream_flags stream_flags; + bool tell_no_check; + bool tell_unsupported_check; + bool tell_any_check; + bool ignore_check; + bool concatenated; + bool first_stream; + /* True if block sizes are missing and threads are not used */ + bool direct_decomp; + + size_t pos; + uint8_t buffer[LZMA_BLOCK_HEADER_SIZE_MAX]; +}; + +/* Use smaller chunks so cancalation attempts don't block for long */ +#define CHUNK_SIZE 16384 +static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr) +{ + struct worker_thread *thr = thr_ptr; + size_t in_filled; + size_t out_pos; + lzma_ret ret; + struct out_buffer *out; + +next_loop_lock: + + mythread_mutex_lock(&thr->mutex); +next_loop_unlocked: + + if (thr->state == THR_IDLE) { + mythread_cond_wait(&thr->cond, &thr->mutex); + goto next_loop_unlocked; + } else if (thr->state == THR_EXIT) { + mythread_mutex_unlock(&thr->mutex); + + lzma_free(thr->in, thr->allocator); + lzma_free(thr->out.out, thr->allocator); + lzma_next_end(&thr->block_decoder, thr->allocator); + + mythread_mutex_destroy(&thr->mutex); + mythread_cond_destroy(&thr->cond); + return MYTHREAD_RET_VALUE; + + } else if (thr->state == THR_STOP) { + thr->state = THR_IDLE; + mythread_cond_wait(&thr->cond, &thr->mutex); + goto next_loop_unlocked; + } else if (thr->state != THR_RUN) { + thr->state = THR_IDLE; + thr->thread_error = LZMA_PROG_ERROR; + mythread_mutex_unlock(&thr->mutex); + + mythread_mutex_lock(&thr->coder->mutex); + mythread_cond_signal(&thr->coder->cond); + mythread_mutex_unlock(&thr->coder->mutex); + goto next_loop_lock; + } + + in_filled = thr->in_filled; + + if (in_filled == thr->in_pos) { + mythread_cond_wait(&thr->cond, &thr->mutex); + goto next_loop_unlocked; + } 
+ out = &thr->out; + mythread_mutex_unlock(&thr->mutex); + + if ((in_filled - thr->in_pos) > CHUNK_SIZE) + in_filled = thr->in_pos + CHUNK_SIZE; + + out_pos = out->out_pos; + ret = thr->block_decoder.code(thr->block_decoder.coder, + thr->allocator, + thr->in, &thr->in_pos, in_filled, + out->out, &out_pos, out->out_block_size, + LZMA_RUN); + if (ret == LZMA_OK || ret == LZMA_STREAM_END) { + + if (thr->in_pos == thr->in_block_size) { + mythread_mutex_lock(&thr->mutex); + thr->state = THR_IDLE; + mythread_mutex_unlock(&thr->mutex); + } + + if (out_pos != out->out_pos) { + mythread_mutex_lock(&thr->coder->mutex); + out->out_pos = out_pos; + if (thr->coder->thr_read == thr) { + mythread_cond_signal(&thr->coder->cond); + } + mythread_mutex_unlock(&thr->coder->mutex); + } + goto next_loop_lock; + } else { + + mythread_mutex_lock(&thr->mutex); + thr->state = THR_IDLE; + thr->thread_error = ret; + mythread_mutex_unlock(&thr->mutex); + + mythread_mutex_lock(&thr->coder->mutex); + mythread_cond_signal(&thr->coder->cond); + mythread_mutex_unlock(&thr->coder->mutex); + goto next_loop_lock; + } + return MYTHREAD_RET_VALUE; +} + +static void threads_end(struct lzma_stream_coder *coder, + const lzma_allocator *allocator) +{ + uint32_t i; + + for (i = 0; i < coder->threads_initialized; ++i) { + mythread_mutex_lock(&coder->threads[i].mutex); + coder->threads[i].state = THR_EXIT; + mythread_cond_signal(&coder->threads[i].cond); + mythread_mutex_unlock(&coder->threads[i].mutex); + } + + for (i = 0; i < coder->threads_initialized; ++i) + mythread_join(coder->threads[i].thread_id); + + coder->threads_initialized = 0; + lzma_free(coder->threads, allocator); + return; +} + +static void threads_stop(struct lzma_stream_coder *coder, bool wait_for_threads) +{ + uint32_t i; + + for (i = 0; i < coder->threads_initialized; ++i) { + mythread_mutex_lock(&coder->threads[i].mutex); + coder->threads[i].state = THR_STOP; + mythread_cond_signal(&coder->threads[i].cond); + 
mythread_mutex_unlock(&coder->threads[i].mutex); + } + + if (!wait_for_threads) + return; + + for (i = 0; i < coder->threads_initialized; ++i) { + mythread_mutex_lock(&coder->threads[i].mutex); + while (coder->threads[i].state != THR_IDLE) { + mythread_cond_wait(&coder->threads[i].cond, + &coder->threads[i].mutex); + } + mythread_mutex_unlock(&coder->threads[i].mutex); + } +} + +static void stream_decoder_mt_end(void *coder_ptr, + const lzma_allocator *allocator) +{ + struct lzma_stream_coder *coder = coder_ptr; + + threads_end(coder, allocator); + lzma_index_hash_end(coder->index_hash, allocator); + lzma_free(coder, allocator); +} + +static lzma_ret stream_decode_in(struct lzma_stream_coder *coder, + const uint8_t *restrict in, + size_t *restrict in_pos, + size_t in_size) +{ + struct worker_thread *thr = coder->thr_write; + size_t old_filled; + size_t cur_in_infilled; + lzma_ret ret = LZMA_OK; + + mythread_mutex_lock(&thr->mutex); + + if (thr->state == THR_IDLE) { + ret = thr->thread_error; + if (ret != LZMA_OK) { + mythread_mutex_unlock(&thr->mutex); + return ret; + } + } + + old_filled = thr->in_filled; + mythread_mutex_unlock(&thr->mutex); + cur_in_infilled = old_filled; + + lzma_bufcpy(in, in_pos, in_size, + thr->in, &cur_in_infilled, thr->in_block_size); + + mythread_mutex_lock(&thr->mutex); + thr->in_filled = cur_in_infilled; + + if (old_filled == thr->in_pos) + mythread_cond_signal(&thr->cond); + + mythread_mutex_unlock(&thr->mutex); + + /* complete in buffer consumed and out-buffer written */ + if (thr->in_filled == thr->in_block_size) { + + coder->sequence = SEQ_BLOCK_HEADER; + coder->thr_write = NULL; + return LZMA_OK; + } + + return ret; +} + +/// Initialize a new worker_thread structure and create a new thread. 
+static lzma_ret initialize_new_thread(struct lzma_stream_coder *coder, + const lzma_allocator *allocator) +{ + struct worker_thread *thr = &coder->threads[coder->threads_initialized]; + + memset(thr, 0, sizeof(struct worker_thread)); + + if (mythread_mutex_init(&thr->mutex)) + goto error_mutex; + + if (mythread_cond_init(&thr->cond)) + goto error_cond; + + thr->state = THR_IDLE; + thr->thread_error = LZMA_OK; + thr->allocator = allocator; + thr->coder = coder; + thr->block_decoder = LZMA_NEXT_CODER_INIT; + + if (mythread_create(&thr->thread_id, worker_decoder, thr)) + goto error_thread; + + ++coder->threads_initialized; + coder->thr_write = thr; + + return LZMA_OK; + +error_thread: + mythread_cond_destroy(&thr->cond); + +error_cond: + mythread_mutex_destroy(&thr->mutex); + +error_mutex: + return LZMA_MEM_ERROR; +} + + +static lzma_ret get_thread(struct lzma_stream_coder *coder, + const lzma_allocator *allocator) +{ + // If there is a free structure on the stack, use it. + if (coder->threads_free != NULL) { + coder->thr_write = coder->threads_free; + coder->threads_free = coder->threads_free->next; + } + + if (coder->thr_write == NULL) { + // If there are no uninitialized structures left, return. + if (coder->threads_initialized == coder->threads_max) + return LZMA_OK; + + if (coder->exp_filter_size && coder->exp_block_size) { + size_t exp; + + /* + * It is assumed that the archive consists of multiple + * blocks sharing the same filter and block settings. + * Therefore it is assumed the new thread will consume + * the same amount of filter memory and block-size for + * the out (decompressed) memory. For in (compressed) + * buffer it is assumed to consume block-size/2. The + * in-buffer will grow if needed so we may exceed the + * actual limit. 
+ */ + exp = coder->exp_filter_size; + exp += coder->exp_block_size; + exp += coder->exp_block_size / 2; + + if (coder->memusage + exp > coder->memlimit) { + coder->threads_max = coder->threads_initialized; + return LZMA_OK; + } + } + + // Initialize a new thread. + return_if_error(initialize_new_thread(coder, allocator)); + } + + mythread_mutex_lock(&coder->thr_write->mutex); + coder->thr_write->next = NULL; + + coder->thr_write->in_block_size = 0; + coder->thr_write->in_filled = 0; + coder->thr_write->in_pos = 0; + + coder->thr_write->out.out_pos = 0; + coder->thr_write->out.out_filled = 0; + + memset(&coder->thr_write->block_options, 0, sizeof(lzma_block)); + coder->thr_write->state = THR_RUN; + mythread_mutex_unlock(&coder->thr_write->mutex); + + return LZMA_OK; +} + +static lzma_ret alloc_out_buffer(struct lzma_stream_coder *coder, + const lzma_allocator *allocator) +{ + struct worker_thread *thr; + struct out_buffer *buf; + size_t uncomp_size; + + thr = coder->thr_write; + buf = &thr->out; + uncomp_size = thr->block_options.uncompressed_size; + + if (buf->out) { + if (buf->out_block_size == uncomp_size) + goto recycle_old; + + coder->memusage -= buf->out_block_size; + buf->out_block_size = 0; + lzma_free(buf->out, allocator); + } + buf->out = lzma_alloc(uncomp_size, allocator); + if (!buf->out) + return LZMA_MEM_ERROR; + coder->memusage += uncomp_size; + buf->out_block_size = uncomp_size; + if (coder->exp_block_size < uncomp_size) + coder->exp_block_size = uncomp_size; + +recycle_old: + buf->out_pos = 0; + buf->out_filled = 0; + + mythread_mutex_lock(&coder->mutex); + if (!coder->thr_read) { + coder->thr_read = thr; + coder->thr_read_last = thr; + } else { + coder->thr_read_last->next = thr; + coder->thr_read_last = thr; + } + mythread_mutex_unlock(&coder->mutex); + return LZMA_OK; +} + +static lzma_ret try_copy_decoded(struct lzma_stream_coder *coder, + uint8_t *restrict out, size_t *restrict out_pos, + size_t out_size) +{ + struct out_buffer *out_buff; + 
size_t out_buff_size; + + if (!coder->thr_read) + return LZMA_OK; + + out_buff = &coder->thr_read->out; + + do { + /* block fully consumed */ + if (out_buff->out_filled == out_buff->out_block_size) { + struct worker_thread *thr_new; + struct worker_thread *thr_old; + lzma_ret ret; + + ret = lzma_index_hash_append(coder->index_hash, + lzma_block_unpadded_size(&coder->thr_read->block_options), + coder->thr_read->block_options.uncompressed_size); + if (ret != LZMA_OK) + return ret; + + mythread_mutex_lock(&coder->mutex); + thr_old = coder->thr_read; + thr_new = thr_old->next; + thr_old->next = NULL; + + if (coder->threads_free) + thr_old->next = coder->threads_free; + coder->threads_free = thr_old; + coder->thr_read = thr_new; + + mythread_mutex_unlock(&coder->mutex); + + if (!thr_new) + return LZMA_OK; + + out_buff = &thr_new->out; + } + + /* whatever is done has been consumed */ + if (out_buff->out_pos == out_buff->out_filled) + return LZMA_OK; + + if (*out_pos == out_size) + return LZMA_OK; + + mythread_mutex_lock(&coder->mutex); + out_buff_size = out_buff->out_pos; + mythread_mutex_unlock(&coder->mutex); + + lzma_bufcpy(out_buff->out, &out_buff->out_filled, out_buff_size, + out, out_pos, out_size); + } while (1); +} + +static size_t comp_blk_size(struct lzma_stream_coder *coder, size_t size) +{ + return vli_ceil4(size) + lzma_check_size(coder->stream_flags.check); +} + +static lzma_ret +stream_decoder_reset(struct lzma_stream_coder *coder, const lzma_allocator *allocator) +{ + // Initialize the Index hash used to verify the Index. + coder->index_hash = lzma_index_hash_init(coder->index_hash, allocator); + if (coder->index_hash == NULL) + return LZMA_MEM_ERROR; + + // Reset the rest of the variables. 
+ coder->sequence = SEQ_STREAM_HEADER; + coder->pos = 0; + + return LZMA_OK; +} + +static lzma_ret +stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, + const uint8_t *restrict in, size_t *restrict in_pos, + size_t in_size, + uint8_t *restrict out, size_t *restrict out_pos, + size_t out_size, lzma_action action) +{ + struct lzma_stream_coder *coder = coder_ptr; + struct worker_thread *thr; + lzma_ret ret; + size_t start_in_pos = *in_pos; + size_t start_out_pos = *out_pos; + + + while (true) + switch (coder->sequence) { + case SEQ_STREAM_HEADER: { + // Copy the Stream Header to the internal buffer. + lzma_bufcpy(in, in_pos, in_size, coder->buffer, &coder->pos, + LZMA_STREAM_HEADER_SIZE); + + // Return if we didn't get the whole Stream Header yet. + if (coder->pos < LZMA_STREAM_HEADER_SIZE) + return LZMA_OK; + + coder->pos = 0; + + // Decode the Stream Header. + ret = lzma_stream_header_decode(&coder->stream_flags, + coder->buffer); + if (ret != LZMA_OK) + return ret == LZMA_FORMAT_ERROR && !coder->first_stream + ? LZMA_DATA_ERROR : ret; + + // If we are decoding concatenated Streams, and the later + // Streams have invalid Header Magic Bytes, we give + // LZMA_DATA_ERROR instead of LZMA_FORMAT_ERROR. + coder->first_stream = false; + + // Even if we return LZMA_*_CHECK below, we want + // to continue from Block Header decoding. + coder->sequence = SEQ_BLOCK_HEADER; + + // Detect if there's no integrity check or if it is + // unsupported if those were requested by the application. 
+ if (coder->tell_no_check && coder->stream_flags.check + == LZMA_CHECK_NONE) + return LZMA_NO_CHECK; + + if (coder->tell_unsupported_check + && !lzma_check_is_supported(coder->stream_flags.check)) + return LZMA_UNSUPPORTED_CHECK; + + if (coder->tell_any_check) + return LZMA_GET_CHECK; + break; + } + + case SEQ_BLOCK_HEADER: { + if (*in_pos >= in_size) + return LZMA_OK; + + thr = coder->thr_write; + if (!thr) { +seq_blk_hdr_again: + ret = try_copy_decoded(coder, out, out_pos, out_size); + if (ret != LZMA_OK) + return ret; + + ret = get_thread(coder, allocator); + if (ret != LZMA_OK) + return ret; + + if (!coder->thr_write) { + + /* No out buffer but making progress ? */ + if ((start_in_pos != *in_pos) || + (start_out_pos != *out_pos)) + return LZMA_OK; + + mythread_mutex_lock(&coder->mutex); + if (coder->thr_read->out.out_pos == coder->thr_read->out.out_filled) + mythread_cond_wait(&coder->cond, &coder->mutex); + + mythread_mutex_unlock(&coder->mutex); + } + thr = coder->thr_write; + if (!thr) + goto seq_blk_hdr_again; + } + + if (coder->pos == 0) { + // Detect if it's Index. + if (in[*in_pos] == 0x00) { + coder->sequence = SEQ_INDEX; + break; + } + + // Calculate the size of the Block Header. Note that + // Block Header decoder wants to see this byte too + // so don't advance *in_pos. + thr->block_options.header_size = + lzma_block_header_size_decode(in[*in_pos]); + } + + // Copy the Block Header to the internal buffer. + + lzma_bufcpy(in, in_pos, in_size, coder->buffer, &coder->pos, + thr->block_options.header_size); + + // Return if we didn't get the whole Block Header yet. + if (coder->pos < thr->block_options.header_size) + return LZMA_OK; + + coder->pos = 0; + + // Version 1 is needed to support the .ignore_check option. + thr->block_options.version = 1; + + // Set up a buffer to hold the filter chain. Block Header + // decoder will initialize all members of this array so + // we don't need to do it here. 
+ lzma_filter filters[LZMA_FILTERS_MAX + 1]; + thr->block_options.filters = filters; + + // Copy the type of the Check so that Block Header and Block + // decoders see it. + thr->block_options.check = coder->stream_flags.check; + + // Decode the Block Header. + ret = lzma_block_header_decode(&thr->block_options, allocator, + coder->buffer); + if (ret != LZMA_OK) + return ret; + + // If LZMA_IGNORE_CHECK was used, this flag needs to be set. + // It has to be set after lzma_block_header_decode() because + // it always resets this to false. + thr->block_options.ignore_check = coder->ignore_check; + + if (thr->block_options.compressed_size == LZMA_VLI_UNKNOWN || + thr->block_options.uncompressed_size == LZMA_VLI_UNKNOWN) { + + /* + * Happens if the previous (first) block header has sizes + * encoded but one of the following block header does + * not. + */ + if (coder->threads_initialized != 1) + return LZMA_PROG_ERROR; + + coder->direct_decomp = true; + } else { + ret = alloc_out_buffer(coder, allocator); + if (ret != LZMA_OK) + return ret; + + thr->in_block_size = comp_blk_size(coder, thr->block_options.compressed_size); + + if (thr->in_size < thr->in_block_size) { + coder->memusage -= thr->in_size; + lzma_free(thr->in, allocator); + + thr->in = lzma_alloc(thr->in_block_size, allocator); + if (!thr->in) + return LZMA_MEM_ERROR; + thr->in_size = thr->in_block_size; + coder->memusage += thr->in_size; + } + } + + // Check the memory usage limit. + const uint64_t memusage = lzma_raw_decoder_memusage(filters); + + if (memusage == UINT64_MAX) { + // One or more unknown Filter IDs. + ret = LZMA_OPTIONS_ERROR; + } else { + if (coder->exp_filter_size < memusage) + coder->exp_filter_size = memusage; + + /* + * The coder->memusage contains the size of in+out + * buffer. Only for the first thread the check against + * the filter size is made. Later it is attempted not to + * create new threads if the memory limit is about to get + * exceeded. 
Since the `in' buffer will be enlarged if + * needed we may exceed the memory limit. Therefore there + * is no further check for memusage to not abort work in + * the middle. + */ + if (coder->threads_initialized == 1 && + coder->memusage + memusage > coder->memlimit) { + // The chain would need too much memory. + ret = LZMA_MEMLIMIT_ERROR; + } else { + // Memory usage is OK. + // Initialize the Block decoder. + ret = lzma_block_decoder_init( + &thr->block_decoder, + allocator, + &thr->block_options); + if (thr->filter_size != memusage) { + coder->memusage -= thr->filter_size; + coder->memusage += memusage; + thr->filter_size = memusage; + } + } + } + + // Free the allocated filter options since they are needed + // only to initialize the Block decoder. + for (size_t i = 0; i < LZMA_FILTERS_MAX; ++i) + lzma_free(filters[i].options, allocator); + thr->block_options.filters = NULL; + // Check if memory usage calculation and Block enocoder + // initialization succeeded. + if (ret != LZMA_OK) + return ret; + + coder->sequence = SEQ_BLOCK; + break; + + case SEQ_BLOCK: + thr = coder->thr_write; + + /* Direct decompression if we lack sizes in block header */ + if (coder->direct_decomp) { + ret = thr->block_decoder.code(thr->block_decoder.coder, + thr->allocator, + in, in_pos, in_size, + out, out_pos, out_size, + action); + if (ret != LZMA_STREAM_END) + return ret; + + // Block decoded successfully. Add the new size pair to + // the Index hash. 
+ ret = lzma_index_hash_append(coder->index_hash, + lzma_block_unpadded_size(&thr->block_options), + thr->block_options.uncompressed_size); + if (ret != LZMA_OK) + return ret; + + coder->sequence = SEQ_BLOCK_HEADER; + break; + } + + ret = try_copy_decoded(coder, out, out_pos, out_size); + if (ret != LZMA_OK) + return ret; + + ret = stream_decode_in(coder, in, in_pos, in_size); + if (ret != LZMA_OK) { + threads_stop(coder, false); + return ret; + } + + if ((*in_pos >= in_size) || (*out_pos >= out_size)) + return LZMA_OK; + + break; + } + + case SEQ_INDEX: { + // If we don't have any input, don't call + // lzma_index_hash_decode() since it would return + // LZMA_BUF_ERROR, which we must not do here. + if (*in_pos >= in_size) + return LZMA_OK; + + /* first flush all worker threads, so the accounting of decoded + * blocks matches index's expectation. + */ + while (coder->thr_read) { + ret = try_copy_decoded(coder, out, out_pos, out_size); + if (ret != LZMA_OK) + return ret; + + if (*out_pos >= out_size) + return LZMA_OK; + + if (!coder->thr_read) + break; + + mythread_mutex_lock(&coder->mutex); + if (coder->thr_read->out.out_pos == coder->thr_read->out.out_filled) + mythread_cond_wait(&coder->cond, &coder->mutex); + mythread_mutex_unlock(&coder->mutex); + } + + // Decode the Index and compare it to the hash calculated + // from the sizes of the Blocks (if any). + ret = lzma_index_hash_decode(coder->index_hash, in, in_pos, + in_size); + if (ret != LZMA_STREAM_END) { + return ret; + } + + coder->sequence = SEQ_STREAM_FOOTER; + break; + } + + case SEQ_STREAM_FOOTER: + + lzma_bufcpy(in, in_pos, in_size, coder->buffer, &coder->pos, + LZMA_STREAM_HEADER_SIZE); + + // Return if we didn't get the whole Stream Footer yet. + if (coder->pos < LZMA_STREAM_HEADER_SIZE) + return LZMA_OK; + + coder->pos = 0; + // Decode the Stream Footer. The decoder gives + // LZMA_FORMAT_ERROR if the magic bytes don't match, + // so convert that return code to LZMA_DATA_ERROR. 
+ lzma_stream_flags footer_flags; + ret = lzma_stream_footer_decode(&footer_flags, coder->buffer); + if (ret != LZMA_OK) + return ret == LZMA_FORMAT_ERROR + ? LZMA_DATA_ERROR : ret; + + // Check that Index Size stored in the Stream Footer matches + // the real size of the Index field. + if (lzma_index_hash_size(coder->index_hash) + != footer_flags.backward_size) + return LZMA_DATA_ERROR; + + // Compare that the Stream Flags fields are identical in + // both Stream Header and Stream Footer. + ret = lzma_stream_flags_compare(&coder->stream_flags, &footer_flags); + if (ret != LZMA_OK) + return ret; + + if (!coder->concatenated) + return LZMA_STREAM_END; + coder->sequence = SEQ_STREAM_PADDING; + break; + + case SEQ_STREAM_PADDING: { + + // Skip over possible Stream Padding. + while (true) { + if (*in_pos >= in_size) { + // Unless LZMA_FINISH was used, we cannot + // know if there's more input coming later. + if (action != LZMA_FINISH) { + return LZMA_OK; + } + + // Stream Padding must be a multiple of + // four bytes. + return coder->pos == 0 + ? LZMA_STREAM_END + : LZMA_DATA_ERROR; + } + + // If the byte is not zero, it probably indicates + // beginning of a new Stream (or the file is corrupt). + if (in[*in_pos] != 0x00) + break; + + ++*in_pos; + coder->pos = (coder->pos + 1) & 3; + } + + // Stream Padding must be a multiple of four bytes (empty + // Stream Padding is OK). + if (coder->pos != 0) { + ++*in_pos; + return LZMA_DATA_ERROR; + } + + // Prepare to decode the next Stream. 
+ return_if_error(stream_decoder_reset(coder, allocator)); + break; + } + + default: + assert(0); + return LZMA_PROG_ERROR; + } + return LZMA_PROG_ERROR; +} + +static lzma_check stream_decoder_mt_get_check(const void *coder_ptr) +{ + const struct lzma_stream_coder *coder = coder_ptr; + return coder->stream_flags.check; +} + +static lzma_ret stream_decoder_mt_memconfig(void *coder_ptr, uint64_t *memusage, + uint64_t *old_memlimit, + uint64_t new_memlimit) +{ + struct lzma_stream_coder *coder = coder_ptr; + + *memusage = coder->memusage; + *old_memlimit = coder->memlimit; + + if (new_memlimit != 0) { + if (new_memlimit < coder->memusage) + return LZMA_MEMLIMIT_ERROR; + + coder->memlimit = new_memlimit; + } + + return LZMA_OK; +} + + +static lzma_ret +stream_decoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, + const lzma_mt *options) +{ + struct lzma_stream_coder *coder; + + if (options->threads == 0 || options->threads > LZMA_THREADS_MAX) + return LZMA_OPTIONS_ERROR; + if (options->flags & ~LZMA_SUPPORTED_FLAGS) + return LZMA_OPTIONS_ERROR; + + lzma_next_coder_init(&stream_decoder_mt_init, next, allocator); + + coder = next->coder; + if (!coder) { + coder = lzma_alloc(sizeof(struct lzma_stream_coder), allocator); + if (coder == NULL) + return LZMA_MEM_ERROR; + + memset(coder, 0xff, sizeof(struct lzma_stream_coder)); + + if (mythread_mutex_init(&coder->mutex)) + goto err_out; + + if (mythread_cond_init(&coder->cond)) { + mythread_mutex_destroy(&coder->mutex); + goto err_out; + } + + next->coder = coder; + + next->code = stream_decode_mt; + next->end = stream_decoder_mt_end; + next->get_check = stream_decoder_mt_get_check; + next->memconfig = &stream_decoder_mt_memconfig; + + next->get_progress = NULL; + + coder->index_hash = NULL; + coder->threads_max = 0; + coder->threads_initialized = 0; + } + + coder->sequence = SEQ_STREAM_HEADER; + + coder->memlimit = my_max(1, options->memlimit); + coder->memusage = LZMA_MEMUSAGE_BASE; + + 
coder->tell_no_check = options->flags & LZMA_TELL_NO_CHECK; + coder->tell_unsupported_check = options->flags & LZMA_TELL_UNSUPPORTED_CHECK; + coder->tell_any_check = options->flags & LZMA_TELL_ANY_CHECK; + coder->ignore_check = options->flags & LZMA_IGNORE_CHECK; + coder->concatenated = options->flags & LZMA_CONCATENATED; + coder->first_stream = true; + coder->direct_decomp = false; + coder->exp_filter_size = 0; + coder->exp_block_size = 0; + coder->pos = 0; + + memset(&coder->stream_flags, 0, sizeof(lzma_stream_flags)); + /* By allocating threads from scratch we can start memory-usage + * accouting from scratch, too. Changes in filter and block sizes may + * affect number of threads. We don't keep possible larger-than-needed + * in buffer (if the block size decreased) and have only one thread + * in case this stream has no block sizes (and `direct_decomp' expects + * no threads to keep it simple). + */ + if (coder->threads_max) { + coder->threads_max = 0; + threads_end(coder, allocator); + } + + coder->threads = lzma_alloc(options->threads * sizeof(struct worker_thread), + allocator); + if (coder->threads == NULL) + goto err_out; + coder->threads_free = NULL; + coder->thr_read = NULL; + coder->thr_read_last = NULL; + coder->thr_write = NULL; + + coder->threads_max = options->threads; + + return stream_decoder_reset(coder, allocator); + +err_out: + lzma_free(coder->threads, allocator); + lzma_free(coder, allocator); + return LZMA_MEM_ERROR; +} + +extern LZMA_API(lzma_ret) +lzma_stream_decoder_mt(lzma_stream *strm, const lzma_mt *options) +{ + lzma_next_strm_init(stream_decoder_mt_init, strm, options); + + strm->internal->supported_actions[LZMA_RUN] = true; + strm->internal->supported_actions[LZMA_FINISH] = true; + + return LZMA_OK; +} diff --git a/src/liblzma/liblzma.map b/src/liblzma/liblzma.map index bad8633c3b8d2..3f34d8c2814f0 100644 --- a/src/liblzma/liblzma.map +++ b/src/liblzma/liblzma.map @@ -107,6 +107,7 @@ XZ_5.2 { XZ_5.3.1alpha { global: 
lzma_file_info_decoder; + lzma_stream_decoder_mt; local: *; diff --git a/src/xz/coder.c b/src/xz/coder.c index 85f954393d8bf..c22bf136285c5 100644 --- a/src/xz/coder.c +++ b/src/xz/coder.c @@ -51,7 +51,7 @@ static lzma_check check; /// This becomes false if the --check=CHECK option is used. static bool check_default = true; -#if defined(HAVE_ENCODERS) && defined(MYTHREAD_ENABLED) +#if (defined(HAVE_ENCODERS) || defined(HAVE_DECODERS)) && defined(MYTHREAD_ENABLED) static lzma_mt mt_options = { .flags = 0, .timeout = 300, @@ -520,9 +520,16 @@ coder_init(file_pair *pair) break; case FORMAT_XZ: - ret = lzma_stream_decoder(&strm, - hardware_memlimit_get( - MODE_DECOMPRESS), flags); + if (hardware_threads_get() > 1) { + mt_options.threads = hardware_threads_get(); + mt_options.flags = flags; + mt_options.memlimit = hardware_memlimit_get(MODE_DECOMPRESS); + ret = lzma_stream_decoder_mt(&strm, &mt_options); + } else { + ret = lzma_stream_decoder(&strm, + hardware_memlimit_get( + MODE_DECOMPRESS), flags); + } break; case FORMAT_LZMA: -- 2.29.2