Hi Robert,

On Sat, Jun 24, 2023 at 09:48:31PM +0100, Robert Newson wrote:
> Hi,
> 
> That sounds great, much appreciated. I'll be available all week to test any
> patches you might propose.

I gave it a try. There was already a flush call in the data block
processing (I don't know why, to be honest; I'd rather make it conditional
on low-latency mode, but let's not mix things for now). So I implemented a
flush() operation for slz and added a call to it in haproxy.

I'd be interested if you could test the two attached patches. They're
for 2.9-dev but will likely apply to 2.8 and probably even a number of
older releases since this part doesn't change often. My tests were
fairly limited: I just verified that compressing a large file (200 MB)
directly with slz while injecting flushes after each read() continues
to produce valid data, and that the regtests are still OK (a few
of them do use the compression).

Thanks,
Willy
>From e8fb0282200a013468d5a2b86cf6ce1ac115a6cf Mon Sep 17 00:00:00 2001
From: Willy Tarreau <w...@1wt.eu>
Date: Mon, 26 Jun 2023 19:24:55 +0200
Subject: IMPORT: slz: implement a synchronous flush() operation

In some cases it may be desirable for latency reasons to forcefully
flush the queue even if it results in suboptimal compression. In our
case the queue might contain up to almost 4 bytes, which need an EOB
and a switch to literal mode, followed by 4 bytes to encode an empty
message. This means that each call can add 5 extra bytes in the output
stream. And the flush may also result in the header being produced for
the first time, which can amount to 2 or 10 bytes (zlib or gzip). In
the worst case, a total of 19 bytes may be emitted at once upon a flush
with 31 pending bits and a gzip header.

This is libslz upstream commit cf8c4668e4b4216e930b56338847d8d46a6bfda9.
---
 include/import/slz.h | 27 +++++++++++++++
 src/slz.c            | 78 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 105 insertions(+)

diff --git a/include/import/slz.h b/include/import/slz.h
index d01599ca4..901a79027 100644
--- a/include/import/slz.h
+++ b/include/import/slz.h
@@ -86,6 +86,7 @@ struct slz_stream {
 /* Functions specific to rfc1951 (deflate) */
 long slz_rfc1951_encode(struct slz_stream *strm, unsigned char *out, const 
unsigned char *in, long ilen, int more);
 int slz_rfc1951_init(struct slz_stream *strm, int level);
+int slz_rfc1951_flush(struct slz_stream *strm, unsigned char *buf);
 int slz_rfc1951_finish(struct slz_stream *strm, unsigned char *buf);
 
 /* Functions specific to rfc1952 (gzip) */
@@ -94,6 +95,7 @@ uint32_t slz_crc32_by4(uint32_t crc, const unsigned char 
*buf, int len);
 long slz_rfc1952_encode(struct slz_stream *strm, unsigned char *out, const 
unsigned char *in, long ilen, int more);
 int slz_rfc1952_send_header(struct slz_stream *strm, unsigned char *buf);
 int slz_rfc1952_init(struct slz_stream *strm, int level);
+int slz_rfc1952_flush(struct slz_stream *strm, unsigned char *buf);
 int slz_rfc1952_finish(struct slz_stream *strm, unsigned char *buf);
 
 /* Functions specific to rfc1950 (zlib) */
@@ -102,6 +104,7 @@ uint32_t slz_adler32_block(uint32_t crc, const unsigned 
char *buf, long len);
 long slz_rfc1950_encode(struct slz_stream *strm, unsigned char *out, const 
unsigned char *in, long ilen, int more);
 int slz_rfc1950_send_header(struct slz_stream *strm, unsigned char *buf);
 int slz_rfc1950_init(struct slz_stream *strm, int level);
+int slz_rfc1950_flush(struct slz_stream *strm, unsigned char *buf);
 int slz_rfc1950_finish(struct slz_stream *strm, unsigned char *buf);
 
 /* generic functions */
@@ -170,4 +173,28 @@ static inline int slz_finish(struct slz_stream *strm, void 
*buf)
        return ret;
 }
 
+/* Flushes any pending data for stream <strm> into buffer <buf>, then emits an
+ * empty literal block to byte-align the output, so that the queue is fully
+ * flushed. Note that if the initial header was never sent, it will be sent
+ * first as well (0, 2 or 10 extra bytes). This requires that the output buffer
+ * still has this plus the size of the queue available (up to 4 bytes), plus
+ * one byte for (BFINAL,BTYPE), plus 4 bytes for LEN+NLEN, or a total of 19
+ * bytes in the worst case. The number of bytes emitted is returned. It is
+ * guaranteed that the queue is empty on return. This may cause some overhead
+ * by adding needless 5-byte blocks if called too often.
+ */
+static inline int slz_flush(struct slz_stream *strm, void *buf)
+{
+       int ret;
+
+       if (strm->format == SLZ_FMT_GZIP)
+               ret = slz_rfc1952_flush(strm, (unsigned char *) buf);
+       else if (strm->format == SLZ_FMT_ZLIB)
+               ret = slz_rfc1950_flush(strm, (unsigned char *) buf);
+       else /* any other format is handled as raw deflate (rfc1951) */
+               ret = slz_rfc1951_flush(strm, (unsigned char *) buf);
+
+       return ret;
+}
+
 #endif
diff --git a/src/slz.c b/src/slz.c
index 1d6032ea2..1560bac34 100644
--- a/src/slz.c
+++ b/src/slz.c
@@ -720,6 +720,42 @@ int slz_rfc1951_init(struct slz_stream *strm, int level)
        return 0;
 }
 
+/* Flushes any pending data for stream <strm> into buffer <buf>, then emits an
+ * empty literal block to byte-align the output, so that the queue is fully
+ * flushed. This requires that the output buffer still has the size of the
+ * queue available (up to 4 bytes), plus one byte for (BFINAL,BTYPE), plus 4
+ * bytes for LEN+NLEN, or a total of 9 bytes in the worst case. The number of
+ * bytes emitted is returned. It is guaranteed that the queue is empty on
+ * return. This may cause some overhead by adding needless 5-byte blocks if
+ * called too often.
+ */
+int slz_rfc1951_flush(struct slz_stream *strm, unsigned char *buf)
+{
+       strm->outbuf = buf;
+
+       /* Nothing to emit: the queue is always empty on INIT, DONE, and END */
+       if (!strm->qbits)
+               return 0;
+
+       /* we may need to terminate a huffman output. Lit is always in EOB 
state */
+       if (strm->state != SLZ_ST_EOB) {
+               strm->state = (strm->state == SLZ_ST_LAST) ? SLZ_ST_DONE : 
SLZ_ST_EOB;
+               send_eob(strm);
+       }
+
+       /* send BFINAL according to state, and BTYPE=00 (lit) */
+       enqueue8(strm, (strm->state == SLZ_ST_DONE) ? 1 : 0, 3);
+       flush_bits(strm);             // emit pending bits
+       copy_32b(strm, 0xFFFF0000U);  // len=0, nlen=~0
+
+       /* Now the queue is empty, EOB was sent, BFINAL might have been sent if
+        * we completed the last block, and a zero-byte block was sent to byte-
+        * align the output. The last state reflects all this. Let's just
+        * return the number of bytes added to the output buffer.
+        */
+       return strm->outbuf - buf;
+}
+
 /* Flushes any pending for stream <strm> into buffer <buf>, then sends BTYPE=1
  * and BFINAL=1 if needed. The stream ends in SLZ_ST_DONE. It returns the 
number
  * of bytes emitted. The trailer consists in flushing the possibly pending bits
@@ -1053,6 +1089,27 @@ int slz_rfc1952_init(struct slz_stream *strm, int level)
        return 0;
 }
 
+/* Flushes any pending data for stream <strm> into buffer <buf>, then emits an
+ * empty literal block to byte-align the output, so that the queue is fully
+ * flushed. Note that if the initial header was never sent, it will be sent
+ * first as well (10 extra bytes). This requires that the output buffer still
+ * has this plus the size of the queue available (up to 4 bytes), plus one byte
+ * for (BFINAL,BTYPE), plus 4 bytes for LEN+NLEN, or a total of 19 bytes in the
+ * worst case. The number of bytes emitted is returned. It is guaranteed that
+ * the queue is empty on return. This may cause some overhead by adding
+ * needless 5-byte blocks if called too often.
+ */
+int slz_rfc1952_flush(struct slz_stream *strm, unsigned char *buf)
+{
+       int sent = 0;
+
+       if (__builtin_expect(strm->state == SLZ_ST_INIT, 0))
+               sent = slz_rfc1952_send_header(strm, buf);
+
+       sent += slz_rfc1951_flush(strm, buf + sent);
+       return sent;
+}
+
 /* Flushes pending bits and sends the gzip trailer for stream <strm> into
  * buffer <buf>. When it's done, the stream state is updated to SLZ_ST_END. It
  * returns the number of bytes emitted. The trailer consists in flushing the
@@ -1308,6 +1365,27 @@ int slz_rfc1950_init(struct slz_stream *strm, int level)
        return 0;
 }
 
+/* Flushes any pending data for stream <strm> into buffer <buf>, then emits an
+ * empty literal block to byte-align the output, so that the queue is fully
+ * flushed. Note that if the initial header was never sent, it will be sent
+ * first as well (2 extra bytes). This requires that the output buffer still
+ * has this plus the size of the queue available (up to 4 bytes), plus one byte
+ * for (BFINAL,BTYPE), plus 4 bytes for LEN+NLEN, or a total of 11 bytes in the
+ * worst case. The number of bytes emitted is returned. It is guaranteed that
+ * the queue is empty on return. This may cause some overhead by adding
+ * needless 5-byte blocks if called too often.
+ */
+int slz_rfc1950_flush(struct slz_stream *strm, unsigned char *buf)
+{
+       int sent = 0;
+
+       if (__builtin_expect(strm->state == SLZ_ST_INIT, 0))
+               sent = slz_rfc1950_send_header(strm, buf);
+
+       sent += slz_rfc1951_flush(strm, buf + sent);
+       return sent;
+}
+
 /* Flushes pending bits and sends the gzip trailer for stream <strm> into
  * buffer <buf>. When it's done, the stream state is updated to SLZ_ST_END. It
  * returns the number of bytes emitted. The trailer consists in flushing the
-- 
2.35.3

>From 9ce7e07f526c17dbd7cf05d36f9d9b4ed32d0577 Mon Sep 17 00:00:00 2001
From: Willy Tarreau <w...@1wt.eu>
Date: Mon, 26 Jun 2023 19:34:39 +0200
Subject: WIP: compression/slz: support just a pure flush

---
 src/compression.c | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/compression.c b/src/compression.c
index 7a8a21912..7b75461d2 100644
--- a/src/compression.c
+++ b/src/compression.c
@@ -350,6 +350,8 @@ static int rfc195x_flush_or_finish(struct comp_ctx 
*comp_ctx, struct buffer *out
 
        if (finish)
                b_add(out, slz_finish(strm, b_tail(out)));
+       else
+               b_add(out, slz_flush(strm, b_tail(out)));
 
        out_len = b_data(out) - out_len;
 
-- 
2.35.3

Reply via email to