This patch adds support for LZMA_SYNC_FLUSH in the multithreaded
encoder. I don't think this patch is very high priority since this
action has never been supported in the multithreaded encoder, but it
was fun to work on since I had just done sync flush support in the
block encoder. I created a few tests for this which can be seen on my
GitHub at 
https://github.com/JiaT75/XZ_Utils_Unofficial/tree/support_LZMA_SYNC_FLUSH_mt_encoder.

The only drawback of adding this patch is that encoding can fail if
the user requests LZMA_SYNC_FLUSH too many times on the same block. This
will overfill the worker's buffer and result in LZMA_PROG_ERROR. I
added this warning to the documentation, so I think it is still worth
considering this change. If not, the TODOs for LZMA_SYNC_FLUSH should
be removed, since if it's not supported after this, it never will be.
Let me know what feedback you have for this and what can be improved.

Jia Tan
From 245aebfb4340a013aadc7e49651f7448b127ab65 Mon Sep 17 00:00:00 2001
From: jiat75 <jiat0...@gmail.com>
Date: Mon, 16 May 2022 20:57:19 +0800
Subject: [PATCH] stream_encoder_mt now supports LZMA_SYNC_FLUSH action

To accomplish this, I had to adjust lzma_outq_enable_partial_output
since doing a sync flush multiple times in a block would have to
toggle the partial output status. Also, I had to add
lzma_encode_data_uncomp to encode data as uncompressed LZMA2
chunks.
---
 src/liblzma/api/lzma/container.h          |   9 +-
 src/liblzma/common/block_buffer_encoder.c |  61 ++++--
 src/liblzma/common/block_buffer_encoder.h |   6 +
 src/liblzma/common/outqueue.c             |   6 +-
 src/liblzma/common/stream_encoder_mt.c    | 250 ++++++++++++++++++++--
 5 files changed, 283 insertions(+), 49 deletions(-)

diff --git a/src/liblzma/api/lzma/container.h b/src/liblzma/api/lzma/container.h
index 564c6aaf..6a27312f 100644
--- a/src/liblzma/api/lzma/container.h
+++ b/src/liblzma/api/lzma/container.h
@@ -388,8 +388,7 @@ extern LZMA_API(uint64_t) lzma_stream_encoder_mt_memusage(
  * lzma_stream_encoder() as a single function for multithreaded use.
  *
  * The supported actions for lzma_code() are LZMA_RUN, LZMA_FULL_FLUSH,
- * LZMA_FULL_BARRIER, and LZMA_FINISH. Support for LZMA_SYNC_FLUSH might be
- * added in the future.
+ * LZMA_FULL_BARRIER, LZMA_SYNC_FLUSH, and LZMA_FINISH.
  *
  * \param       strm    Pointer to properly prepared lzma_stream
  * \param       options Pointer to multithreaded compression options
@@ -399,6 +398,12 @@ extern LZMA_API(uint64_t) lzma_stream_encoder_mt_memusage(
  *              - LZMA_UNSUPPORTED_CHECK
  *              - LZMA_OPTIONS_ERROR
  *              - LZMA_PROG_ERROR
+ *
+ * \note        Calling lzma_code() with LZMA_SYNC_FLUSH must be done with
+ *              caution. If LZMA_SYNC_FLUSH is used too many times within
+ *              one Block, its overhead can fill up the worker thread's
+ *              internal buffer and cause LZMA_PROG_ERROR. Incompressible
+ *              data is especially vulnerable to this.
  */
 extern LZMA_API(lzma_ret) lzma_stream_encoder_mt(
 		lzma_stream *strm, const lzma_mt *options)
diff --git a/src/liblzma/common/block_buffer_encoder.c b/src/liblzma/common/block_buffer_encoder.c
index 39e263aa..8219b0f9 100644
--- a/src/liblzma/common/block_buffer_encoder.c
+++ b/src/liblzma/common/block_buffer_encoder.c
@@ -85,6 +85,42 @@ lzma_block_buffer_bound(size_t uncompressed_size)
 }
 
 
+/// Encodes in[0..in_size) as LZMA2 uncompressed chunks into out, starting
+/// at *out_pos. Returns LZMA_BUF_ERROR without writing anything if the
+/// chunks (payload plus a three-byte header each) would not fit.
+extern lzma_ret
+lzma_encode_data_uncomp(const uint8_t *in, size_t in_size, uint8_t *out,
+		size_t *out_pos, size_t out_size)
+{
+	// Every chunk costs three header bytes on top of its payload.
+	// Only subtract checked values so that nothing can wrap around.
+	const size_t chunks = in_size / LZMA2_CHUNK_MAX
+			+ (in_size % LZMA2_CHUNK_MAX != 0);
+	if (*out_pos > out_size || in_size > out_size - *out_pos
+			|| 3 * chunks > out_size - *out_pos - in_size)
+		return LZMA_BUF_ERROR;
+
+	size_t in_pos = 0;
+	uint8_t control = 0x01; // First chunk resets the dictionary.
+
+	while (in_pos < in_size) {
+		out[(*out_pos)++] = control;
+		control = 0x02; // Later chunks don't reset the dictionary.
+
+		// Chunk size is stored as a big-endian 16-bit value minus one.
+		const size_t copy_size
+				= my_min(in_size - in_pos, LZMA2_CHUNK_MAX);
+		out[(*out_pos)++] = (copy_size - 1) >> 8;
+		out[(*out_pos)++] = (copy_size - 1) & 0xFF;
+
+		memcpy(out + *out_pos, in + in_pos, copy_size);
+		in_pos += copy_size;
+		*out_pos += copy_size;
+	}
+	return LZMA_OK;
+}
+
+
 static lzma_ret
 block_encode_uncompressed(lzma_block *block, const uint8_t *in, size_t in_size,
 		uint8_t *out, size_t *out_pos, size_t out_size)
@@ -131,29 +167,8 @@ block_encode_uncompressed(lzma_block *block, const uint8_t *in, size_t in_size,
 	block->filters = filters_orig;
 	*out_pos += block->header_size;
 
-	// Encode the data using LZMA2 uncompressed chunks.
-	size_t in_pos = 0;
-	uint8_t control = 0x01; // Dictionary reset
-
-	while (in_pos < in_size) {
-		// Control byte: Indicate uncompressed chunk, of which
-		// the first resets the dictionary.
-		out[(*out_pos)++] = control;
-		control = 0x02; // No dictionary reset
-
-		// Size of the uncompressed chunk
-		const size_t copy_size
-				= my_min(in_size - in_pos, LZMA2_CHUNK_MAX);
-		out[(*out_pos)++] = (copy_size - 1) >> 8;
-		out[(*out_pos)++] = (copy_size - 1) & 0xFF;
-
-		// The actual data
-		assert(*out_pos + copy_size <= out_size);
-		memcpy(out + *out_pos, in + in_pos, copy_size);
-
-		in_pos += copy_size;
-		*out_pos += copy_size;
-	}
+	return_if_error(lzma_encode_data_uncomp(in, in_size, out, out_pos,
+			out_size));
 
 	// End marker
 	out[(*out_pos)++] = 0x00;
diff --git a/src/liblzma/common/block_buffer_encoder.h b/src/liblzma/common/block_buffer_encoder.h
index 653207f7..20566546 100644
--- a/src/liblzma/common/block_buffer_encoder.h
+++ b/src/liblzma/common/block_buffer_encoder.h
@@ -21,4 +21,10 @@
 /// should have been 64-bit, but fixing it would break the ABI.
 extern uint64_t lzma_block_buffer_bound64(uint64_t uncompressed_size);
 
+/// Encodes in[0..in_size) as LZMA2 uncompressed chunks at out + *out_pos.
+/// Does not write the Block Header, end marker, padding, or Check.
+extern lzma_ret lzma_encode_data_uncomp(const uint8_t *in,
+ 		size_t in_size, uint8_t *out, size_t *out_pos,
+		size_t out_size);
+
 #endif
diff --git a/src/liblzma/common/outqueue.c b/src/liblzma/common/outqueue.c
index 71e8648a..bdaf013b 100644
--- a/src/liblzma/common/outqueue.c
+++ b/src/liblzma/common/outqueue.c
@@ -276,12 +276,8 @@ lzma_outq_enable_partial_output(lzma_outq *outq,
 		void (*enable_partial_output)(void *worker))
 {
 	if (outq->head != NULL && !outq->head->finished
-			&& outq->head->worker != NULL) {
+			&& outq->head->worker != NULL)
 		enable_partial_output(outq->head->worker);
 
-		// Set it to NULL since calling it twice is pointless.
-		outq->head->worker = NULL;
-	}
-
 	return;
 }
diff --git a/src/liblzma/common/stream_encoder_mt.c b/src/liblzma/common/stream_encoder_mt.c
index 24addd40..9717b97e 100644
--- a/src/liblzma/common/stream_encoder_mt.c
+++ b/src/liblzma/common/stream_encoder_mt.c
@@ -16,6 +16,7 @@
 #include "block_buffer_encoder.h"
 #include "index_encoder.h"
 #include "outqueue.h"
+#include "check.h"
 
 
 /// Maximum supported block size. This makes it simpler to prevent integer
@@ -40,10 +41,26 @@ typedef enum {
 
 	/// The main thread wants the thread to exit. We could use
 	/// cancellation but since there's stopped anyway, this is lazier.
-	THR_EXIT,
-
+	THR_EXIT
 } worker_state;
 
+
+/// Per-worker state of an LZMA_SYNC_FLUSH request.
+typedef enum {
+	/// No sync flush is pending; partial output is disabled.
+	/// This is the initial state and the state after a flush is consumed.
+	SYNC_FLUSH_DISABLED,
+
+	/// The main thread has requested a partial update but the
+	/// worker thread has not produced it yet.
+	SYNC_FLUSH_ENABLED,
+
+	/// The worker has published its progress in outbuf->pos,
+	/// but the main thread has not yet consumed it.
+	SYNC_FLUSH_DONE,
+} sync_flush_mode;
+
+
 typedef struct lzma_stream_coder_s lzma_stream_coder;
 
 typedef struct worker_thread_s worker_thread;
@@ -94,6 +111,8 @@ struct worker_thread_s {
 	/// The ID of this thread is used to join the thread
 	/// when it's not needed anymore.
 	mythread thread_id;
+
+	sync_flush_mode flush_mode;
 };
 
 
@@ -198,6 +217,20 @@ worker_error(worker_thread *thr, lzma_ret ret)
 }
 
 
+/// Callback for lzma_outq_enable_partial_output(): asks the given worker
+/// to publish a partial result so that LZMA_SYNC_FLUSH can complete.
+///
+/// Unlike the stream_decoder_mt counterpart, no locking is done here;
+/// the caller is expected to hold the worker's mutex already.
+static void
+worker_enable_partial_update(void *thr_ptr)
+{
+	// The outqueue passes the worker back as an opaque pointer.
+	worker_thread *const worker = thr_ptr;
+	worker->flush_mode = SYNC_FLUSH_ENABLED;
+}
+
+
 static worker_state
 worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
 {
@@ -236,6 +269,15 @@ worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
 
 	size_t in_pos = 0;
 	size_t in_size = 0;
+	sync_flush_mode flush_mode;
+	// The partial in position and partial out position trackers
+	// are used to track multiple LZMA_SYNC_FLUSH calls in the middle
+	// of a block when the data cannot be compressed.
+	// If the data can be compressed, only the outbuf position pointer
+	// needs to be updated. The partial position pointers are still
+	// updated in case data further in the block cannot be compressed
+	size_t partial_in_pos = 0;
+	size_t partial_out_pos = 0;
 
 	*out_pos = thr->block_options.header_size;
 	const size_t out_size = thr->outbuf->allocated;
@@ -253,23 +295,28 @@ worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
 			thr->progress_out = *out_pos;
 
 			while (in_size == thr->in_size
-					&& thr->state == THR_RUN)
+				&& thr->state == THR_RUN
+				&& thr->flush_mode != SYNC_FLUSH_ENABLED)
 				mythread_cond_wait(&thr->cond, &thr->mutex);
 
 			state = thr->state;
 			in_size = thr->in_size;
+			flush_mode = thr->flush_mode;
 		}
 
 		// Return if we were asked to stop or exit.
 		if (state >= THR_STOP)
 			return state;
 
-		lzma_action action = state == THR_FINISH
-				? LZMA_FINISH : LZMA_RUN;
+		lzma_action action = LZMA_RUN;
+		if (state == THR_FINISH)
+			action = LZMA_FINISH;
+		else if (flush_mode == SYNC_FLUSH_ENABLED)
+			action = LZMA_SYNC_FLUSH;
 
 		// Limit the amount of input given to the Block encoder
 		// at once. This way this thread can react fairly quickly
-		// if the main thread wants us to stop or exit.
+		// if the main thread wants us to stop, exit, or flush.
 		static const size_t in_chunk_max = 16384;
 		size_t in_limit = in_size;
 		if (in_size - in_pos > in_chunk_max) {
@@ -281,12 +328,80 @@ worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
 				thr->block_encoder.coder, thr->allocator,
 				thr->in, &in_pos, in_limit, thr->outbuf->buf,
 				out_pos, out_size, action);
-	} while (ret == LZMA_OK && *out_pos < out_size);
+
+		// If the sync flush does not result in stream end, then we
+		// must report an error here since the worker thread will
+		// not receive more input / output space, so the result
+		// will never change and deadlock will occur.
+		if (action == LZMA_SYNC_FLUSH && ret != LZMA_STREAM_END) {
+			worker_error(thr, LZMA_PROG_ERROR);
+			return THR_STOP;
+		}
+
+		// If the thread is told to finish, then there is no point
+		// in flushing any partial output
+		if (state != THR_FINISH && ret == LZMA_STREAM_END &&
+				flush_mode == SYNC_FLUSH_ENABLED) {
+			// Write out partial block
+			// Disable the sizes on the block header
+			// before writing out so that the partial sizes are
+			// not written to the output
+			thr->block_options.uncompressed_size =
+					LZMA_VLI_UNKNOWN;
+			thr->block_options.compressed_size =
+					LZMA_VLI_UNKNOWN;
+			ret = lzma_block_header_encode(&thr->block_options,
+					thr->outbuf->buf);
+
+			if (ret != LZMA_OK)
+				break;
+
+			// If data since last partial output was
+			// uncompressable, write partial block without
+			// headers
+			if ((in_pos - partial_in_pos) <
+					(*out_pos - partial_out_pos)) {
+				// Make space for block header
+				if (partial_out_pos == 0)
+					partial_out_pos =
+						thr->block_options.header_size;
+				// Overwrite contents of outbuf and
+				// adjust *out_pos to the new size.
+				// Encode from partial_in_pos in the
+				// input buffer to partial_out_pos
+				// in the output buffer
+				ret = lzma_encode_data_uncomp(
+						thr->in + partial_in_pos,
+						in_pos - partial_in_pos,
+						thr->outbuf->buf,
+						&partial_out_pos,
+						out_size);
+				// Update out position to partial output
+				// value since this should be less or equal
+				// to what it was before
+				*out_pos = partial_out_pos;
+			}
+
+			mythread_sync(thr->coder->mutex) {
+				// Signal main thread that output is ready
+				thr->outbuf->pos = *out_pos;
+				thr->flush_mode = SYNC_FLUSH_DONE;
+				mythread_cond_signal(&thr->coder->cond);
+			}
+			partial_out_pos = *out_pos;
+			partial_in_pos = in_pos;
+		}
+	} while (ret == LZMA_OK && (*out_pos < out_size || flush_mode == SYNC_FLUSH_ENABLED));
 
 	switch (ret) {
 	case LZMA_STREAM_END:
 		assert(state == THR_FINISH);
 
+		// No need to re-encode the block header if it has
+		// already been written out
+		if (partial_out_pos)
+			break;
+
 		// Encode the Block Header. By doing it after
 		// the compression, we can store the Compressed Size
 		// and Uncompressed Size fields.
@@ -302,7 +417,6 @@ worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
 	case LZMA_OK:
 		// The data was incompressible. Encode it using uncompressed
 		// LZMA2 chunks.
-		//
 		// First wait that we have gotten all the input.
 		mythread_sync(thr->mutex) {
 			while (thr->state == THR_RUN)
@@ -315,6 +429,65 @@ worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
 		if (state >= THR_STOP)
 			return state;
 
+		// If there has been a partial write already,
+		// then only the remaining part of the block
+		// will be attempted to be written to the block
+		// uncompressed (if the remaining part of the block
+		// can actually fit)
+		if (partial_out_pos && in_pos - partial_in_pos <
+				*out_pos - partial_out_pos) {
+			ret = lzma_encode_data_uncomp(thr->in,
+					partial_in_pos,
+					thr->outbuf->buf,
+					&partial_out_pos,
+					out_size);
+			if (ret != LZMA_OK) {
+				worker_error(thr, ret);
+				return THR_STOP;
+			}
+
+			*out_pos = partial_out_pos;
+
+			// Check for overflow
+			if (*out_pos > out_size)
+				worker_error(thr, LZMA_BUF_ERROR);
+
+			// Add end marker
+			thr->outbuf->buf[(*out_pos)++] = 0x00;
+
+			// Add padding
+			for (size_t i = *out_pos; i & 3; i++) {
+				if (*out_pos > out_size)
+					worker_error(thr, LZMA_BUF_ERROR);
+				thr->outbuf->buf[(*out_pos)++] = 0x00;
+			}
+
+			// Add check field
+			const size_t check_size = lzma_check_size(
+						thr->block_options.check);
+
+			if ((*out_pos + check_size) > out_size)
+				worker_error(thr, LZMA_BUF_ERROR);
+
+			if (check_size) {
+				lzma_check check_type =
+						thr->block_options.check;
+				lzma_check_state check;
+				lzma_check_init(&check, check_type);
+				lzma_check_update(&check, check_type,
+						thr->in, in_size);
+				lzma_check_finish(&check, check_type);
+
+				memcpy(thr->block_options.raw_check,
+						check.buffer.u8, check_size);
+				memcpy(thr->outbuf->buf + *out_pos,
+						check.buffer.u8, check_size);
+				*out_pos += check_size;
+			}
+
+			break;
+		}
+
 		// Do the encoding. This takes care of the Block Header too.
 		*out_pos = 0;
 		ret = lzma_block_uncomp_encode(&thr->block_options,
@@ -409,6 +582,7 @@ worker_start(void *thr_ptr)
 			// Return this thread to the stack of free threads.
 			thr->next = thr->coder->threads_free;
 			thr->coder->threads_free = thr;
+			thr->flush_mode = SYNC_FLUSH_DISABLED;
 
 			mythread_cond_signal(&thr->coder->cond);
 		}
@@ -498,6 +672,7 @@ initialize_new_thread(lzma_stream_coder *coder,
 	thr->progress_in = 0;
 	thr->progress_out = 0;
 	thr->block_encoder = LZMA_NEXT_CODER_INIT;
+	thr->flush_mode = SYNC_FLUSH_DISABLED;
 
 	if (mythread_create(&thr->thread_id, &worker_start, thr))
 		goto error_thread;
@@ -554,7 +729,8 @@ get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
 	mythread_sync(coder->thr->mutex) {
 		coder->thr->state = THR_RUN;
 		coder->thr->in_size = 0;
-		coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL);
+		coder->thr->outbuf = lzma_outq_get_buf(&coder->outq,
+				coder->thr);
 		mythread_cond_signal(&coder->thr->cond);
 	}
 
@@ -585,10 +761,9 @@ stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
 		//  - it has got block_size bytes of input; or
 		//  - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
 		//    or LZMA_FULL_BARRIER was used.
-		//
-		// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
 		const bool finish = thr_in_size == coder->block_size
-				|| (*in_pos == in_size && action != LZMA_RUN);
+				|| (*in_pos == in_size && action != LZMA_RUN
+				&& action != LZMA_SYNC_FLUSH);
 
 		bool block_error = false;
 
@@ -605,6 +780,15 @@ stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
 
 				if (finish)
 					coder->thr->state = THR_FINISH;
+				else
+					coder->thr->state = THR_RUN;
+
+				// Mark the current thread to perform
+				// LZMA_SYNC_FLUSH action
+				if (action == LZMA_SYNC_FLUSH)
+					lzma_outq_enable_partial_output(
+						&coder->outq,
+						&worker_enable_partial_update);
 
 				mythread_cond_signal(&coder->thr->cond);
 			}
@@ -620,8 +804,15 @@ stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
 			return ret;
 		}
 
+		// If finish is true, move on to the next thread
+		// Otherwise, if LZMA_SYNC_FLUSH is requested then
+		// we do not need to keep iterating since we have
+		// marked the last thread to receive data to give a
+		// partial update
 		if (finish)
 			coder->thr = NULL;
+		else if (action == LZMA_SYNC_FLUSH)
+			break;
 	}
 
 	return LZMA_OK;
@@ -710,6 +901,10 @@ stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
 		// These are for wait_for_work().
 		bool has_blocked = false;
 		mythread_condtime wait_abs;
+		// Flag to mark if we need to exit early because of
+		// LZMA_SYNC_FLUSH. Needed to avoid returning while
+		// still holding the coder mutex.
+		bool sync_exit = false;
 
 		while (true) {
 			mythread_sync(coder->mutex) {
@@ -725,8 +920,26 @@ stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
 						out, out_pos, out_size,
 						&unpadded_size,
 						&uncompressed_size);
+				// If a sync flush was requested and the
+				// thread has written all of its output,
+				// mark the data as copied and return to
+				// the caller. Cannot return from inside
+				// this block because we still hold the
+				// mutex.
+				if (action == LZMA_SYNC_FLUSH &&
+						*in_pos == in_size &&
+						(coder->thr &&
+						coder->thr->flush_mode ==
+						SYNC_FLUSH_DONE)){
+					coder->thr->flush_mode =
+							SYNC_FLUSH_DISABLED;
+					sync_exit = true;
+				}
 			}
 
+			if (sync_exit)
+				return LZMA_STREAM_END;
+
 			if (ret == LZMA_STREAM_END) {
 				// End of Block. Add it to the Index.
 				ret = lzma_index_append(coder->index,
@@ -759,8 +972,6 @@ stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
 			}
 
 			// See if we should wait or return.
-			//
-			// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
 			if (*in_pos == in_size) {
 				// LZMA_RUN: More data is probably coming
 				// so return to let the caller fill the
@@ -783,10 +994,11 @@ stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
 					if (action == LZMA_FINISH)
 						break;
 
-					// LZMA_FULL_FLUSH: Return to tell
-					// the caller that flushing was
-					// completed.
-					if (action == LZMA_FULL_FLUSH)
+					// LZMA_FULL_FLUSH / LZMA_SYNC_FLUSH:
+					// Return to tell the caller that
+					// flushing was completed.
+					if (action == LZMA_FULL_FLUSH ||
+						action == LZMA_SYNC_FLUSH)
 						return LZMA_STREAM_END;
 				}
 			}
@@ -1102,7 +1314,7 @@ lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
 	lzma_next_strm_init(stream_encoder_mt_init, strm, options);
 
 	strm->internal->supported_actions[LZMA_RUN] = true;
-// 	strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
+ 	strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
 	strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
 	strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
 	strm->internal->supported_actions[LZMA_FINISH] = true;
-- 
2.25.1

Reply via email to