Hi,
Attached is a patch that adds 'streaming' support to xz output: in short, LZMA_SYNC_FLUSH is
called every X milliseconds. We find it helpful because it lets us effectively do:
----
tail -f foobar.log.xz | nc w.x.y.z 1234
----
Meanwhile, foobar.log.xz is being generated with:
----
tail -f foobar.log | xz -c --select-timeout 500 > foobar.log.xz
----
This means the receiver gets something decodable within X milliseconds, rather than having to
wait for a whole block to be generated and flushed, which can take a considerable time when
whatever writes to foobar.log is low-volume (for example, 100 bytes per second).
The patch is against 5.0.0 (what is currently in Debian 'oldstable/squeeze'), but if the
community likes the look of it, I can roll a version for the current HEAD of the git tree.
Feedback welcomed.
Cheers
--
Alexander Clouter
.sigmonster says: Friction is a drag.
diff -u -r xz-utils-5.0.0.orig/src/xz/args.c xz-utils-5.0.0/src/xz/args.c
--- xz-utils-5.0.0.orig/src/xz/args.c 2010-10-23 15:47:33.000000000 +0100
+++ xz-utils-5.0.0/src/xz/args.c 2013-05-01 21:14:15.567999924 +0100
@@ -73,6 +73,7 @@
OPT_FILES0,
OPT_MEM_COMPRESS,
OPT_MEM_DECOMPRESS,
+ OPT_SELECT_TIMEOUT,
OPT_NO_ADJUST,
OPT_INFO_MEMORY,
OPT_ROBOT,
@@ -107,6 +108,7 @@
{ "memlimit-decompress", required_argument, NULL, OPT_MEM_DECOMPRESS },
{ "memlimit", required_argument, NULL, 'M' },
{ "memory", required_argument, NULL, 'M' }, // Old alias
+ { "select-timeout", required_argument, NULL, OPT_SELECT_TIMEOUT },
{ "no-adjust", no_argument, NULL, OPT_NO_ADJUST },
{ "threads", required_argument, NULL, 'T' },
@@ -169,6 +171,12 @@
true, true);
break;
+ // --select-timeout
+ case OPT_SELECT_TIMEOUT:
+ opt_select_timeout = str_to_uint64(
+ "select-timeout", optarg, 0, UINT32_MAX);
+ break;
+
// --suffix
case 'S':
suffix_set(optarg);
diff -u -r xz-utils-5.0.0.orig/src/xz/coder.c xz-utils-5.0.0/src/xz/coder.c
--- xz-utils-5.0.0.orig/src/xz/coder.c 2010-10-23 15:47:33.000000000 +0100
+++ xz-utils-5.0.0/src/xz/coder.c 2013-05-03 19:55:58.272000010 +0100
@@ -24,6 +24,7 @@
enum operation_mode opt_mode = MODE_COMPRESS;
enum format_type opt_format = FORMAT_AUTO;
bool opt_auto_adjust = true;
+int opt_select_timeout = 0;
/// Stream used to communicate with liblzma
@@ -255,6 +256,17 @@
memory_limit), 2));
}
+ if (opt_select_timeout !=0) {
+ if (opt_format == FORMAT_LZMA)
+ message_fatal(_("LZMA_SYNC_FLUSH is not available for LZMA1"));
+
+ if (opt_mode == MODE_COMPRESS)
+ message(V_DEBUG, _("LZMA_SYNC_FLUSH set for every %s milliseconds"),
+ uint64_to_str(opt_select_timeout, 0));
+ else
+ message_fatal(_("LZMA_SYNC_FLUSH is only available for compression"));
+ };
+
/*
// Limit the number of worker threads so that memory usage
// limit isn't exceeded.
@@ -472,9 +484,11 @@
strm.avail_out = IO_BUFFER_SIZE;
while (!user_abort) {
+ if (pair->select_timeout && !pair->src_eof)
+ action = LZMA_SYNC_FLUSH;
// Fill the input buffer if it is empty and we haven't reached
// end of file yet.
- if (strm.avail_in == 0 && !pair->src_eof) {
+ else if (strm.avail_in == 0 && !pair->src_eof) {
strm.next_in = in_buf.u8;
strm.avail_in = io_read(
pair, &in_buf, IO_BUFFER_SIZE);
@@ -484,13 +498,15 @@
if (pair->src_eof)
action = LZMA_FINISH;
+ else if (strm.avail_in == 0)
+ continue;
}
// Let liblzma do the actual work.
ret = lzma_code(&strm, action);
// Write out if the output buffer became full.
- if (strm.avail_out == 0) {
+ if (strm.avail_out == 0 || action == LZMA_SYNC_FLUSH) {
if (opt_mode != MODE_TEST && io_write(pair, &out_buf,
IO_BUFFER_SIZE - strm.avail_out))
break;
@@ -518,6 +534,12 @@
}
if (ret == LZMA_STREAM_END) {
+ if (action == LZMA_SYNC_FLUSH) {
+ pair->select_timeout = false;
+ action = LZMA_RUN;
+ continue;
+ }
+
// Check that there is no trailing garbage.
// This is needed for LZMA_Alone and raw
// streams.
diff -u -r xz-utils-5.0.0.orig/src/xz/coder.h xz-utils-5.0.0/src/xz/coder.h
--- xz-utils-5.0.0.orig/src/xz/coder.h 2010-10-23 15:47:33.000000000 +0100
+++ xz-utils-5.0.0/src/xz/coder.h 2013-05-01 16:02:04.676000099 +0100
@@ -41,6 +41,8 @@
/// they exceed the memory usage limit.
extern bool opt_auto_adjust;
+/// Flush interval in milliseconds for --select-timeout; 0 disables periodic LZMA_SYNC_FLUSH and select() on input.
+extern int opt_select_timeout;
/// Set the integrity check type used when compressing
extern void coder_set_check(lzma_check check);
diff -u -r xz-utils-5.0.0.orig/src/xz/file_io.c xz-utils-5.0.0/src/xz/file_io.c
--- xz-utils-5.0.0.orig/src/xz/file_io.c 2010-10-23 15:47:33.000000000 +0100
+++ xz-utils-5.0.0/src/xz/file_io.c 2013-05-03 19:55:56.672000010 +0100
@@ -13,6 +13,8 @@
#include "private.h"
#include <fcntl.h>
+#include <sys/time.h>
+#include <sys/select.h>
#ifdef TUKLIB_DOSLIKE
# include <io.h>
@@ -290,6 +292,15 @@
{
// There's nothing to open when reading from stdin.
if (pair->src_name == stdin_filename) {
+ if (opt_select_timeout != 0) {
+ int retval;
+ int flags = fcntl(STDIN_FILENO, F_GETFL, 0);
+ retval = fcntl(STDIN_FILENO, F_SETFL, flags | O_NONBLOCK);
+ if (retval)
+ message_fatal(_("failed to set O_NONBLOCK on STDIN: %s"),
+ strerror(errno));
+ }
+
pair->src_fd = STDIN_FILENO;
#ifdef TUKLIB_DOSLIKE
setmode(STDIN_FILENO, O_BINARY);
@@ -526,6 +537,7 @@
.src_eof = false,
.dest_try_sparse = false,
.dest_pending_sparse = 0,
+ .select_timeout = false,
};
// Block the signals, for which we have a custom signal handler, so
@@ -796,14 +808,69 @@
extern size_t
io_read(file_pair *pair, io_buf *buf_union, size_t size)
{
+ static struct timeval stv;
+ struct timeval ctv;
+
// We use small buffers here.
assert(size < SSIZE_MAX);
uint8_t *buf = buf_union->u8;
size_t left = size;
+ if (opt_select_timeout != 0 && !timerisset(&stv)) {
+ if (gettimeofday(&stv, NULL) != 0)
+ message_fatal(_("gettimeofday() failed: %s"),
+ strerror(errno));
+ }
+
while (left > 0) {
- const ssize_t amount = read(pair->src_fd, buf, left);
+ ssize_t amount;
+
+ if (opt_select_timeout != 0) {
+ fd_set fds;
+ int retval;
+ struct timeval tv = {
+ .tv_sec = opt_select_timeout/1000,
+ .tv_usec = (opt_select_timeout % 1000)*1000,
+ };
+
+ FD_ZERO(&fds);
+ FD_SET(pair->src_fd, &fds);
+
+ if (gettimeofday(&ctv, NULL) != 0)
+ message_fatal(_("gettimeofday() failed: %s"),
+ strerror(errno));
+
+ timersub(&ctv, &stv, &ctv);
+
+ if (!timercmp(&tv, &ctv, >)) {
+ pair->select_timeout = true;
+ timerclear(&stv);
+ break;
+ }
+
+ timersub(&tv, &ctv, &tv);
+
+ retval = select(pair->src_fd+1,
+ &fds, NULL, &fds, &tv);
+
+ if (retval == -1) {
+ if (errno == EINTR)
+ continue;
+
+ message_error(_("select() error: %s"),
+ strerror(errno));
+ pair->src_eof = true;
+
+ return SIZE_MAX;
+ } else if (retval == 0) {
+ pair->select_timeout = true;
+ timerclear(&stv);
+ break;
+ }
+ }
+
+ amount = read(pair->src_fd, buf, left);
if (amount == 0) {
pair->src_eof = true;
diff -u -r xz-utils-5.0.0.orig/src/xz/file_io.h xz-utils-5.0.0/src/xz/file_io.h
--- xz-utils-5.0.0.orig/src/xz/file_io.h 2010-10-23 15:47:33.000000000 +0100
+++ xz-utils-5.0.0/src/xz/file_io.h 2013-05-02 13:49:01.132000005 +0100
@@ -61,6 +61,8 @@
/// Stat of the destination file.
struct stat dest_st;
+ // select timeout flag on when to flush
+ bool select_timeout;
} file_pair;
diff -u -r xz-utils-5.0.0.orig/src/xz/message.c xz-utils-5.0.0/src/xz/message.c
--- xz-utils-5.0.0.orig/src/xz/message.c 2010-10-23 15:47:33.000000000 +0100
+++ xz-utils-5.0.0/src/xz/message.c 2013-05-01 23:40:32.156000006 +0100
@@ -1149,6 +1149,13 @@
if (long_help) {
puts(_(
+" --select-timeout=TIMEOUT\n"
+" call LZMA_SYNC_FLUSH every TIMEOUT milliseconds when\n"
+" less than IO_BUFFER_SIZE bytes of input are available"));
+ }
+
+ if (long_help) {
+ puts(_(
"\n Custom filter chain for compression (alternative for using presets):"));
#if defined(HAVE_ENCODER_LZMA1) || defined(HAVE_DECODER_LZMA1) \
diff -u -r xz-utils-5.0.0.orig/src/xz/xz.1 xz-utils-5.0.0/src/xz/xz.1
--- xz-utils-5.0.0.orig/src/xz/xz.1 2010-10-23 15:47:33.000000000 +0100
+++ xz-utils-5.0.0/src/xz/xz.1 2013-05-01 23:51:11.704000006 +0100
@@ -873,6 +873,14 @@
Automatic adjusting is always disabled when creating raw streams
.RB ( \-\-format=raw ).
.TP
+.BI \-\-select\-timeout= timeout
+Flush the encoder with LZMA_SYNC_FLUSH every
+.I timeout
+milliseconds so that all input read so far becomes decodable
+without waiting for the current block to finish.
+This is helpful when streaming slowly growing input, such as a log file, with low latency.
+Only supported when compressing, and not with \-\-format=lzma.
+.TP
\fB\-T\fR \fIthreads\fR, \fB\-\-threads=\fIthreads
Specify the number of worker threads to use.
The actual number of threads can be less than