Author: rhuijben
Date: Wed Nov 11 16:46:44 2015
New Revision: 1713889
URL: http://svn.apache.org/viewvc?rev=1713889&view=rev
Log:
Add a few states to the deflate bucket state machine to allow using it
for compression too.
zlib already abstracts everything as a stream, and we already had all the
necessary plumbing in place.
I think the original idea was to expose this via the existing
serf_bucket_deflate_create() function, but since we call this a 'deflate'
bucket while it only supported inflating, it is hard to invent good names
for the 'format' argument that would also cover compression.
* buckets/deflate_buckets.c
(deflate_state_t): Add two initial states and a finish state for
compression. Document memLevel usage.
(serf_bucket_deflate_create): Set memLevel to -1 to mark decompression.
(serf_bucket_deflate_compress_create): New function.
(serf_deflate_refill): Pass Z_FINISH to zlib once the source stream reports
EOF. Only push back remaining input when there is any. When compressing
gzip, append the verify data (CRC and length) when done.
(serf_deflate_wait_for_data): Implement compress states.
* serf_bucket_types.h
(serf_bucket_deflate_compress_create): New function.
* test/test_buckets.c
(test_linebuf_fetch_crlf): Remove unused var.
(test_deflate_compress_buckets): Add initial test.
Modified:
serf/trunk/buckets/deflate_buckets.c
serf/trunk/serf_bucket_types.h
serf/trunk/test/test_buckets.c
Modified: serf/trunk/buckets/deflate_buckets.c
URL:
http://svn.apache.org/viewvc/serf/trunk/buckets/deflate_buckets.c?rev=1713889&r1=1713888&r2=1713889&view=diff
==============================================================================
--- serf/trunk/buckets/deflate_buckets.c (original)
+++ serf/trunk/buckets/deflate_buckets.c Wed Nov 11 16:46:44 2015
@@ -48,7 +48,7 @@ typedef struct deflate_context_t {
int format; /* Are we 'deflate' or 'gzip'? */
- enum {
+ enum deflate_state_t {
STATE_READING_HEADER, /* reading the gzip header */
STATE_HEADER, /* read the gzip header */
STATE_INIT, /* init'ing zlib functions */
@@ -57,6 +57,11 @@ typedef struct deflate_context_t {
STATE_VERIFY, /* verifying the final gzip CRC */
STATE_FINISH, /* clean up after reading body */
STATE_DONE, /* body is done; we'll return EOF here */
+
+ /* When handling things the other way around */
+ STATE_WRITING_HEADER, /* produces a gzip header */
+ STATE_COMPRESS_INIT, /* initializes zlib for compression */
+ STATE_COMPRESS_FINISH, /* clean up after producing body */
} state;
z_stream zstream;
@@ -64,7 +69,8 @@ typedef struct deflate_context_t {
unsigned char buffer[DEFLATE_BUFFER_SIZE];
unsigned long crc;
int windowSize;
- int memLevel;
+ int memLevel; /* -1 when decompressing.
+ Otherwise the memlevel to use*/
int bufferSize;
/* How much of the chunk, or the terminator, do we have left to read? */
@@ -121,7 +127,48 @@ serf_bucket_t *serf_bucket_deflate_creat
ctx->stream_left = ctx->stream_size = DEFLATE_MAGIC_SIZE;
ctx->windowSize = DEFLATE_WINDOW_SIZE;
- ctx->memLevel = DEFLATE_MEMLEVEL;
+ ctx->memLevel = -1;
+ ctx->bufferSize = DEFLATE_BUFFER_SIZE;
+
+ return serf_bucket_create(&serf_bucket_type_deflate, allocator, ctx);
+}
+
+serf_bucket_t *serf_bucket_deflate_compress_create(
+ serf_bucket_t *stream,
+ int memlevel,
+ int format,
+ serf_bucket_alloc_t *allocator)
+{
+ deflate_context_t *ctx;
+
+ ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+ ctx->stream = stream;
+ ctx->stream_status = APR_SUCCESS;
+ ctx->inflate_stream = serf_bucket_aggregate_create(allocator);
+ ctx->format = format;
+ ctx->crc = 0;
+ ctx->config = NULL;
+ /* zstream must be NULL'd out. */
+ memset(&ctx->zstream, 0, sizeof(ctx->zstream));
+
+ switch (ctx->format) {
+ case SERF_DEFLATE_GZIP:
+ ctx->state = STATE_WRITING_HEADER;
+ break;
+ case SERF_DEFLATE_DEFLATE:
+ /* deflate doesn't have a header. */
+ ctx->state = STATE_COMPRESS_INIT;
+ break;
+ default:
+ /* Not reachable */
+ return NULL;
+ }
+
+ /* Initial size of gzip header. */
+ ctx->stream_left = ctx->stream_size = DEFLATE_MAGIC_SIZE;
+
+ ctx->windowSize = DEFLATE_WINDOW_SIZE;
+ ctx->memLevel = (memlevel > 0) ? memlevel : DEFLATE_MEMLEVEL;
ctx->bufferSize = DEFLATE_BUFFER_SIZE;
return serf_bucket_create(&serf_bucket_type_deflate, allocator, ctx);
@@ -151,6 +198,7 @@ static apr_status_t serf_deflate_refill(
deflate_context_t *ctx = bucket->data;
apr_status_t status;
int zRC;
+ int flush_v = Z_NO_FLUSH;
/* We have nothing buffered. Fetch more. */
@@ -181,6 +229,9 @@ static apr_status_t serf_deflate_refill(
return status;
}
+ if (APR_STATUS_IS_EOF(ctx->stream_status))
+ flush_v = Z_FINISH;
+
/* Make valgrind happy and explictly initialize next_in to specific
* value for empty buffer. */
if (private_len) {
@@ -194,7 +245,10 @@ static apr_status_t serf_deflate_refill(
while (1) {
- zRC = inflate(&ctx->zstream, Z_NO_FLUSH);
+ if (ctx->memLevel < 0)
+ zRC = inflate(&ctx->zstream, flush_v);
+ else
+ zRC = deflate(&ctx->zstream, flush_v);
/* We're full or zlib requires more space. Either case, clear
out our buffer, reset, and return. */
@@ -234,30 +288,60 @@ static apr_status_t serf_deflate_refill(
ctx->zstream.avail_out = ctx->bufferSize;
- /* Push back the remaining data to be read. */
- tmp = serf_bucket_aggregate_create(bucket->allocator);
- serf_bucket_set_config(tmp, ctx->config);
- serf_bucket_aggregate_prepend(tmp, ctx->stream);
- ctx->stream = tmp;
-
- /* We now need to take the remaining avail_in and
- * throw it in ctx->stream so our next read picks it up.
- */
- tmp = SERF_BUCKET_SIMPLE_STRING_LEN(
+ if (ctx->zstream.avail_in) {
+ /* Push back the remaining data to be read. */
+ tmp = serf_bucket_aggregate_create(bucket->allocator);
+ serf_bucket_set_config(tmp, ctx->config);
+ serf_bucket_aggregate_prepend(tmp, ctx->stream);
+ ctx->stream = tmp;
+
+ /* We now need to take the remaining avail_in and
+ * throw it in ctx->stream so our next read picks it up.
+ */
+
+ tmp = SERF_BUCKET_SIMPLE_STRING_LEN(
(const char*)ctx->zstream.next_in,
ctx->zstream.avail_in,
bucket->allocator);
- serf_bucket_aggregate_prepend(ctx->stream, tmp);
+ serf_bucket_aggregate_prepend(ctx->stream, tmp);
+ }
switch (ctx->format) {
case SERF_DEFLATE_GZIP:
- ctx->stream_left = ctx->stream_size =
- DEFLATE_VERIFY_SIZE;
- ctx->state++;
+ if (ctx->memLevel >= 0) {
+ char *verify_header = serf_bucket_mem_alloc(
+ bucket->allocator,
+ DEFLATE_VERIFY_SIZE);
+
+ verify_header[0] = (ctx->crc >> 24) & 0xFF;
+ verify_header[1] = (ctx->crc >> 16) & 0xFF;
+ verify_header[2] = (ctx->crc >> 8) & 0xFF;
+ verify_header[3] = ctx->crc & 0xFF;
+
+ verify_header[4] = (ctx->zstream.total_out >> 24) & 0xFF;
+ verify_header[5] = (ctx->zstream.total_out >> 16) & 0xFF;
+ verify_header[6] = (ctx->zstream.total_out >> 8) & 0xFF;
+ verify_header[7] = ctx->zstream.total_out & 0xFF;
+
+ serf_bucket_aggregate_append(
+ ctx->inflate_stream,
+ serf_bucket_simple_own_create(verify_header,
+
DEFLATE_VERIFY_SIZE,
+
bucket->allocator));
+ ctx->state = STATE_COMPRESS_FINISH;
+ }
+ else {
+ ctx->stream_left = ctx->stream_size =
+ DEFLATE_VERIFY_SIZE;
+ ctx->state++;
+ }
break;
case SERF_DEFLATE_DEFLATE:
/* Deflate does not have a verify footer. */
- ctx->state = STATE_FINISH;
+ if (ctx->memLevel >= 0)
+ ctx->state = STATE_COMPRESS_FINISH;
+ else
+ ctx->state = STATE_FINISH;
break;
default:
/* Not reachable */
@@ -390,6 +474,42 @@ static apr_status_t serf_deflate_wait_fo
case STATE_DONE:
/* We're done inflating. Use our finished buffer. */
return ctx->inflate_stream ? APR_SUCCESS : APR_EOF;
+
+
+ case STATE_WRITING_HEADER:
+ {
+ char *header = serf_bucket_mem_calloc(bucket->allocator,
+ DEFLATE_MAGIC_SIZE);
+ memcpy(header, deflate_magic, sizeof(deflate_magic));
+ header[2] = Z_DEFLATED;
+ /* No mtime. DOS/Default OS */
+
+ serf_bucket_aggregate_append(
+ ctx->inflate_stream,
+ serf_bucket_simple_own_create(header, DEFLATE_MAGIC_SIZE,
+ bucket->allocator));
+ ctx->state++;
+ break;
+ }
+ case STATE_COMPRESS_INIT:
+ zRC = deflateInit2(&ctx->zstream, Z_DEFAULT_STRATEGY, Z_DEFLATED,
+ ctx->windowSize, ctx->memLevel,
Z_DEFAULT_STRATEGY);
+ if (zRC != Z_OK) {
+ serf__log(LOGLVL_ERROR, LOGCOMP_COMPR, __FILE__, ctx->config,
+ "deflateInit2 error %d - %s\n",
+ zRC, ctx->zstream.msg);
+ return SERF_ERROR_DECOMPRESSION_FAILED;
+ }
+ ctx->zstream.next_out = ctx->buffer;
+ ctx->zstream.avail_out = ctx->bufferSize;
+ ctx->state = STATE_INFLATE;
+ break;
+ case STATE_COMPRESS_FINISH:
+ deflateEnd(&ctx->zstream);
+ serf_bucket_aggregate_prepend(ctx->stream, ctx->inflate_stream);
+ ctx->inflate_stream = NULL;
+ ctx->state++;
+ break;
default:
/* Not reachable */
return APR_EGENERAL;
Modified: serf/trunk/serf_bucket_types.h
URL:
http://svn.apache.org/viewvc/serf/trunk/serf_bucket_types.h?rev=1713889&r1=1713888&r2=1713889&view=diff
==============================================================================
--- serf/trunk/serf_bucket_types.h (original)
+++ serf/trunk/serf_bucket_types.h Wed Nov 11 16:46:44 2015
@@ -481,6 +481,12 @@ serf_bucket_t *serf_bucket_deflate_creat
serf_bucket_alloc_t *allocator,
int format);
+serf_bucket_t *serf_bucket_deflate_compress_create(
+ serf_bucket_t *stream,
+ int memlevel,
+ int format,
+ serf_bucket_alloc_t *allocator);
+
/* ==================================================================== */
Modified: serf/trunk/test/test_buckets.c
URL:
http://svn.apache.org/viewvc/serf/trunk/test/test_buckets.c?rev=1713889&r1=1713888&r2=1713889&view=diff
==============================================================================
--- serf/trunk/test/test_buckets.c (original)
+++ serf/trunk/test/test_buckets.c Wed Nov 11 16:46:44 2015
@@ -2004,7 +2004,6 @@ static void test_linebuf_fetch_crlf(CuTe
serf_bucket_t *bkt;
serf_linebuf_t linebuf;
serf_bucket_type_t *unfriendly;
- apr_status_t status;
serf_bucket_alloc_t *alloc = test__create_bucket_allocator(tc, tb->pool);
@@ -2264,6 +2263,26 @@ static void test_limit_buckets(CuTest *t
}
}
+static void test_deflate_compress_buckets(CuTest *tc)
+{
+ test_baton_t *tb = tc->testBaton;
+ serf_bucket_alloc_t *alloc = tb->bkt_alloc;
+ serf_bucket_t *bkt;
+ int i;
+ const char *body = "12345678901234567890" CRLF
+ "12345678901234567890" CRLF
+ "12345678901234567890" CRLF;
+
+ for (i = SERF_DEFLATE_GZIP; i <= SERF_DEFLATE_DEFLATE; i++) {
+ bkt = SERF_BUCKET_SIMPLE_STRING(body, alloc);
+ bkt = serf_bucket_deflate_compress_create(bkt, 0, i, alloc);
+ bkt = serf_bucket_deflate_create(bkt, alloc, i);
+
+ read_and_check_bucket(tc, bkt, body);
+ serf_bucket_destroy(bkt);
+ }
+}
+
/* Basic test for unframe buckets. */
static void test_http2_unframe_buckets(CuTest *tc)
{
@@ -2771,6 +2790,7 @@ CuSuite *test_buckets(void)
SUITE_ADD_TEST(suite, test_deflate_buckets);
SUITE_ADD_TEST(suite, test_prefix_buckets);
SUITE_ADD_TEST(suite, test_limit_buckets);
+ SUITE_ADD_TEST(suite, test_deflate_compress_buckets);
SUITE_ADD_TEST(suite, test_http2_unframe_buckets);
SUITE_ADD_TEST(suite, test_http2_unpad_buckets);
SUITE_ADD_TEST(suite, test_hpack_huffman_decode);